source: lib/format_ndag.c @ 8d24b12

develop
Last change on this file since 8d24b12 was 8d24b12, checked in by Shane Alcock <salcock@…>, 2 years ago

Fix unused variable warning in format_ndag when recvmmsg is missing.

  • Property mode set to 100644
File size: 46.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_DECL_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        int maxfd;
117} recvstream_t;
118
119typedef struct ndag_format_data {
120        char *multicastgroup;
121        char *portstr;
122        char *localiface;
123        uint16_t nextthreadid;
124        recvstream_t *receivers;
125
126        pthread_t controlthread;
127        libtrace_message_queue_t controlqueue;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize, val;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        val = 1;
233        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
234                fprintf(stderr,
235                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                goto sockcreateover;
238        }
239
240        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
241        {
242                fprintf(stderr,
243                        "Failed to bind to multicast socket %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                sock = -1;
246                goto sockcreateover;
247        }
248
249        greq.gr_interface = interface;
250        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
251
252        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
253                        sizeof(greq)) < 0) {
254                fprintf(stderr,
255                        "Failed to join multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262        bufsize = 16 * 1024 * 1024;
263        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
264                                (socklen_t)sizeof(int)) < 0) {
265
266                fprintf(stderr,
267                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
268                                groupaddr, portstr, strerror(errno));
269                close(sock);
270                sock = -1;
271                goto sockcreateover;
272        }
273
274sockcreateover:
275        freeaddrinfo(group);
276        return sock;
277}
278
279
280static int ndag_init_input(libtrace_t *libtrace) {
281
282        char *scan = NULL;
283        char *next = NULL;
284
285        libtrace->format_data = (ndag_format_data_t *)malloc(
286                        sizeof(ndag_format_data_t));
287
288        FORMAT_DATA->multicastgroup = NULL;
289        FORMAT_DATA->portstr = NULL;
290        FORMAT_DATA->localiface = NULL;
291        FORMAT_DATA->nextthreadid = 0;
292        FORMAT_DATA->receivers = NULL;
293
294        scan = strchr(libtrace->uridata, ',');
295        if (scan == NULL) {
296                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
297                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
298                return -1;
299        }
300        FORMAT_DATA->localiface = strndup(libtrace->uridata,
301                        (size_t)(scan - libtrace->uridata));
302        next = scan + 1;
303
304        scan = strchr(next, ',');
305        if (scan == NULL) {
306                FORMAT_DATA->portstr = strdup("9001");
307                FORMAT_DATA->multicastgroup = strdup(next);
308        } else {
309                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
310
311                FORMAT_DATA->portstr = strdup(scan + 1);
312        }
313        return 0;
314}
315
316static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
317                uint16_t portnum, uint16_t monid) {
318
319        ndag_internal_message_t alert;
320
321        alert.type = NDAG_CLIENT_NEWGROUP;
322        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
323        alert.contents.localiface = FORMAT_DATA->localiface;
324        alert.contents.port = portnum;
325        alert.contents.monitor = monid;
326
327        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
328                        (void *)&alert);
329
330}
331
332static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
333                int msgsize, uint16_t *ptmap) {
334
335        int i;
336        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
337        uint8_t msgtype;
338
339        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
340        if (msgtype == 0) {
341                return -1;
342        }
343
344        msgsize -= sizeof(ndag_common_t);
345        if (msgtype == NDAG_PKT_BEACON) {
346                /* If message is a beacon, make sure every port included in the
347                 * beacon is assigned to a receive thread.
348                 */
349                uint16_t *ptr, numstreams;
350
351                if ((uint32_t)msgsize < sizeof(uint16_t)) {
352                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
353                        return -1;
354                }
355
356                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
357                numstreams = ntohs(*ptr);
358                ptr ++;
359
360                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
361                {
362                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
363                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
364                        return -1;
365                }
366
367                for (i = 0; i < numstreams; i++) {
368                        uint16_t streamport = ntohs(*ptr);
369
370                        if (ptmap[streamport] == 0xffff) {
371                                new_group_alert(libtrace,
372                                        FORMAT_DATA->nextthreadid, streamport,
373                                        ntohs(ndaghdr->monitorid));
374
375                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
376
377                                if (libtrace->perpkt_thread_count == 0) {
378                                        FORMAT_DATA->nextthreadid = 0;
379                                } else {
380                                        FORMAT_DATA->nextthreadid =
381                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
382                                }
383                        }
384
385                        ptr ++;
386                }
387        } else {
388                fprintf(stderr,
389                        "Unexpected message type on control channel: %u\n",
390                         msgtype);
391                return -1;
392        }
393
394        return 0;
395
396}
397
398static void *ndag_controller_run(void *tdata) {
399
400        libtrace_t *libtrace = (libtrace_t *)tdata;
401        uint16_t ptmap[65536];
402        int sock = -1;
403        struct addrinfo *receiveaddr = NULL;
404        fd_set listening;
405        struct timeval timeout;
406
407        /* ptmap is a dirty hack to allow us to quickly check if we've already
408         * assigned a stream to a thread.
409         */
410        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
411
412        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
413                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
414                        &receiveaddr);
415        if (sock == -1) {
416                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
417                        "Unable to join multicast group for nDAG control channel");
418                trace_interrupt();
419                pthread_exit(NULL);
420        }
421
422        ndag_paused = 0;
423        while ((is_halted(libtrace) == -1) && !ndag_paused) {
424                int ret;
425                char buf[CTRL_BUF_SIZE];
426
427                FD_ZERO(&listening);
428                FD_SET(sock, &listening);
429
430                timeout.tv_sec = 0;
431                timeout.tv_usec = 500000;
432
433                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
434                if (ret < 0) {
435                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
436                        break;
437                }
438
439                if (!FD_ISSET(sock, &listening)) {
440                        continue;
441                }
442
443                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
444                                receiveaddr->ai_addr,
445                                &(receiveaddr->ai_addrlen));
446                if (ret < 0) {
447                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
448                        break;
449                }
450
451                if (ret == 0) {
452                        break;
453                }
454
455                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
456                        fprintf(stderr, "Error while parsing nDAG control message.\n");
457                        continue;
458                }
459        }
460
461        if (sock >= 0) {
462                close(sock);
463        }
464
465        /* Control channel has fallen over, should probably encourage libtrace
466         * to halt the receiver threads as well.
467         */
468        if (!is_halted(libtrace)) {
469                trace_interrupt();
470        }
471
472        pthread_exit(NULL);
473}
474
475static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
476{
477        int ret;
478        uint32_t i;
479        /* Configure the set of receiver threads */
480
481        if (FORMAT_DATA->receivers == NULL) {
482                /* What if the number of threads changes between a pause and
483                 * a restart? Can this happen? */
484                FORMAT_DATA->receivers = (recvstream_t *)
485                                malloc(sizeof(recvstream_t) * maxthreads);
486        }
487
488        for (i = 0; i < maxthreads; i++) {
489                FORMAT_DATA->receivers[i].sources = NULL;
490                FORMAT_DATA->receivers[i].sourcecount = 0;
491                FORMAT_DATA->receivers[i].knownmonitors = NULL;
492                FORMAT_DATA->receivers[i].monitorcount = 0;
493                FORMAT_DATA->receivers[i].threadindex = i;
494                FORMAT_DATA->receivers[i].dropped_upstream = 0;
495                FORMAT_DATA->receivers[i].received_packets = 0;
496                FORMAT_DATA->receivers[i].missing_records = 0;
497                FORMAT_DATA->receivers[i].maxfd = -1;
498
499                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
500                                sizeof(ndag_internal_message_t));
501        }
502
503        /* Start the controller thread */
504        /* TODO consider affinity of this thread? */
505
506        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
507                        ndag_controller_run, libtrace);
508        if (ret != 0) {
509                return -1;
510        }
511        return maxthreads;
512}
513
514static int ndag_start_input(libtrace_t *libtrace) {
515        return ndag_start_threads(libtrace, 1);
516}
517
518static int ndag_pstart_input(libtrace_t *libtrace) {
519        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
520                        libtrace->perpkt_thread_count)
521                return 0;
522        return -1;
523}
524
525static void halt_ndag_receiver(recvstream_t *receiver) {
526        int j, i;
527        libtrace_message_queue_destroy(&(receiver->mqueue));
528
529        if (receiver->sources == NULL)
530                return;
531        for (i = 0; i < receiver->sourcecount; i++) {
532                streamsock_t src = receiver->sources[i];
533                if (src.saved) {
534                        for (j = 0; j < ENCAP_BUFFERS; j++) {
535                                if (src.saved[j]) {
536                                        free(src.saved[j]);
537                                }
538                        }
539                        free(src.saved);
540                }
541
542#if HAVE_DECL_RECVMMSG
543                for (j = 0; j < RECV_BATCH_SIZE; j++) {
544                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
545                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
546                        }
547                }
548#else
549                free(src.singlemsg.msg_iov);
550#endif
551
552                if (src.sock != -1) {
553                        close(src.sock);
554                }
555        }
556        if (receiver->knownmonitors) {
557                free(receiver->knownmonitors);
558        }
559
560        if (receiver->sources) {
561                free(receiver->sources);
562        }
563}
564
565static int ndag_pause_input(libtrace_t *libtrace) {
566        int i;
567
568        /* Close the existing receiver sockets */
569        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
570               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
571        }
572        return 0;
573}
574
575static int ndag_fin_input(libtrace_t *libtrace) {
576
577        if (FORMAT_DATA->receivers) {
578                free(FORMAT_DATA->receivers);
579        }
580        if (FORMAT_DATA->multicastgroup) {
581                free(FORMAT_DATA->multicastgroup);
582        }
583        if (FORMAT_DATA->portstr) {
584                free(FORMAT_DATA->portstr);
585        }
586        if (FORMAT_DATA->localiface) {
587                free(FORMAT_DATA->localiface);
588        }
589
590        free(libtrace->format_data);
591        return 0;
592}
593
594static int ndag_prepare_packet_stream(libtrace_t *libtrace,
595                recvstream_t *rt,
596                streamsock_t *ssock, libtrace_packet_t *packet,
597                uint32_t flags) {
598
599        dag_record_t *erfptr;
600        ndag_encap_t *encaphdr;
601        uint16_t ndag_reccount = 0;
602        int nr;
603        uint16_t rlen;
604
605        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
606                packet->buf_control = TRACE_CTRL_PACKET;
607        } else {
608                packet->buf_control = TRACE_CTRL_EXTERNAL;
609        }
610
611        packet->trace = libtrace;
612        packet->buffer = ssock->nextread;
613        packet->header = ssock->nextread;
614        packet->type = TRACE_RT_DATA_ERF;
615
616        erfptr = (dag_record_t *)packet->header;
617
618        if (erfptr->flags.rxerror == 1) {
619                packet->payload = NULL;
620                erfptr->rlen = htons(erf_get_framing_length(packet));
621        } else {
622                packet->payload = (char *)packet->buffer +
623                                erf_get_framing_length(packet);
624        }
625
626        /* Update upstream drops using lctr */
627
628        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
629                /* TODO */
630        } else {
631                if (rt->received_packets > 0) {
632                        rt->dropped_upstream += ntohs(erfptr->lctr);
633                }
634        }
635
636        rt->received_packets ++;
637        ssock->recordcount += 1;
638
639        nr = ssock->nextreadind;
640        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
641                        sizeof(ndag_common_t));
642
643        ndag_reccount = ntohs(encaphdr->recordcount);
644        if ((ndag_reccount & 0x8000) != 0) {
645                /* Record was truncated -- update rlen appropriately */
646                rlen = ssock->savedsize[nr] -
647                                (ssock->nextread - ssock->saved[nr]);
648                erfptr->rlen = htons(rlen);
649        } else {
650                rlen = ntohs(erfptr->rlen);
651        }
652        ssock->nextread += rlen;
653        ssock->nextts = 0;
654
655        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
656
657        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
658                /* Read everything from this buffer, mark as empty and
659                 * move on. */
660                ssock->savedsize[nr] = 0;
661                ssock->bufwaiting ++;
662
663                nr ++;
664                if (nr == ENCAP_BUFFERS) {
665                        nr = 0;
666                }
667                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
668                                sizeof(ndag_encap_t);
669                ssock->nextreadind = nr;
670        }
671
672        packet->order = erf_get_erf_timestamp(packet);
673        packet->error = rlen;
674        return rlen;
675}
676
677static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
678                libtrace_packet_t *packet UNUSED,
679                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
680                uint32_t flags UNUSED) {
681
682        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
683        return 0;
684
685}
686
687static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
688
689        ndag_monitor_t *mon;
690
691        if (rt->monitorcount == 0) {
692                rt->knownmonitors = (ndag_monitor_t *)
693                                malloc(sizeof(ndag_monitor_t) * 5);
694        } else {
695                rt->knownmonitors = (ndag_monitor_t *)
696                            realloc(rt->knownmonitors,
697                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
698        }
699
700        mon = &(rt->knownmonitors[rt->monitorcount]);
701        mon->monitorid = monid;
702        mon->laststart = 0;
703
704        rt->monitorcount ++;
705        return mon;
706}
707
708static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
709
710        streamsock_t *ssock = NULL;
711        ndag_monitor_t *mon = NULL;
712        int i;
713
714        /* TODO consider replacing this with a list or vector so we can
715         * easily remove sources that are no longer in use, rather than
716         * just setting the sock to -1 and having to check them every
717         * time we want to read a packet.
718         */
719        if (rt->sourcecount == 0) {
720                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
721        } else if ((rt->sourcecount % 10) == 0) {
722                rt->sources = (streamsock_t *)realloc(rt->sources,
723                        sizeof(streamsock_t) * (rt->sourcecount + 10));
724        }
725
726        ssock = &(rt->sources[rt->sourcecount]);
727
728        for (i = 0; i < rt->monitorcount; i++) {
729                if (rt->knownmonitors[i].monitorid == src.monitor) {
730                        mon = &(rt->knownmonitors[i]);
731                        break;
732                }
733        }
734
735        if (mon == NULL) {
736                mon = add_new_knownmonitor(rt, src.monitor);
737        }
738
739        ssock->port = src.port;
740        ssock->groupaddr = src.groupaddr;
741        ssock->expectedseq = 0;
742        ssock->monitorptr = mon;
743        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
744        ssock->bufavail = ENCAP_BUFFERS;
745        ssock->bufwaiting = 0;
746        ssock->startidle = 0;
747        ssock->nextts = 0;
748
749        for (i = 0; i < ENCAP_BUFFERS; i++) {
750                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
751                ssock->savedsize[i] = 0;
752        }
753
754#if HAVE_DECL_RECVMMSG
755        for (i = 0; i < RECV_BATCH_SIZE; i++) {
756                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
757                                malloc(sizeof(struct iovec));
758                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
759                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
760                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
761                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
762                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
763                ssock->mmsgbufs[i].msg_len = 0;
764        }
765#else
766        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
767#endif
768
769        ssock->nextread = NULL;;
770        ssock->nextreadind = 0;
771        ssock->nextwriteind = 0;
772        ssock->recordcount = 0;
773        rt->sourcecount += 1;
774
775        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
776                        NULL, src.port, &(ssock->srcaddr));
777
778        if (ssock->sock < 0) {
779                return -1;
780        }
781
782        if (ssock->sock > rt->maxfd) {
783                rt->maxfd = ssock->sock;
784        }
785
786        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
787                        ssock->groupaddr, ssock->port, rt->threadindex);
788
789        return ssock->port;
790}
791
792static int receiver_read_messages(recvstream_t *rt) {
793
794        ndag_internal_message_t msg;
795
796        while (libtrace_message_queue_try_get(&(rt->mqueue),
797                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
798                switch(msg.type) {
799                        case NDAG_CLIENT_NEWGROUP:
800                                if (add_new_streamsock(rt, msg.contents) < 0) {
801                                        return -1;
802                                }
803                                break;
804                        case NDAG_CLIENT_HALT:
805                                return 0;
806                }
807        }
808        return 1;
809
810}
811
812static inline int readable_data(streamsock_t *ssock) {
813
814        if (ssock->sock == -1) {
815                return 0;
816        }
817        if (ssock->savedsize[ssock->nextreadind] == 0) {
818                return 0;
819        }
820        /*
821        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
822                        ssock->savedsize[ssock->nextreadind]) {
823                return 0;
824        }
825        */
826        return 1;
827
828
829}
830
831static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
832
833        int i;
834        for (i = 0; i < rt->sourcecount; i++) {
835                if (rt->sources[i].monitorptr == mon) {
836                        rt->sources[i].expectedseq = 0;
837                }
838        }
839
840}
841
842static int init_receivers(streamsock_t *ssock, int required) {
843
844        int wind = ssock->nextwriteind;
845        int i = 1;
846
847#if HAVE_DECL_RECVMMSG
848        for (i = 0; i < required; i++) {
849                if (i >= RECV_BATCH_SIZE) {
850                        break;
851                }
852
853                if (wind >= ENCAP_BUFFERS) {
854                        wind = 0;
855                }
856
857                ssock->mmsgbufs[i].msg_len = 0;
858                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
859                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
860                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
861
862                wind ++;
863        }
864#else
865        assert(required > 0);
866        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
867        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
868        ssock->singlemsg.msg_iovlen = 1;
869#endif
870        return i;
871}
872
873static int check_ndag_received(streamsock_t *ssock, int index,
874                unsigned int msglen, recvstream_t *rt) {
875
876        ndag_encap_t *encaphdr;
877        ndag_monitor_t *mon;
878        uint8_t rectype;
879
880        /* Check that we have a valid nDAG encap record */
881        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
882
883        if (rectype == NDAG_PKT_KEEPALIVE) {
884                /* Keep-alive, reset startidle and carry on. Don't
885                 * change nextwrite -- we want to overwrite the
886                 * keep-alive with usable content. */
887                return 0;
888        } else if (rectype != NDAG_PKT_ENCAPERF) {
889                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
890                                ssock->groupaddr, ssock->port);
891                close(ssock->sock);
892                ssock->sock = -1;
893                return -1;
894        }
895
896        ssock->savedsize[index] = msglen;
897        ssock->nextwriteind ++;
898        ssock->bufavail --;
899
900        assert(ssock->bufavail >= 0);
901        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
902                ssock->nextwriteind = 0;
903        }
904
905        /* Get the useful info from the encap header */
906        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
907
908        mon = ssock->monitorptr;
909
910        if (mon->laststart == 0) {
911                mon->laststart = bswap_be_to_host64(encaphdr->started);
912        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
913                mon->laststart = bswap_be_to_host64(encaphdr->started);
914                reset_expected_seqs(rt, mon);
915
916                /* TODO what is a good way to indicate this to clients?
917                 * set the loss counter in the ERF header? a bit rude?
918                 * use another bit in the ERF header?
919                 * add a queryable flag to libtrace_packet_t?
920                 */
921
922        }
923
924        if (ssock->expectedseq != 0) {
925                rt->missing_records += seq_cmp(
926                                ntohl(encaphdr->seqno), ssock->expectedseq);
927
928        }
929        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
930        if (ssock->expectedseq == 0) {
931                ssock->expectedseq ++;
932        }
933
934        if (ssock->nextread == NULL) {
935                /* If this is our first read, set up 'nextread'
936                 * by skipping past the nDAG headers */
937                ssock->nextread = ssock->saved[0] +
938                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
939        }
940        return 1;
941
942}
943
944static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
945                int *gottime, recvstream_t *rt) {
946
947        int ret, ndagstat;
948        int toret = 0;
949
950#if HAVE_DECL_RECVMMSG
951        int i, avail;
952#endif
953
954
955#if HAVE_DECL_RECVMMSG
956        avail = init_receivers(ssock, ssock->bufavail);
957        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
958                        MSG_DONTWAIT, NULL);
959#else
960        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
961#endif
962        if (ret < 0) {
963                /* Nothing to receive right now, but we should still
964                 * count as 'ready' if at least one buffer is full */
965                if (errno == EAGAIN || errno == EWOULDBLOCK) {
966                        if (readable_data(ssock)) {
967                                toret = 1;
968                        }
969                        if (!(*gottime)) {
970                                gettimeofday(tv, NULL);
971                                *gottime = 1;
972                        }
973                        if (ssock->startidle == 0) {
974                                ssock->startidle = tv->tv_sec;
975                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
976                                fprintf(stderr,
977                                        "Closing channel %s:%u due to inactivity.\n",
978                                        ssock->groupaddr,
979                                        ssock->port);
980
981                                close(ssock->sock);
982                                ssock->sock = -1;
983                        }
984                } else {
985
986                        fprintf(stderr,
987                                "Error receiving encapsulated records from %s:%u -- %s \n",
988                                ssock->groupaddr, ssock->port,
989                                strerror(errno));
990                        close(ssock->sock);
991                        ssock->sock = -1;
992                }
993                return toret;
994        }
995
996        ssock->startidle = 0;
997
998#if HAVE_DECL_RECVMMSG
999        for (i = 0; i < ret; i++) {
1000                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1001                                ssock->mmsgbufs[i].msg_len, rt);
1002                if (ndagstat == -1) {
1003                        break;
1004                }
1005
1006                if (ndagstat == 1) {
1007                        toret = 1;
1008                }
1009        }
1010#else
1011        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1012        if (ndagstat <= 0) {
1013                toret = 0;
1014        } else {
1015                toret = 1;
1016        }
1017#endif
1018
1019        return toret;
1020}
1021
1022static int receive_from_sockets(recvstream_t *rt) {
1023
1024        int i, readybufs, gottime;
1025        struct timeval tv;
1026        fd_set fds;
1027        int maxfd = 0;
1028        struct timeval zerotv;
1029
1030        readybufs = 0;
1031        gottime = 0;
1032
1033        FD_ZERO(&fds);
1034
1035        if (rt->maxfd == -1) {
1036                return 0;
1037        }
1038
1039        zerotv.tv_sec = 0;
1040        zerotv.tv_usec = 0;
1041
1042        for (i = 0; i < rt->sourcecount; i++) {
1043                if (rt->sources[i].sock == -1) {
1044                        continue;
1045                }
1046
1047#if HAVE_DECL_RECVMMSG
1048                /* Plenty of full buffers, just use the packets in those */
1049                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1050                        readybufs ++;
1051                        continue;
1052                }
1053#else
1054                if (rt->sources[i].bufavail == 0) {
1055                        readybufs ++;
1056                        continue;
1057                }
1058#endif
1059                FD_SET(rt->sources[i].sock, &fds);
1060                if (maxfd < rt->sources[i].sock) {
1061                        maxfd = rt->sources[i].sock;
1062                }
1063        }
1064
1065
1066        if (maxfd <= 0) {
1067                return readybufs;
1068        }
1069
1070        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1071                /* log the error? XXX */
1072                return -1;
1073        }
1074
1075        for (i = 0; i < rt->sourcecount; i++) {
1076                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1077                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1078                                readybufs ++;
1079                        }
1080                        continue;
1081                }
1082                readybufs += receive_from_single_socket(&(rt->sources[i]),
1083                                &tv, &gottime, rt);
1084        }
1085
1086        return readybufs;
1087
1088}
1089
1090
1091static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1092                libtrace_packet_t *packet) {
1093
1094        int iserr = 0;
1095
1096        if (packet->buf_control == TRACE_CTRL_PACKET) {
1097                free(packet->buffer);
1098                packet->buffer = NULL;
1099        }
1100
1101        do {
1102                /* Make sure we shouldn't be halting */
1103                if ((iserr = is_halted(libtrace)) != -1) {
1104                        return iserr;
1105                }
1106
1107                /* Check for any messages from the control thread */
1108                iserr = receiver_read_messages(rt);
1109
1110                if (iserr <= 0) {
1111                        return iserr;
1112                }
1113
1114                /* If blocking and no sources, sleep for a bit and then try
1115                 * checking for messages again.
1116                 */
1117                if (rt->sourcecount == 0) {
1118                        usleep(10000);
1119                        continue;
1120                }
1121
1122                if ((iserr = receive_from_sockets(rt)) < 0) {
1123                        return iserr;
1124                } else if (iserr > 0) {
1125                        /* At least one of our input sockets has available
1126                         * data, let's go ahead and use what we have. */
1127                        break;
1128                }
1129
1130                /* None of our sources have anything available, we can take
1131                 * a short break rather than immediately trying again.
1132                 */
1133                if (iserr == 0) {
1134                        usleep(100);
1135                }
1136
1137        } while (1);
1138
1139        return iserr;
1140}
1141
1142static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1143                libtrace_packet_t *packet) {
1144
1145        int iserr = 0;
1146
1147        if (packet->buf_control == TRACE_CTRL_PACKET) {
1148                free(packet->buffer);
1149                packet->buffer = NULL;
1150        }
1151
1152        /* Make sure we shouldn't be halting */
1153        if ((iserr = is_halted(libtrace)) != -1) {
1154                return iserr;
1155        }
1156
1157        /* If non-blocking and there are no sources, just break */
1158        if (rt->sourcecount == 0) {
1159                return 0;
1160        }
1161
1162        return receive_from_sockets(rt);
1163}
1164
1165static streamsock_t *select_next_packet(recvstream_t *rt) {
1166        int i;
1167        streamsock_t *ssock = NULL;
1168        uint64_t earliest = 0;
1169        uint64_t currentts = 0;
1170        dag_record_t *daghdr;
1171
1172        /* If we only have one source, then no need to do any
1173         * timestamp parsing or byteswapping.
1174         */
1175        if (rt->sourcecount == 1) {
1176                if (readable_data(&(rt->sources[0]))) {
1177                        return &(rt->sources[0]);
1178                }
1179                return NULL;
1180        }
1181
1182
1183        for (i = 0; i < rt->sourcecount; i ++) {
1184                if (!readable_data(&(rt->sources[i]))) {
1185                        continue;
1186                }
1187
1188                if (rt->sources[i].nextts == 0) {
1189                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1190                        currentts = bswap_le_to_host64(daghdr->ts);
1191                        rt->sources[i].nextts = currentts;
1192                } else {
1193                        currentts = rt->sources[i].nextts;
1194                }
1195
1196                if (earliest == 0 || earliest > currentts) {
1197                        earliest = currentts;
1198                        ssock = &(rt->sources[i]);
1199                }
1200        }
1201        return ssock;
1202}
1203
1204static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1205
1206        int rem, ret;
1207        streamsock_t *nextavail = NULL;
1208        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1209                        packet);
1210
1211        if (rem <= 0) {
1212                return rem;
1213        }
1214
1215        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1216        if (nextavail == NULL) {
1217                return 0;
1218        }
1219
1220        /* nextread should point at an ERF header, so prepare 'packet' to be
1221         * a libtrace ERF packet. */
1222
1223        ret = ndag_prepare_packet_stream(libtrace,
1224                        &(FORMAT_DATA->receivers[0]), nextavail,
1225                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1226        nextavail->bufavail += nextavail->bufwaiting;
1227        nextavail->bufwaiting = 0;
1228        return ret;
1229}
1230
1231static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1232                libtrace_packet_t **packets, size_t nb_packets) {
1233
1234        recvstream_t *rt;
1235        int rem, i;
1236        size_t read_packets = 0;
1237        streamsock_t *nextavail = NULL;
1238
1239        rt = (recvstream_t *)t->format_data;
1240
1241
1242        do {
1243                /* Only check for messages once per batch */
1244                if (read_packets == 0) {
1245                        rem = receive_encap_records_block(libtrace, rt,
1246                                packets[read_packets]);
1247                } else {
1248                        rem = receive_encap_records_nonblock(libtrace, rt,
1249                                packets[read_packets]);
1250                }
1251
1252                if (rem < 0) {
1253                        return rem;
1254                }
1255
1256                if (rem == 0) {
1257                        break;
1258                }
1259
1260                nextavail = select_next_packet(rt);
1261                if (nextavail == NULL) {
1262                        break;
1263                }
1264
1265                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1266                                packets[read_packets],
1267                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1268
1269                read_packets  ++;
1270                if (read_packets >= nb_packets) {
1271                        break;
1272                }
1273        } while (1);
1274
1275        for (i = 0; i < rt->sourcecount; i++) {
1276                streamsock_t *src = &(rt->sources[i]);
1277                src->bufavail += src->bufwaiting;
1278                src->bufwaiting = 0;
1279                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1280        }
1281
1282        return read_packets;
1283
1284}
1285
1286static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1287                libtrace_packet_t *packet) {
1288
1289
1290        libtrace_eventobj_t event = {0,0,0.0,0};
1291        int rem, i;
1292        streamsock_t *nextavail = NULL;
1293
1294        /* Only check for messages once per call */
1295        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1296        if (rem <= 0) {
1297                event.type = TRACE_EVENT_TERMINATE;
1298                return event;
1299        }
1300
1301        do {
1302                rem = receive_encap_records_nonblock(libtrace,
1303                                &(FORMAT_DATA->receivers[0]), packet);
1304
1305                if (rem < 0) {
1306                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1307                                "Received invalid nDAG records.");
1308                        event.type = TRACE_EVENT_TERMINATE;
1309                        break;
1310                }
1311
1312                if (rem == 0) {
1313                        /* Either we've been halted or we've got no packets
1314                         * right now. */
1315                        if (is_halted(libtrace) == 0) {
1316                                event.type = TRACE_EVENT_TERMINATE;
1317                                break;
1318                        }
1319                        event.type = TRACE_EVENT_SLEEP;
1320                        event.seconds = 0.0001;
1321                        break;
1322                }
1323
1324                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1325                if (nextavail == NULL) {
1326                        event.type = TRACE_EVENT_SLEEP;
1327                        event.seconds = 0.0001;
1328                        break;
1329                }
1330
1331                event.type = TRACE_EVENT_PACKET;
1332                ndag_prepare_packet_stream(libtrace,
1333                                &(FORMAT_DATA->receivers[0]), nextavail,
1334                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1335                event.size = trace_get_capture_length(packet) +
1336                                trace_get_framing_length(packet);
1337
1338                if (libtrace->filter) {
1339                        int filtret = trace_apply_filter(libtrace->filter,
1340                                        packet);
1341                        if (filtret == -1) {
1342                                trace_set_err(libtrace,
1343                                                TRACE_ERR_BAD_FILTER,
1344                                                "Bad BPF Filter");
1345                                event.type = TRACE_EVENT_TERMINATE;
1346                                break;
1347                        }
1348
1349                        if (filtret == 0) {
1350                                /* Didn't match filter, try next one */
1351                                libtrace->filtered_packets ++;
1352                                trace_clear_cache(packet);
1353                                continue;
1354                        }
1355                }
1356
1357                if (libtrace->snaplen > 0) {
1358                        trace_set_capture_length(packet, libtrace->snaplen);
1359                }
1360                libtrace->accepted_packets ++;
1361                break;
1362        } while (1);
1363
1364        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1365                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1366                src->bufavail += src->bufwaiting;
1367                src->bufwaiting = 0;
1368                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1369        }
1370
1371        return event;
1372}
1373
1374static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1375
1376        int i;
1377
1378        stat->dropped_valid = 1;
1379        stat->dropped = 0;
1380        stat->received_valid = 1;
1381        stat->received = 0;
1382        stat->missing_valid = 1;
1383        stat->missing = 0;
1384
1385        /* TODO Is this thread safe? */
1386        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1387                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1388                stat->received += FORMAT_DATA->receivers[i].received_packets;
1389                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1390        }
1391
1392}
1393
1394static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1395                libtrace_stat_t *stat) {
1396
1397        recvstream_t *recvr = (recvstream_t *)t->format_data;
1398
1399        if (libtrace == NULL)
1400                return;
1401        /* TODO Is this thread safe */
1402        stat->dropped_valid = 1;
1403        stat->dropped = recvr->dropped_upstream;
1404
1405        stat->received_valid = 1;
1406        stat->received = recvr->received_packets;
1407
1408        stat->missing_valid = 1;
1409        stat->missing = recvr->missing_records;
1410
1411}
1412
1413static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1414                bool reader) {
1415        recvstream_t *recvr;
1416
1417        if (!reader || t->type != THREAD_PERPKT) {
1418                return 0;
1419        }
1420
1421        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1422        t->format_data = recvr;
1423
1424        return 0;
1425}
1426
1427static struct libtrace_format_t ndag = {
1428
1429        "ndag",
1430        "",
1431        TRACE_FORMAT_NDAG,
1432        NULL,                   /* probe filename */
1433        NULL,                   /* probe magic */
1434        ndag_init_input,        /* init_input */
1435        NULL,                   /* config_input */
1436        ndag_start_input,       /* start_input */
1437        ndag_pause_input,       /* pause_input */
1438        NULL,                   /* init_output */
1439        NULL,                   /* config_output */
1440        NULL,                   /* start_output */
1441        ndag_fin_input,         /* fin_input */
1442        NULL,                   /* fin_output */
1443        ndag_read_packet,       /* read_packet */
1444        ndag_prepare_packet,    /* prepare_packet */
1445        NULL,                   /* fin_packet */
1446        NULL,                   /* write_packet */
1447        NULL,                   /* flush_output */
1448        erf_get_link_type,      /* get_link_type */
1449        erf_get_direction,      /* get_direction */
1450        erf_set_direction,      /* set_direction */
1451        erf_get_erf_timestamp,  /* get_erf_timestamp */
1452        NULL,                   /* get_timeval */
1453        NULL,                   /* get_seconds */
1454        NULL,                   /* get_timespec */
1455        NULL,                   /* seek_erf */
1456        NULL,                   /* seek_timeval */
1457        NULL,                   /* seek_seconds */
1458        erf_get_capture_length, /* get_capture_length */
1459        erf_get_wire_length,    /* get_wire_length */
1460        erf_get_framing_length, /* get_framing_length */
1461        erf_set_capture_length, /* set_capture_length */
1462        NULL,                   /* get_received_packets */
1463        NULL,                   /* get_filtered_packets */
1464        NULL,                   /* get_dropped_packets */
1465        ndag_get_statistics,    /* get_statistics */
1466        NULL,                   /* get_fd */
1467        trace_event_ndag,       /* trace_event */
1468        NULL,                   /* help */
1469        NULL,                   /* next pointer */
1470        {true, 0},              /* live packet capture */
1471        ndag_pstart_input,      /* parallel start */
1472        ndag_pread_packets,     /* parallel read */
1473        ndag_pause_input,       /* parallel pause */
1474        NULL,
1475        ndag_pregister_thread,  /* register thread */
1476        NULL,
1477        ndag_get_thread_stats   /* per-thread stats */
1478};
1479
1480void ndag_constructor(void) {
1481        register_format(&ndag);
1482}
Note: See TracBrowser for help on using the repository browser.