source: lib/format_ndag.c @ 9993cde

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 9993cde was 9993cde, checked in by Shane Alcock <salcock@…>, 3 years ago

Added code path in format_ndag for machines without recvmmsg.

The performance will likely be terrible and the code is completely
untested, but at least libtrace will compile on those systems.

  • Property mode set to 100644
File size: 43.2 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (1000)
29
30#define RECV_BATCH_SIZE (50)
31
32#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
33
34static struct libtrace_format_t ndag;
35
36volatile int ndag_paused = 0;
37
38typedef struct monitor {
39        uint16_t monitorid;
40        uint64_t laststart;
41} ndag_monitor_t;
42
43
44typedef struct streamsource {
45        uint16_t monitor;
46        char *groupaddr;
47        char *localiface;
48        uint16_t port;
49} streamsource_t;
50
51typedef struct streamsock {
52        char *groupaddr;
53        int sock;
54        struct addrinfo *srcaddr;
55        uint16_t port;
56        uint32_t expectedseq;
57        ndag_monitor_t *monitorptr;
58        char **saved;
59        char *nextread;
60        int nextreadind;
61        int nextwriteind;
62        int savedsize[ENCAP_BUFFERS];
63        uint32_t startidle;
64        uint64_t recordcount;
65
66        int bufavail;
67
68#if HAVE_RECVMMSG
69        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
70#else
71        struct msghdr singlemsg;
72#endif
73
74} streamsock_t;
75
76typedef struct recvstream {
77        streamsock_t *sources;
78        uint16_t sourcecount;
79        libtrace_message_queue_t mqueue;
80        int threadindex;
81        ndag_monitor_t *knownmonitors;
82        uint16_t monitorcount;
83
84        uint64_t dropped_upstream;
85        uint64_t missing_records;
86        uint64_t received_packets;
87} recvstream_t;
88
89typedef struct ndag_format_data {
90        char *multicastgroup;
91        char *portstr;
92        char *localiface;
93        uint16_t nextthreadid;
94        recvstream_t *receivers;
95
96        pthread_t controlthread;
97        libtrace_message_queue_t controlqueue;
98} ndag_format_data_t;
99
100enum {
101        NDAG_CLIENT_HALT = 0x01,
102        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
103        NDAG_CLIENT_NEWGROUP = 0x03
104};
105
106typedef struct ndagreadermessage {
107        uint8_t type;
108        streamsource_t contents;
109} ndag_internal_message_t;
110
111
112static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
113
114        /* Calculate seq_a - seq_b, taking wraparound into account */
115        if (seq_a == seq_b) return 0;
116
117        if (seq_a > seq_b) {
118                return (int) (seq_a - seq_b);
119        }
120
121        /* -1 for the wrap and another -1 because we don't use zero */
122        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
123}
124
125static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
126        ndag_common_t *header = (ndag_common_t *)msgbuf;
127
128        if (msgsize < sizeof(ndag_common_t)) {
129                fprintf(stderr,
130                        "nDAG message does not have a complete nDAG header.\n");
131                return 0;
132        }
133
134        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
135                fprintf(stderr,
136                        "nDAG message does not have a valid magic number.\n");
137                return 0;
138        }
139
140        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
141                fprintf(stderr,
142                        "nDAG message has an invalid header version: %u\n",
143                                header->version);
144                return 0;
145        }
146
147        return header->type;
148}
149
150static int join_multicast_group(char *groupaddr, char *localiface,
151        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
152
153        struct addrinfo hints;
154        struct addrinfo *gotten;
155        struct addrinfo *group;
156        unsigned int interface;
157        char pstr[16];
158        struct group_req greq;
159        int bufsize;
160
161        int sock;
162
163        if (portstr == NULL) {
164                snprintf(pstr, 15, "%u", portnum);
165                portstr = pstr;
166        }
167
168        interface = if_nametoindex(localiface);
169        if (interface == 0) {
170                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
171                                localiface, strerror(errno));
172                return -1;
173        }
174
175        hints.ai_family = PF_UNSPEC;
176        hints.ai_socktype = SOCK_DGRAM;
177        hints.ai_flags = AI_PASSIVE;
178        hints.ai_protocol = 0;
179
180        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
181                fprintf(stderr,
182                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
183                                portstr, strerror(errno));
184                return -1;
185        }
186
187        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
188                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
189                                groupaddr, strerror(errno));
190                return -1;
191        }
192
193        *srcinfo = gotten;
194        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
195        if (sock < 0) {
196                fprintf(stderr,
197                        "Failed to create multicast socket for %s:%s -- %s\n",
198                                groupaddr, portstr, strerror(errno));
199                goto sockcreateover;
200        }
201
202        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
203        {
204                fprintf(stderr,
205                        "Failed to bind to multicast socket %s:%s -- %s\n",
206                                groupaddr, portstr, strerror(errno));
207                close(sock);
208                sock = -1;
209                goto sockcreateover;
210        }
211
212        greq.gr_interface = interface;
213        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
214
215        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
216                        sizeof(greq)) < 0) {
217                fprintf(stderr,
218                        "Failed to join multicast group %s:%s -- %s\n",
219                                groupaddr, portstr, strerror(errno));
220                close(sock);
221                sock = -1;
222                goto sockcreateover;
223        }
224
225        bufsize = 16 * 1024 * 1024;
226        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
227                                (socklen_t)sizeof(int)) < 0) {
228
229                fprintf(stderr,
230                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
231                                groupaddr, portstr, strerror(errno));
232                close(sock);
233                sock = -1;
234                goto sockcreateover;
235        }
236
237sockcreateover:
238        freeaddrinfo(group);
239        return sock;
240}
241
242
243static int ndag_init_input(libtrace_t *libtrace) {
244
245        char *scan = NULL;
246        char *next = NULL;
247
248        libtrace->format_data = (ndag_format_data_t *)malloc(
249                        sizeof(ndag_format_data_t));
250
251        FORMAT_DATA->multicastgroup = NULL;
252        FORMAT_DATA->portstr = NULL;
253        FORMAT_DATA->localiface = NULL;
254        FORMAT_DATA->nextthreadid = 0;
255        FORMAT_DATA->receivers = NULL;
256
257        scan = strchr(libtrace->uridata, ',');
258        if (scan == NULL) {
259                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
260                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
261                return -1;
262        }
263        FORMAT_DATA->localiface = strndup(libtrace->uridata,
264                        (size_t)(scan - libtrace->uridata));
265        next = scan + 1;
266
267        scan = strchr(next, ',');
268        if (scan == NULL) {
269                FORMAT_DATA->portstr = strdup("9001");
270                FORMAT_DATA->multicastgroup = strdup(next);
271        } else {
272                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
273
274                FORMAT_DATA->portstr = strdup(scan + 1);
275        }
276        return 0;
277}
278
279static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
280                uint16_t portnum, uint16_t monid) {
281
282        ndag_internal_message_t alert;
283
284        alert.type = NDAG_CLIENT_NEWGROUP;
285        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
286        alert.contents.localiface = FORMAT_DATA->localiface;
287        alert.contents.port = portnum;
288        alert.contents.monitor = monid;
289
290        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
291                        (void *)&alert);
292
293}
294       
295static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
296                int msgsize, uint16_t *ptmap) {
297
298        int i;
299        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
300        uint8_t msgtype;
301
302        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
303        if (msgtype == 0) {
304                return -1;
305        }
306
307        msgsize -= sizeof(ndag_common_t);
308        if (msgtype == NDAG_PKT_BEACON) {
309                /* If message is a beacon, make sure every port included in the
310                 * beacon is assigned to a receive thread.
311                 */
312                uint16_t *ptr, numstreams;
313
314                if ((uint32_t)msgsize < sizeof(uint16_t)) {
315                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
316                        return -1;
317                }
318
319                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
320                numstreams = ntohs(*ptr);
321                ptr ++;
322
323                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
324                {
325                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
326                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
327                        return -1;
328                }
329
330                for (i = 0; i < numstreams; i++) {
331                        uint16_t streamport = ntohs(*ptr);
332
333                        if (ptmap[streamport] == 0xffff) {
334                                new_group_alert(libtrace,
335                                        FORMAT_DATA->nextthreadid, streamport,
336                                        ntohs(ndaghdr->monitorid));
337
338                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
339
340                                if (libtrace->perpkt_thread_count == 0) {
341                                        FORMAT_DATA->nextthreadid = 0;
342                                } else {
343                                        FORMAT_DATA->nextthreadid =
344                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
345                                }
346                        }
347
348                        ptr ++;
349                }
350        } else {
351                fprintf(stderr,
352                        "Unexpected message type on control channel: %u\n",
353                         msgtype);
354                return -1;
355        }
356
357        return 0;
358
359}
360
361static void *ndag_controller_run(void *tdata) {
362
363        libtrace_t *libtrace = (libtrace_t *)tdata;
364        uint16_t ptmap[65536];
365        int sock = -1;
366        struct addrinfo *receiveaddr = NULL;
367        fd_set listening;
368        struct timeval timeout;
369
370        /* ptmap is a dirty hack to allow us to quickly check if we've already
371         * assigned a stream to a thread.
372         */
373        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
374
375        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
376                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
377                        &receiveaddr);
378        if (sock == -1) {
379                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
380                        "Unable to join multicast group for nDAG control channel");
381                trace_interrupt();
382        }
383
384        ndag_paused = 0;
385        while ((is_halted(libtrace) == -1) && !ndag_paused) {
386                int ret;
387                char buf[CTRL_BUF_SIZE];
388
389                FD_ZERO(&listening);
390                FD_SET(sock, &listening);
391
392                timeout.tv_sec = 0;
393                timeout.tv_usec = 500000;
394
395                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
396                if (ret < 0) {
397                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
398                        break;
399                }
400
401                if (!FD_ISSET(sock, &listening)) {
402                        continue;
403                }
404
405                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
406                                receiveaddr->ai_addr,
407                                &(receiveaddr->ai_addrlen));
408                if (ret < 0) {
409                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
410                        break;
411                }
412
413                if (ret == 0) {
414                        break;
415                }
416
417                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
418                        fprintf(stderr, "Error while parsing nDAG control message.\n");
419                        continue;
420                }
421        }
422
423        if (sock >= 0) {
424                close(sock);
425        }
426
427        /* Control channel has fallen over, should probably encourage libtrace
428         * to halt the receiver threads as well.
429         */
430        if (!is_halted(libtrace)) {
431                trace_interrupt();
432        }
433
434        pthread_exit(NULL);
435}
436
437static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
438{
439        int ret;
440        uint32_t i;
441        /* Configure the set of receiver threads */
442
443        if (FORMAT_DATA->receivers == NULL) {
444                /* What if the number of threads changes between a pause and
445                 * a restart? Can this happen? */
446                FORMAT_DATA->receivers = (recvstream_t *)
447                                malloc(sizeof(recvstream_t) * maxthreads);
448        }
449
450        for (i = 0; i < maxthreads; i++) {
451                FORMAT_DATA->receivers[i].sources = NULL;
452                FORMAT_DATA->receivers[i].sourcecount = 0;
453                FORMAT_DATA->receivers[i].knownmonitors = NULL;
454                FORMAT_DATA->receivers[i].monitorcount = 0;
455                FORMAT_DATA->receivers[i].threadindex = i;
456                FORMAT_DATA->receivers[i].dropped_upstream = 0;
457                FORMAT_DATA->receivers[i].received_packets = 0;
458                FORMAT_DATA->receivers[i].missing_records = 0;
459
460                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
461                                sizeof(ndag_internal_message_t));
462        }
463
464        /* Start the controller thread */
465        /* TODO consider affinity of this thread? */
466
467        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
468                        ndag_controller_run, libtrace);
469        if (ret != 0) {
470                return -1;
471        }
472        return maxthreads;
473}
474
475static int ndag_start_input(libtrace_t *libtrace) {
476        return ndag_start_threads(libtrace, 1);
477}
478
479static int ndag_pstart_input(libtrace_t *libtrace) {
480        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
481                        libtrace->perpkt_thread_count)
482                return 0;
483        return -1;
484}
485
486static void halt_ndag_receiver(recvstream_t *receiver) {
487        int j, i;
488        libtrace_message_queue_destroy(&(receiver->mqueue));
489
490        if (receiver->sources == NULL)
491                return;
492        for (i = 0; i < receiver->sourcecount; i++) {
493                streamsock_t src = receiver->sources[i];
494                if (src.saved) {
495                        for (j = 0; j < ENCAP_BUFFERS; j++) {
496                                if (src.saved[j]) {
497                                        free(src.saved[j]);
498                                }
499                        }
500                        free(src.saved);
501                }
502
503#if HAVE_RECVMMSG
504                for (j = 0; j < RECV_BATCH_SIZE; j++) {
505                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
506                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
507                        }
508                }
509#else
510                free(src.singlemsg.msg_iov);
511#endif
512
513                close(src.sock);
514        }
515        if (receiver->knownmonitors) {
516                free(receiver->knownmonitors);
517        }
518
519        if (receiver->sources) {
520                free(receiver->sources);
521        }
522}
523
524static int ndag_pause_input(libtrace_t *libtrace) {
525        int i;
526
527        /* Close the existing receiver sockets */
528        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
529               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
530        }
531        return 0;
532}
533
534static int ndag_fin_input(libtrace_t *libtrace) {
535
536        if (FORMAT_DATA->receivers) {
537                free(FORMAT_DATA->receivers);
538        }
539        if (FORMAT_DATA->multicastgroup) {
540                free(FORMAT_DATA->multicastgroup);
541        }
542        if (FORMAT_DATA->portstr) {
543                free(FORMAT_DATA->portstr);
544        }
545        if (FORMAT_DATA->localiface) {
546                free(FORMAT_DATA->localiface);
547        }
548
549        free(libtrace->format_data);
550        return 0;
551}
552
553static int ndag_prepare_packet_stream(libtrace_t *libtrace,
554                recvstream_t *rt,
555                streamsock_t *ssock, libtrace_packet_t *packet,
556                uint32_t flags) {
557
558        dag_record_t *erfptr;
559        ndag_encap_t *encaphdr;
560        uint16_t ndag_reccount = 0;
561        int nr;
562
563        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
564                packet->buf_control = TRACE_CTRL_PACKET;
565        } else {
566                packet->buf_control = TRACE_CTRL_EXTERNAL;
567        }
568
569        packet->trace = libtrace;
570        packet->buffer = ssock->nextread;
571        packet->header = ssock->nextread;
572        packet->type = TRACE_RT_DATA_ERF;
573
574        erfptr = (dag_record_t *)packet->header;
575
576        if (erfptr->flags.rxerror == 1) {
577                packet->payload = NULL;
578                erfptr->rlen = htons(erf_get_framing_length(packet));
579        } else {
580                packet->payload = (char *)packet->buffer +
581                                erf_get_framing_length(packet);
582        }
583
584        /* Update upstream drops using lctr */
585
586        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
587                /* TODO */
588        } else {
589                if (rt->received_packets > 0) {
590                        rt->dropped_upstream += ntohs(erfptr->lctr);
591                }
592        }
593
594        rt->received_packets ++;
595        ssock->recordcount += 1;
596
597        nr = ssock->nextreadind;
598        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
599                        sizeof(ndag_common_t));
600
601        ndag_reccount = ntohs(encaphdr->recordcount);
602        if ((ndag_reccount & 0x8000) != 0) {
603                /* Record was truncated -- update rlen appropriately */
604                erfptr->rlen = htons(ssock->savedsize[nr] -
605                                (ssock->nextread - ssock->saved[nr]));
606        }
607        ssock->nextread += ntohs(erfptr->rlen);
608
609        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
610                /* Read everything from this buffer, mark as empty and
611                 * move on. */
612                ssock->savedsize[nr] = 0;
613                ssock->bufavail ++;
614
615                assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS);
616                nr ++;
617                if (nr == ENCAP_BUFFERS) {
618                        nr = 0;
619                }
620                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
621                                sizeof(ndag_encap_t);
622                ssock->nextreadind = nr;
623        }
624
625        packet->order = erf_get_erf_timestamp(packet);
626        packet->error = packet->payload ? ntohs(erfptr->rlen) :
627                        erf_get_framing_length(packet);
628
629        return ntohs(erfptr->rlen);
630}
631
632static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
633                libtrace_packet_t *packet UNUSED,
634                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
635                uint32_t flags UNUSED) {
636
637        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
638        return 0;
639
640}
641
642static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
643
644        ndag_monitor_t *mon;
645
646        if (rt->monitorcount == 0) {
647                rt->knownmonitors = (ndag_monitor_t *)
648                                malloc(sizeof(ndag_monitor_t) * 5);
649        } else {
650                rt->knownmonitors = (ndag_monitor_t *)
651                            realloc(rt->knownmonitors,
652                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
653        }
654
655        mon = &(rt->knownmonitors[rt->monitorcount]);
656        mon->monitorid = monid;
657        mon->laststart = 0;
658
659        rt->monitorcount ++;
660        return mon;
661}
662
663static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
664
665        streamsock_t *ssock = NULL;
666        ndag_monitor_t *mon = NULL;
667        int i;
668
669        /* TODO consider replacing this with a list or vector so we can
670         * easily remove sources that are no longer in use, rather than
671         * just setting the sock to -1 and having to check them every
672         * time we want to read a packet.
673         */
674        if (rt->sourcecount == 0) {
675                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
676        } else if ((rt->sourcecount % 10) == 0) {
677                rt->sources = (streamsock_t *)realloc(rt->sources,
678                        sizeof(streamsock_t) * (rt->sourcecount + 10));
679        }
680
681        ssock = &(rt->sources[rt->sourcecount]);
682
683        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
684                        NULL, src.port, &(ssock->srcaddr));
685
686        if (ssock->sock < 0) {
687                return -1;
688        }
689
690        for (i = 0; i < rt->monitorcount; i++) {
691                if (rt->knownmonitors[i].monitorid == src.monitor) {
692                        mon = &(rt->knownmonitors[i]);
693                        break;
694                }
695        }
696
697        if (mon == NULL) {
698                mon = add_new_knownmonitor(rt, src.monitor);
699        }
700
701        ssock->port = src.port;
702        ssock->groupaddr = src.groupaddr;
703        ssock->expectedseq = 0;
704        ssock->monitorptr = mon;
705        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
706        ssock->bufavail = ENCAP_BUFFERS;
707        ssock->startidle = 0;
708
709        for (i = 0; i < ENCAP_BUFFERS; i++) {
710                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
711                ssock->savedsize[i] = 0;
712        }
713
714#if HAVE_RECVMMSG
715        for (i = 0; i < RECV_BATCH_SIZE; i++) {
716                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
717                                malloc(sizeof(struct iovec));
718                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
719                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
720                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
721                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
722                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
723                ssock->mmsgbufs[i].msg_len = 0;
724        }
725#else
726        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
727#endif
728
729        ssock->nextread = NULL;;
730        ssock->nextreadind = 0;
731        ssock->recordcount = 0;
732        rt->sourcecount += 1;
733
734        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
735                        ssock->groupaddr, ssock->port, rt->threadindex);
736
737        return ssock->port;
738}
739
740static int receiver_read_messages(recvstream_t *rt) {
741
742        ndag_internal_message_t msg;
743
744        while (libtrace_message_queue_try_get(&(rt->mqueue),
745                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
746                switch(msg.type) {
747                        case NDAG_CLIENT_NEWGROUP:
748                                if (add_new_streamsock(rt, msg.contents) < 0) {
749                                        return -1;
750                                }
751                                break;
752                        case NDAG_CLIENT_HALT:
753                                return 0;
754                }
755        }
756        return 1;
757
758}
759
760static inline int readable_data(streamsock_t *ssock) {
761
762        if (ssock->sock == -1) {
763                return 0;
764        }
765        if (ssock->savedsize[ssock->nextreadind] == 0) {
766                return 0;
767        }
768        /*
769        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
770                        ssock->savedsize[ssock->nextreadind]) {
771                return 0;
772        }
773        */
774        return 1;
775
776
777}
778
779static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
780
781        int i;
782        for (i = 0; i < rt->sourcecount; i++) {
783                if (rt->sources[i].monitorptr == mon) {
784                        rt->sources[i].expectedseq = 0;
785                }
786        }
787
788}
789
790static int init_receivers(streamsock_t *ssock, int required) {
791
792        int wind = ssock->nextwriteind;
793        int i = 1;
794
795#if HAVE_RECVMMSG
796        for (i = 0; i < required; i++) {
797                if (i >= RECV_BATCH_SIZE) {
798                        break;
799                }
800
801                if (wind >= ENCAP_BUFFERS) {
802                        wind = 0;
803                }
804
805                ssock->mmsgbufs[i].msg_len = 0;
806                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
807                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
808                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
809
810                wind ++;
811        }
812#else
813        assert(required > 0);
814        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
815        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
816        ssock->singlemsg.msg_iovlen = 1;
817#endif
818        return i;
819}
820
821static int check_ndag_received(streamsock_t *ssock, int index,
822                unsigned int msglen, recvstream_t *rt) {
823
824        ndag_encap_t *encaphdr;
825        ndag_monitor_t *mon;
826        uint8_t rectype;
827
828        /* Check that we have a valid nDAG encap record */
829        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
830
831        if (rectype == NDAG_PKT_KEEPALIVE) {
832                /* Keep-alive, reset startidle and carry on. Don't
833                 * change nextwrite -- we want to overwrite the
834                 * keep-alive with usable content. */
835                return 0;
836        } else if (rectype != NDAG_PKT_ENCAPERF) {
837                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
838                                ssock->groupaddr, ssock->port);
839                close(ssock->sock);
840                ssock->sock = -1;
841                return -1;
842        }
843
844        ssock->savedsize[index] = msglen;
845        ssock->nextwriteind ++;
846        ssock->bufavail --;
847
848        assert(ssock->bufavail >= 0);
849
850        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
851                ssock->nextwriteind = 0;
852        }
853
854        /* Get the useful info from the encap header */
855        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
856
857        mon = ssock->monitorptr;
858
859        if (mon->laststart == 0) {
860                mon->laststart = bswap_be_to_host64(encaphdr->started);
861        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
862                mon->laststart = bswap_be_to_host64(encaphdr->started);
863                reset_expected_seqs(rt, mon);
864
865                /* TODO what is a good way to indicate this to clients?
866                 * set the loss counter in the ERF header? a bit rude?
867                 * use another bit in the ERF header?
868                 * add a queryable flag to libtrace_packet_t?
869                 */
870
871        }
872
873        if (ssock->expectedseq != 0) {
874                rt->missing_records += seq_cmp(
875                                ntohl(encaphdr->seqno), ssock->expectedseq);
876        }
877        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
878        if (ssock->expectedseq == 0) {
879                ssock->expectedseq ++;
880        }
881
882        if (ssock->nextread == NULL) {
883                /* If this is our first read, set up 'nextread'
884                 * by skipping past the nDAG headers */
885                ssock->nextread = ssock->saved[0] +
886                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
887        }
888        return 1;
889
890}
891
892static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
893                int *gottime, recvstream_t *rt) {
894
895        int ret, ndagstat, avail;
896        int toret = 0;
897
898#if HAVE_RECVMMSG
899        int i;
900#endif
901
902        if (ssock->sock == -1) {
903                return 0;
904        }
905
906#if HAVE_RECVMMSG
907        /* Plenty of full buffers, just use the packets in those */
908        if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
909                return 1;
910        }
911#else
912        if (ssock->bufavail == 0) {
913                return 1;
914        }
915#endif
916
917        avail = init_receivers(ssock, ssock->bufavail);
918
919#if HAVE_RECVMMSG
920        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
921                        MSG_DONTWAIT, NULL);
922#else
923        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
924#endif
925        if (ret < 0) {
926                /* Nothing to receive right now, but we should still
927                 * count as 'ready' if at least one buffer is full */
928                if (errno == EAGAIN || errno == EWOULDBLOCK) {
929                        if (readable_data(ssock)) {
930                                toret = 1;
931                        }
932                        if (!(*gottime)) {
933                                gettimeofday(tv, NULL);
934                                *gottime = 1;
935                        }
936                        if (ssock->startidle == 0) {
937                                ssock->startidle = tv->tv_sec;
938                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
939                                fprintf(stderr,
940                                        "Closing channel %s:%u due to inactivity.\n",
941                                        ssock->groupaddr,
942                                        ssock->port);
943
944                                close(ssock->sock);
945                                ssock->sock = -1;
946                        }
947                } else {
948
949                        fprintf(stderr,
950                                "Error receiving encapsulated records from %s:%u -- %s \n",
951                                ssock->groupaddr, ssock->port,
952                                strerror(errno));
953                        close(ssock->sock);
954                        ssock->sock = -1;
955                }
956                return toret;
957        }
958
959        ssock->startidle = 0;
960
961#if HAVE_RECVMMSG
962        for (i = 0; i < ret; i++) {
963                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
964                                ssock->mmsgbufs[i].msg_len, rt);
965                if (ndagstat == -1) {
966                        break;
967                }
968
969                if (ndagstat == 1) {
970                        toret = 1;
971                }
972        }
973#else
974        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
975        if (ndagstat <= 0) {
976                toret = 0;
977        } else {
978                toret = 1;
979        }
980#endif
981
982        return toret;
983}
984
985static int receive_from_sockets(recvstream_t *rt) {
986
987        int i, readybufs, gottime;
988        struct timeval tv;
989
990        readybufs = 0;
991        gottime = 0;
992
993        for (i = 0; i < rt->sourcecount; i ++) {
994                readybufs += receive_from_single_socket(&(rt->sources[i]),
995                                &tv, &gottime, rt);
996        }
997
998        return readybufs;
999
1000}
1001
1002
1003static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1004                libtrace_packet_t *packet) {
1005
1006        int iserr = 0;
1007
1008        if (packet->buf_control == TRACE_CTRL_PACKET) {
1009                free(packet->buffer);
1010                packet->buffer = NULL;
1011        }
1012
1013        do {
1014                /* Make sure we shouldn't be halting */
1015                if ((iserr = is_halted(libtrace)) != -1) {
1016                        return iserr;
1017                }
1018
1019                /* Check for any messages from the control thread */
1020                iserr = receiver_read_messages(rt);
1021
1022                if (iserr <= 0) {
1023                        return iserr;
1024                }
1025
1026                /* If blocking and no sources, sleep for a bit and then try
1027                 * checking for messages again.
1028                 */
1029                if (rt->sourcecount == 0) {
1030                        usleep(10000);
1031                        continue;
1032                }
1033
1034                if ((iserr = receive_from_sockets(rt)) < 0) {
1035                        return iserr;
1036                } else if (iserr > 0) {
1037                        /* At least one of our input sockets has available
1038                         * data, let's go ahead and use what we have. */
1039                        break;
1040                }
1041
1042                /* None of our sources have anything available, we can take
1043                 * a short break rather than immediately trying again.
1044                 */
1045                if (iserr == 0) {
1046                        usleep(100);
1047                }
1048
1049        } while (1);
1050
1051        return iserr;
1052}
1053
1054static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1055                libtrace_packet_t *packet) {
1056
1057        int iserr = 0;
1058
1059        if (packet->buf_control == TRACE_CTRL_PACKET) {
1060                free(packet->buffer);
1061                packet->buffer = NULL;
1062        }
1063
1064        /* Make sure we shouldn't be halting */
1065        if ((iserr = is_halted(libtrace)) != -1) {
1066                return iserr;
1067        }
1068
1069        /* If non-blocking and there are no sources, just break */
1070        if (rt->sourcecount == 0) {
1071                return 0;
1072        }
1073
1074        return receive_from_sockets(rt);
1075}
1076
1077static streamsock_t *select_next_packet(recvstream_t *rt) {
1078        int i;
1079        streamsock_t *ssock = NULL;
1080        uint64_t earliest = 0;
1081        uint64_t currentts = 0;
1082        dag_record_t *daghdr;
1083
1084        for (i = 0; i < rt->sourcecount; i ++) {
1085                if (!readable_data(&(rt->sources[i]))) {
1086                        continue;
1087                }
1088
1089                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1090                currentts = bswap_le_to_host64(daghdr->ts);
1091
1092                if (earliest == 0 || earliest > currentts) {
1093                        earliest = currentts;
1094                        ssock = &(rt->sources[i]);
1095                }
1096                /*
1097                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1098                                i, currentts,
1099                                rt->sources[i].recordcount,
1100                                rt->missing_records);
1101                */
1102        }
1103        return ssock;
1104}
1105
1106static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1107
1108        int rem;
1109        streamsock_t *nextavail = NULL;
1110        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1111                        packet);
1112
1113        if (rem <= 0) {
1114                return rem;
1115        }
1116
1117        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1118        if (nextavail == NULL) {
1119                return 0;
1120        }
1121
1122        /* nextread should point at an ERF header, so prepare 'packet' to be
1123         * a libtrace ERF packet. */
1124
1125        return ndag_prepare_packet_stream(libtrace,
1126                        &(FORMAT_DATA->receivers[0]), nextavail,
1127                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1128}
1129
1130static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1131                libtrace_packet_t **packets, size_t nb_packets) {
1132
1133        recvstream_t *rt;
1134        int rem;
1135        size_t read_packets = 0;
1136        streamsock_t *nextavail = NULL;
1137
1138        rt = (recvstream_t *)t->format_data;
1139
1140
1141        do {
1142                /* Only check for messages once per batch */
1143                if (read_packets == 0) {
1144                        rem = receive_encap_records_block(libtrace, rt,
1145                                packets[read_packets]);
1146                } else {
1147                        rem = receive_encap_records_nonblock(libtrace, rt,
1148                                packets[read_packets]);
1149                }
1150
1151                if (rem < 0) {
1152                        return rem;
1153                }
1154
1155                if (rem == 0) {
1156                        break;
1157                }
1158
1159                nextavail = select_next_packet(rt);
1160                if (nextavail == NULL) {
1161                        break;
1162                }
1163
1164                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1165                                packets[read_packets],
1166                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1167
1168                read_packets  ++;
1169                if (read_packets >= nb_packets) {
1170                        break;
1171                }
1172        } while (1);
1173
1174        return read_packets;
1175
1176}
1177
1178static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1179                libtrace_packet_t *packet) {
1180
1181
1182        libtrace_eventobj_t event = {0,0,0.0,0};
1183        int rem;
1184        streamsock_t *nextavail = NULL;
1185
1186        /* Only check for messages once per call */
1187        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1188        if (rem <= 0) {
1189                event.type = TRACE_EVENT_TERMINATE;
1190                return event;
1191        }
1192
1193        do {
1194                rem = receive_encap_records_nonblock(libtrace,
1195                                &(FORMAT_DATA->receivers[0]), packet);
1196
1197                if (rem < 0) {
1198                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1199                                "Received invalid nDAG records.");
1200                        event.type = TRACE_EVENT_TERMINATE;
1201                        break;
1202                }
1203
1204                if (rem == 0) {
1205                        /* Either we've been halted or we've got no packets
1206                         * right now. */
1207                        if (is_halted(libtrace) == 0) {
1208                                event.type = TRACE_EVENT_TERMINATE;
1209                                break;
1210                        }
1211                        event.type = TRACE_EVENT_SLEEP;
1212                        event.seconds = 0.0001;
1213                        break;
1214                }
1215
1216                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1217                if (nextavail == NULL) {
1218                        event.type = TRACE_EVENT_SLEEP;
1219                        event.seconds = 0.0001;
1220                        break;
1221                }
1222
1223                event.type = TRACE_EVENT_PACKET;
1224                ndag_prepare_packet_stream(libtrace,
1225                                &(FORMAT_DATA->receivers[0]), nextavail,
1226                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1227                event.size = trace_get_capture_length(packet) +
1228                                trace_get_framing_length(packet);
1229
1230                if (libtrace->filter) {
1231                        int filtret = trace_apply_filter(libtrace->filter,
1232                                        packet);
1233                        if (filtret == -1) {
1234                                trace_set_err(libtrace,
1235                                                TRACE_ERR_BAD_FILTER,
1236                                                "Bad BPF Filter");
1237                                event.type = TRACE_EVENT_TERMINATE;
1238                                break;
1239                        }
1240
1241                        if (filtret == 0) {
1242                                /* Didn't match filter, try next one */
1243                                libtrace->filtered_packets ++;
1244                                trace_clear_cache(packet);
1245                                continue;
1246                        }
1247                }
1248
1249                if (libtrace->snaplen > 0) {
1250                        trace_set_capture_length(packet, libtrace->snaplen);
1251                }
1252                libtrace->accepted_packets ++;
1253                break;
1254        } while (1);
1255
1256        return event;
1257}
1258
1259static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1260
1261        int i;
1262
1263        stat->dropped_valid = 1;
1264        stat->dropped = 0;
1265        stat->received_valid = 1;
1266        stat->received = 0;
1267        stat->missing_valid = 1;
1268        stat->missing = 0;
1269
1270        /* TODO Is this thread safe? */
1271        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1272                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1273                stat->received += FORMAT_DATA->receivers[i].received_packets;
1274                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1275        }
1276
1277}
1278
1279static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1280                libtrace_stat_t *stat) {
1281
1282        recvstream_t *recvr = (recvstream_t *)t->format_data;
1283
1284        if (libtrace == NULL)
1285                return;
1286        /* TODO Is this thread safe */
1287        stat->dropped_valid = 1;
1288        stat->dropped = recvr->dropped_upstream;
1289
1290        stat->received_valid = 1;
1291        stat->received = recvr->received_packets;
1292
1293        stat->missing_valid = 1;
1294        stat->missing = recvr->missing_records;
1295
1296}
1297
1298static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1299                bool reader) {
1300        recvstream_t *recvr;
1301
1302        if (!reader || t->type != THREAD_PERPKT) {
1303                return 0;
1304        }
1305
1306        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1307        t->format_data = recvr;
1308
1309        return 0;
1310}
1311
1312static struct libtrace_format_t ndag = {
1313
1314        "ndag",
1315        "",
1316        TRACE_FORMAT_NDAG,
1317        NULL,                   /* probe filename */
1318        NULL,                   /* probe magic */
1319        ndag_init_input,        /* init_input */
1320        NULL,                   /* config_input */
1321        ndag_start_input,       /* start_input */
1322        ndag_pause_input,       /* pause_input */
1323        NULL,                   /* init_output */
1324        NULL,                   /* config_output */
1325        NULL,                   /* start_output */
1326        ndag_fin_input,         /* fin_input */
1327        NULL,                   /* fin_output */
1328        ndag_read_packet,       /* read_packet */
1329        ndag_prepare_packet,    /* prepare_packet */
1330        NULL,                   /* fin_packet */
1331        NULL,                   /* write_packet */
1332        erf_get_link_type,      /* get_link_type */
1333        erf_get_direction,      /* get_direction */
1334        erf_set_direction,      /* set_direction */
1335        erf_get_erf_timestamp,  /* get_erf_timestamp */
1336        NULL,                   /* get_timeval */
1337        NULL,                   /* get_seconds */
1338        NULL,                   /* get_timespec */
1339        NULL,                   /* seek_erf */
1340        NULL,                   /* seek_timeval */
1341        NULL,                   /* seek_seconds */
1342        erf_get_capture_length, /* get_capture_length */
1343        erf_get_wire_length,    /* get_wire_length */
1344        erf_get_framing_length, /* get_framing_length */
1345        erf_set_capture_length, /* set_capture_length */
1346        NULL,                   /* get_received_packets */
1347        NULL,                   /* get_filtered_packets */
1348        NULL,                   /* get_dropped_packets */
1349        ndag_get_statistics,    /* get_statistics */
1350        NULL,                   /* get_fd */
1351        trace_event_ndag,       /* trace_event */
1352        NULL,                   /* help */
1353        NULL,                   /* next pointer */
1354        {true, 0},              /* live packet capture */
1355        ndag_pstart_input,      /* parallel start */
1356        ndag_pread_packets,     /* parallel read */
1357        ndag_pause_input,       /* parallel pause */
1358        NULL,
1359        ndag_pregister_thread,  /* register thread */
1360        NULL,
1361        ndag_get_thread_stats   /* per-thread stats */
1362};
1363
1364void ndag_constructor(void) {
1365        register_format(&ndag);
1366}
Note: See TracBrowser for help on using the repository browser.