source: lib/format_ndag.c @ b135888

cachetimestampsdevelopringdecrementfixringperformance
Last change on this file since b135888 was b135888, checked in by Shane Alcock <salcock@…>, 2 years ago

Fix bad check for recvmmsg after switching to AC_CHECK_DECLS

Two problems:

  1. recvmmsg requires an extra #include, which we have to add specifically.
  2. HAVE_RECVMMSG is no longer the right macro to check; instead we needed HAVE_DECL_RECVMMSG
  • Property mode set to 100644
File size: 45.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_DECL_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        int maxfd;
117} recvstream_t;
118
119typedef struct ndag_format_data {
120        char *multicastgroup;
121        char *portstr;
122        char *localiface;
123        uint16_t nextthreadid;
124        recvstream_t *receivers;
125
126        pthread_t controlthread;
127        libtrace_message_queue_t controlqueue;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
233        {
234                fprintf(stderr,
235                        "Failed to bind to multicast socket %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                sock = -1;
238                goto sockcreateover;
239        }
240
241        greq.gr_interface = interface;
242        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
243
244        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
245                        sizeof(greq)) < 0) {
246                fprintf(stderr,
247                        "Failed to join multicast group %s:%s -- %s\n",
248                                groupaddr, portstr, strerror(errno));
249                close(sock);
250                sock = -1;
251                goto sockcreateover;
252        }
253
254        bufsize = 16 * 1024 * 1024;
255        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
256                                (socklen_t)sizeof(int)) < 0) {
257
258                fprintf(stderr,
259                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
260                                groupaddr, portstr, strerror(errno));
261                close(sock);
262                sock = -1;
263                goto sockcreateover;
264        }
265
266sockcreateover:
267        freeaddrinfo(group);
268        return sock;
269}
270
271
272static int ndag_init_input(libtrace_t *libtrace) {
273
274        char *scan = NULL;
275        char *next = NULL;
276
277        libtrace->format_data = (ndag_format_data_t *)malloc(
278                        sizeof(ndag_format_data_t));
279
280        FORMAT_DATA->multicastgroup = NULL;
281        FORMAT_DATA->portstr = NULL;
282        FORMAT_DATA->localiface = NULL;
283        FORMAT_DATA->nextthreadid = 0;
284        FORMAT_DATA->receivers = NULL;
285
286        scan = strchr(libtrace->uridata, ',');
287        if (scan == NULL) {
288                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
289                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
290                return -1;
291        }
292        FORMAT_DATA->localiface = strndup(libtrace->uridata,
293                        (size_t)(scan - libtrace->uridata));
294        next = scan + 1;
295
296        scan = strchr(next, ',');
297        if (scan == NULL) {
298                FORMAT_DATA->portstr = strdup("9001");
299                FORMAT_DATA->multicastgroup = strdup(next);
300        } else {
301                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
302
303                FORMAT_DATA->portstr = strdup(scan + 1);
304        }
305        return 0;
306}
307
308static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
309                uint16_t portnum, uint16_t monid) {
310
311        ndag_internal_message_t alert;
312
313        alert.type = NDAG_CLIENT_NEWGROUP;
314        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
315        alert.contents.localiface = FORMAT_DATA->localiface;
316        alert.contents.port = portnum;
317        alert.contents.monitor = monid;
318
319        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
320                        (void *)&alert);
321
322}
323
324static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
325                int msgsize, uint16_t *ptmap) {
326
327        int i;
328        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
329        uint8_t msgtype;
330
331        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
332        if (msgtype == 0) {
333                return -1;
334        }
335
336        msgsize -= sizeof(ndag_common_t);
337        if (msgtype == NDAG_PKT_BEACON) {
338                /* If message is a beacon, make sure every port included in the
339                 * beacon is assigned to a receive thread.
340                 */
341                uint16_t *ptr, numstreams;
342
343                if ((uint32_t)msgsize < sizeof(uint16_t)) {
344                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
345                        return -1;
346                }
347
348                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
349                numstreams = ntohs(*ptr);
350                ptr ++;
351
352                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
353                {
354                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
355                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
356                        return -1;
357                }
358
359                for (i = 0; i < numstreams; i++) {
360                        uint16_t streamport = ntohs(*ptr);
361
362                        if (ptmap[streamport] == 0xffff) {
363                                new_group_alert(libtrace,
364                                        FORMAT_DATA->nextthreadid, streamport,
365                                        ntohs(ndaghdr->monitorid));
366
367                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
368
369                                if (libtrace->perpkt_thread_count == 0) {
370                                        FORMAT_DATA->nextthreadid = 0;
371                                } else {
372                                        FORMAT_DATA->nextthreadid =
373                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
374                                }
375                        }
376
377                        ptr ++;
378                }
379        } else {
380                fprintf(stderr,
381                        "Unexpected message type on control channel: %u\n",
382                         msgtype);
383                return -1;
384        }
385
386        return 0;
387
388}
389
390static void *ndag_controller_run(void *tdata) {
391
392        libtrace_t *libtrace = (libtrace_t *)tdata;
393        uint16_t ptmap[65536];
394        int sock = -1;
395        struct addrinfo *receiveaddr = NULL;
396        fd_set listening;
397        struct timeval timeout;
398
399        /* ptmap is a dirty hack to allow us to quickly check if we've already
400         * assigned a stream to a thread.
401         */
402        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
403
404        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
405                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
406                        &receiveaddr);
407        if (sock == -1) {
408                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
409                        "Unable to join multicast group for nDAG control channel");
410                trace_interrupt();
411                pthread_exit(NULL);
412        }
413
414        ndag_paused = 0;
415        while ((is_halted(libtrace) == -1) && !ndag_paused) {
416                int ret;
417                char buf[CTRL_BUF_SIZE];
418
419                FD_ZERO(&listening);
420                FD_SET(sock, &listening);
421
422                timeout.tv_sec = 0;
423                timeout.tv_usec = 500000;
424
425                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
426                if (ret < 0) {
427                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
428                        break;
429                }
430
431                if (!FD_ISSET(sock, &listening)) {
432                        continue;
433                }
434
435                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
436                                receiveaddr->ai_addr,
437                                &(receiveaddr->ai_addrlen));
438                if (ret < 0) {
439                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
440                        break;
441                }
442
443                if (ret == 0) {
444                        break;
445                }
446
447                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
448                        fprintf(stderr, "Error while parsing nDAG control message.\n");
449                        continue;
450                }
451        }
452
453        if (sock >= 0) {
454                close(sock);
455        }
456
457        /* Control channel has fallen over, should probably encourage libtrace
458         * to halt the receiver threads as well.
459         */
460        if (!is_halted(libtrace)) {
461                trace_interrupt();
462        }
463
464        pthread_exit(NULL);
465}
466
467static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
468{
469        int ret;
470        uint32_t i;
471        /* Configure the set of receiver threads */
472
473        if (FORMAT_DATA->receivers == NULL) {
474                /* What if the number of threads changes between a pause and
475                 * a restart? Can this happen? */
476                FORMAT_DATA->receivers = (recvstream_t *)
477                                malloc(sizeof(recvstream_t) * maxthreads);
478        }
479
480        for (i = 0; i < maxthreads; i++) {
481                FORMAT_DATA->receivers[i].sources = NULL;
482                FORMAT_DATA->receivers[i].sourcecount = 0;
483                FORMAT_DATA->receivers[i].knownmonitors = NULL;
484                FORMAT_DATA->receivers[i].monitorcount = 0;
485                FORMAT_DATA->receivers[i].threadindex = i;
486                FORMAT_DATA->receivers[i].dropped_upstream = 0;
487                FORMAT_DATA->receivers[i].received_packets = 0;
488                FORMAT_DATA->receivers[i].missing_records = 0;
489                FORMAT_DATA->receivers[i].maxfd = -1;
490
491                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
492                                sizeof(ndag_internal_message_t));
493        }
494
495        /* Start the controller thread */
496        /* TODO consider affinity of this thread? */
497
498        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
499                        ndag_controller_run, libtrace);
500        if (ret != 0) {
501                return -1;
502        }
503        return maxthreads;
504}
505
506static int ndag_start_input(libtrace_t *libtrace) {
507        return ndag_start_threads(libtrace, 1);
508}
509
510static int ndag_pstart_input(libtrace_t *libtrace) {
511        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
512                        libtrace->perpkt_thread_count)
513                return 0;
514        return -1;
515}
516
517static void halt_ndag_receiver(recvstream_t *receiver) {
518        int j, i;
519        libtrace_message_queue_destroy(&(receiver->mqueue));
520
521        if (receiver->sources == NULL)
522                return;
523        for (i = 0; i < receiver->sourcecount; i++) {
524                streamsock_t src = receiver->sources[i];
525                if (src.saved) {
526                        for (j = 0; j < ENCAP_BUFFERS; j++) {
527                                if (src.saved[j]) {
528                                        free(src.saved[j]);
529                                }
530                        }
531                        free(src.saved);
532                }
533
534#if HAVE_DECL_RECVMMSG
535                for (j = 0; j < RECV_BATCH_SIZE; j++) {
536                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
537                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
538                        }
539                }
540#else
541                free(src.singlemsg.msg_iov);
542#endif
543
544                close(src.sock);
545        }
546        if (receiver->knownmonitors) {
547                free(receiver->knownmonitors);
548        }
549
550        if (receiver->sources) {
551                free(receiver->sources);
552        }
553}
554
555static int ndag_pause_input(libtrace_t *libtrace) {
556        int i;
557
558        /* Close the existing receiver sockets */
559        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
560               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
561        }
562        return 0;
563}
564
565static int ndag_fin_input(libtrace_t *libtrace) {
566
567        if (FORMAT_DATA->receivers) {
568                free(FORMAT_DATA->receivers);
569        }
570        if (FORMAT_DATA->multicastgroup) {
571                free(FORMAT_DATA->multicastgroup);
572        }
573        if (FORMAT_DATA->portstr) {
574                free(FORMAT_DATA->portstr);
575        }
576        if (FORMAT_DATA->localiface) {
577                free(FORMAT_DATA->localiface);
578        }
579
580        free(libtrace->format_data);
581        return 0;
582}
583
584static int ndag_prepare_packet_stream(libtrace_t *libtrace,
585                recvstream_t *rt,
586                streamsock_t *ssock, libtrace_packet_t *packet,
587                uint32_t flags) {
588
589        dag_record_t *erfptr;
590        ndag_encap_t *encaphdr;
591        uint16_t ndag_reccount = 0;
592        int nr;
593        uint16_t rlen;
594
595        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
596                packet->buf_control = TRACE_CTRL_PACKET;
597        } else {
598                packet->buf_control = TRACE_CTRL_EXTERNAL;
599        }
600
601        packet->trace = libtrace;
602        packet->buffer = ssock->nextread;
603        packet->header = ssock->nextread;
604        packet->type = TRACE_RT_DATA_ERF;
605
606        erfptr = (dag_record_t *)packet->header;
607
608        if (erfptr->flags.rxerror == 1) {
609                packet->payload = NULL;
610                erfptr->rlen = htons(erf_get_framing_length(packet));
611        } else {
612                packet->payload = (char *)packet->buffer +
613                                erf_get_framing_length(packet);
614        }
615
616        /* Update upstream drops using lctr */
617
618        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
619                /* TODO */
620        } else {
621                if (rt->received_packets > 0) {
622                        rt->dropped_upstream += ntohs(erfptr->lctr);
623                }
624        }
625
626        rt->received_packets ++;
627        ssock->recordcount += 1;
628
629        nr = ssock->nextreadind;
630        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
631                        sizeof(ndag_common_t));
632
633        ndag_reccount = ntohs(encaphdr->recordcount);
634        if ((ndag_reccount & 0x8000) != 0) {
635                /* Record was truncated -- update rlen appropriately */
636                rlen = ssock->savedsize[nr] -
637                                (ssock->nextread - ssock->saved[nr]);
638                erfptr->rlen = htons(rlen);
639        } else {
640                rlen = ntohs(erfptr->rlen);
641        }
642        ssock->nextread += rlen;
643        ssock->nextts = 0;
644
645        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
646
647        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
648                /* Read everything from this buffer, mark as empty and
649                 * move on. */
650                ssock->savedsize[nr] = 0;
651                ssock->bufwaiting ++;
652
653                nr ++;
654                if (nr == ENCAP_BUFFERS) {
655                        nr = 0;
656                }
657                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
658                                sizeof(ndag_encap_t);
659                ssock->nextreadind = nr;
660        }
661
662        packet->order = erf_get_erf_timestamp(packet);
663        packet->error = rlen;
664        return rlen;
665}
666
667static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
668                libtrace_packet_t *packet UNUSED,
669                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
670                uint32_t flags UNUSED) {
671
672        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
673        return 0;
674
675}
676
677static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
678
679        ndag_monitor_t *mon;
680
681        if (rt->monitorcount == 0) {
682                rt->knownmonitors = (ndag_monitor_t *)
683                                malloc(sizeof(ndag_monitor_t) * 5);
684        } else {
685                rt->knownmonitors = (ndag_monitor_t *)
686                            realloc(rt->knownmonitors,
687                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
688        }
689
690        mon = &(rt->knownmonitors[rt->monitorcount]);
691        mon->monitorid = monid;
692        mon->laststart = 0;
693
694        rt->monitorcount ++;
695        return mon;
696}
697
698static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
699
700        streamsock_t *ssock = NULL;
701        ndag_monitor_t *mon = NULL;
702        int i;
703
704        /* TODO consider replacing this with a list or vector so we can
705         * easily remove sources that are no longer in use, rather than
706         * just setting the sock to -1 and having to check them every
707         * time we want to read a packet.
708         */
709        if (rt->sourcecount == 0) {
710                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
711        } else if ((rt->sourcecount % 10) == 0) {
712                rt->sources = (streamsock_t *)realloc(rt->sources,
713                        sizeof(streamsock_t) * (rt->sourcecount + 10));
714        }
715
716        ssock = &(rt->sources[rt->sourcecount]);
717
718        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
719                        NULL, src.port, &(ssock->srcaddr));
720
721        if (ssock->sock < 0) {
722                return -1;
723        }
724
725        for (i = 0; i < rt->monitorcount; i++) {
726                if (rt->knownmonitors[i].monitorid == src.monitor) {
727                        mon = &(rt->knownmonitors[i]);
728                        break;
729                }
730        }
731
732        if (mon == NULL) {
733                mon = add_new_knownmonitor(rt, src.monitor);
734        }
735
736        ssock->port = src.port;
737        ssock->groupaddr = src.groupaddr;
738        ssock->expectedseq = 0;
739        ssock->monitorptr = mon;
740        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
741        ssock->bufavail = ENCAP_BUFFERS;
742        ssock->bufwaiting = 0;
743        ssock->startidle = 0;
744        ssock->nextts = 0;
745
746        for (i = 0; i < ENCAP_BUFFERS; i++) {
747                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
748                ssock->savedsize[i] = 0;
749        }
750
751#if HAVE_DECL_RECVMMSG
752        for (i = 0; i < RECV_BATCH_SIZE; i++) {
753                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
754                                malloc(sizeof(struct iovec));
755                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
756                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
757                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
758                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
759                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
760                ssock->mmsgbufs[i].msg_len = 0;
761        }
762#else
763        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
764#endif
765
766        ssock->nextread = NULL;;
767        ssock->nextreadind = 0;
768        ssock->nextwriteind = 0;
769        ssock->recordcount = 0;
770        rt->sourcecount += 1;
771        if (ssock->sock > rt->maxfd) {
772                rt->maxfd = ssock->sock;
773        }
774
775        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
776                        ssock->groupaddr, ssock->port, rt->threadindex);
777
778        return ssock->port;
779}
780
781static int receiver_read_messages(recvstream_t *rt) {
782
783        ndag_internal_message_t msg;
784
785        while (libtrace_message_queue_try_get(&(rt->mqueue),
786                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
787                switch(msg.type) {
788                        case NDAG_CLIENT_NEWGROUP:
789                                if (add_new_streamsock(rt, msg.contents) < 0) {
790                                        return -1;
791                                }
792                                break;
793                        case NDAG_CLIENT_HALT:
794                                return 0;
795                }
796        }
797        return 1;
798
799}
800
801static inline int readable_data(streamsock_t *ssock) {
802
803        if (ssock->sock == -1) {
804                return 0;
805        }
806        if (ssock->savedsize[ssock->nextreadind] == 0) {
807                return 0;
808        }
809        /*
810        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
811                        ssock->savedsize[ssock->nextreadind]) {
812                return 0;
813        }
814        */
815        return 1;
816
817
818}
819
820static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
821
822        int i;
823        for (i = 0; i < rt->sourcecount; i++) {
824                if (rt->sources[i].monitorptr == mon) {
825                        rt->sources[i].expectedseq = 0;
826                }
827        }
828
829}
830
831static int init_receivers(streamsock_t *ssock, int required) {
832
833        int wind = ssock->nextwriteind;
834        int i = 1;
835
836#if HAVE_DECL_RECVMMSG
837        for (i = 0; i < required; i++) {
838                if (i >= RECV_BATCH_SIZE) {
839                        break;
840                }
841
842                if (wind >= ENCAP_BUFFERS) {
843                        wind = 0;
844                }
845
846                ssock->mmsgbufs[i].msg_len = 0;
847                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
848                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
849                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
850
851                wind ++;
852        }
853#else
854        assert(required > 0);
855        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
856        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
857        ssock->singlemsg.msg_iovlen = 1;
858#endif
859        return i;
860}
861
862static int check_ndag_received(streamsock_t *ssock, int index,
863                unsigned int msglen, recvstream_t *rt) {
864
865        ndag_encap_t *encaphdr;
866        ndag_monitor_t *mon;
867        uint8_t rectype;
868
869        /* Check that we have a valid nDAG encap record */
870        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
871
872        if (rectype == NDAG_PKT_KEEPALIVE) {
873                /* Keep-alive, reset startidle and carry on. Don't
874                 * change nextwrite -- we want to overwrite the
875                 * keep-alive with usable content. */
876                return 0;
877        } else if (rectype != NDAG_PKT_ENCAPERF) {
878                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
879                                ssock->groupaddr, ssock->port);
880                close(ssock->sock);
881                ssock->sock = -1;
882                return -1;
883        }
884
885        ssock->savedsize[index] = msglen;
886        ssock->nextwriteind ++;
887        ssock->bufavail --;
888
889        assert(ssock->bufavail >= 0);
890        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
891                ssock->nextwriteind = 0;
892        }
893
894        /* Get the useful info from the encap header */
895        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
896
897        mon = ssock->monitorptr;
898
899        if (mon->laststart == 0) {
900                mon->laststart = bswap_be_to_host64(encaphdr->started);
901        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
902                mon->laststart = bswap_be_to_host64(encaphdr->started);
903                reset_expected_seqs(rt, mon);
904
905                /* TODO what is a good way to indicate this to clients?
906                 * set the loss counter in the ERF header? a bit rude?
907                 * use another bit in the ERF header?
908                 * add a queryable flag to libtrace_packet_t?
909                 */
910
911        }
912
913        if (ssock->expectedseq != 0) {
914                rt->missing_records += seq_cmp(
915                                ntohl(encaphdr->seqno), ssock->expectedseq);
916
917        }
918        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
919        if (ssock->expectedseq == 0) {
920                ssock->expectedseq ++;
921        }
922
923        if (ssock->nextread == NULL) {
924                /* If this is our first read, set up 'nextread'
925                 * by skipping past the nDAG headers */
926                ssock->nextread = ssock->saved[0] +
927                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
928        }
929        return 1;
930
931}
932
933static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
934                int *gottime, recvstream_t *rt) {
935
936        int ret, ndagstat, avail;
937        int toret = 0;
938
939#if HAVE_DECL_RECVMMSG
940        int i;
941#endif
942
943        avail = init_receivers(ssock, ssock->bufavail);
944
945#if HAVE_DECL_RECVMMSG
946        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
947                        MSG_DONTWAIT, NULL);
948#else
949        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
950#endif
951        if (ret < 0) {
952                /* Nothing to receive right now, but we should still
953                 * count as 'ready' if at least one buffer is full */
954                if (errno == EAGAIN || errno == EWOULDBLOCK) {
955                        if (readable_data(ssock)) {
956                                toret = 1;
957                        }
958                        if (!(*gottime)) {
959                                gettimeofday(tv, NULL);
960                                *gottime = 1;
961                        }
962                        if (ssock->startidle == 0) {
963                                ssock->startidle = tv->tv_sec;
964                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
965                                fprintf(stderr,
966                                        "Closing channel %s:%u due to inactivity.\n",
967                                        ssock->groupaddr,
968                                        ssock->port);
969
970                                close(ssock->sock);
971                                ssock->sock = -1;
972                        }
973                } else {
974
975                        fprintf(stderr,
976                                "Error receiving encapsulated records from %s:%u -- %s \n",
977                                ssock->groupaddr, ssock->port,
978                                strerror(errno));
979                        close(ssock->sock);
980                        ssock->sock = -1;
981                }
982                return toret;
983        }
984
985        ssock->startidle = 0;
986
987#if HAVE_DECL_RECVMMSG
988        for (i = 0; i < ret; i++) {
989                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
990                                ssock->mmsgbufs[i].msg_len, rt);
991                if (ndagstat == -1) {
992                        break;
993                }
994
995                if (ndagstat == 1) {
996                        toret = 1;
997                }
998        }
999#else
1000        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1001        if (ndagstat <= 0) {
1002                toret = 0;
1003        } else {
1004                toret = 1;
1005        }
1006#endif
1007
1008        return toret;
1009}
1010
1011static int receive_from_sockets(recvstream_t *rt) {
1012
1013        int i, readybufs, gottime;
1014        struct timeval tv;
1015        fd_set fds;
1016        int maxfd = 0;
1017        struct timeval zerotv;
1018
1019        readybufs = 0;
1020        gottime = 0;
1021
1022        FD_ZERO(&fds);
1023
1024        if (rt->maxfd == -1) {
1025                return 0;
1026        }
1027
1028        zerotv.tv_sec = 0;
1029        zerotv.tv_usec = 0;
1030
1031        for (i = 0; i < rt->sourcecount; i++) {
1032                if (rt->sources[i].sock == -1) {
1033                        continue;
1034                }
1035
1036#if HAVE_DECL_RECVMMSG
1037                /* Plenty of full buffers, just use the packets in those */
1038                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1039                        readybufs ++;
1040                        continue;
1041                }
1042#else
1043                if (rt->sources[i].bufavail == 0) {
1044                        readybufs ++;
1045                        continue;
1046                }
1047#endif
1048                FD_SET(rt->sources[i].sock, &fds);
1049                if (maxfd < rt->sources[i].sock) {
1050                        maxfd = rt->sources[i].sock;
1051                }
1052        }
1053
1054
1055        if (maxfd <= 0) {
1056                return readybufs;
1057        }
1058
1059        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1060                /* log the error? XXX */
1061                return -1;
1062        }
1063
1064        for (i = 0; i < rt->sourcecount; i++) {
1065                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1066                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1067                                readybufs ++;
1068                        }
1069                        continue;
1070                }
1071                readybufs += receive_from_single_socket(&(rt->sources[i]),
1072                                &tv, &gottime, rt);
1073        }
1074
1075        return readybufs;
1076
1077}
1078
1079
1080static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1081                libtrace_packet_t *packet) {
1082
1083        int iserr = 0;
1084
1085        if (packet->buf_control == TRACE_CTRL_PACKET) {
1086                free(packet->buffer);
1087                packet->buffer = NULL;
1088        }
1089
1090        do {
1091                /* Make sure we shouldn't be halting */
1092                if ((iserr = is_halted(libtrace)) != -1) {
1093                        return iserr;
1094                }
1095
1096                /* Check for any messages from the control thread */
1097                iserr = receiver_read_messages(rt);
1098
1099                if (iserr <= 0) {
1100                        return iserr;
1101                }
1102
1103                /* If blocking and no sources, sleep for a bit and then try
1104                 * checking for messages again.
1105                 */
1106                if (rt->sourcecount == 0) {
1107                        usleep(10000);
1108                        continue;
1109                }
1110
1111                if ((iserr = receive_from_sockets(rt)) < 0) {
1112                        return iserr;
1113                } else if (iserr > 0) {
1114                        /* At least one of our input sockets has available
1115                         * data, let's go ahead and use what we have. */
1116                        break;
1117                }
1118
1119                /* None of our sources have anything available, we can take
1120                 * a short break rather than immediately trying again.
1121                 */
1122                if (iserr == 0) {
1123                        usleep(100);
1124                }
1125
1126        } while (1);
1127
1128        return iserr;
1129}
1130
1131static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1132                libtrace_packet_t *packet) {
1133
1134        int iserr = 0;
1135
1136        if (packet->buf_control == TRACE_CTRL_PACKET) {
1137                free(packet->buffer);
1138                packet->buffer = NULL;
1139        }
1140
1141        /* Make sure we shouldn't be halting */
1142        if ((iserr = is_halted(libtrace)) != -1) {
1143                return iserr;
1144        }
1145
1146        /* If non-blocking and there are no sources, just break */
1147        if (rt->sourcecount == 0) {
1148                return 0;
1149        }
1150
1151        return receive_from_sockets(rt);
1152}
1153
1154static streamsock_t *select_next_packet(recvstream_t *rt) {
1155        int i;
1156        streamsock_t *ssock = NULL;
1157        uint64_t earliest = 0;
1158        uint64_t currentts = 0;
1159        dag_record_t *daghdr;
1160
1161        /* If we only have one source, then no need to do any
1162         * timestamp parsing or byteswapping.
1163         */
1164        if (rt->sourcecount == 1) {
1165                if (readable_data(&(rt->sources[0]))) {
1166                        return &(rt->sources[0]);
1167                }
1168                return NULL;
1169        }
1170
1171
1172        for (i = 0; i < rt->sourcecount; i ++) {
1173                if (!readable_data(&(rt->sources[i]))) {
1174                        continue;
1175                }
1176
1177                if (rt->sources[i].nextts == 0) {
1178                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1179                        currentts = bswap_le_to_host64(daghdr->ts);
1180                        rt->sources[i].nextts = currentts;
1181                } else {
1182                        currentts = rt->sources[i].nextts;
1183                }
1184
1185                if (earliest == 0 || earliest > currentts) {
1186                        earliest = currentts;
1187                        ssock = &(rt->sources[i]);
1188                }
1189        }
1190        return ssock;
1191}
1192
1193static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1194
1195        int rem, ret;
1196        streamsock_t *nextavail = NULL;
1197        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1198                        packet);
1199
1200        if (rem <= 0) {
1201                return rem;
1202        }
1203
1204        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1205        if (nextavail == NULL) {
1206                return 0;
1207        }
1208
1209        /* nextread should point at an ERF header, so prepare 'packet' to be
1210         * a libtrace ERF packet. */
1211
1212        ret = ndag_prepare_packet_stream(libtrace,
1213                        &(FORMAT_DATA->receivers[0]), nextavail,
1214                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1215        nextavail->bufavail += nextavail->bufwaiting;
1216        nextavail->bufwaiting = 0;
1217        return ret;
1218}
1219
1220static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1221                libtrace_packet_t **packets, size_t nb_packets) {
1222
1223        recvstream_t *rt;
1224        int rem, i;
1225        size_t read_packets = 0;
1226        streamsock_t *nextavail = NULL;
1227
1228        rt = (recvstream_t *)t->format_data;
1229
1230
1231        do {
1232                /* Only check for messages once per batch */
1233                if (read_packets == 0) {
1234                        rem = receive_encap_records_block(libtrace, rt,
1235                                packets[read_packets]);
1236                } else {
1237                        rem = receive_encap_records_nonblock(libtrace, rt,
1238                                packets[read_packets]);
1239                }
1240
1241                if (rem < 0) {
1242                        return rem;
1243                }
1244
1245                if (rem == 0) {
1246                        break;
1247                }
1248
1249                nextavail = select_next_packet(rt);
1250                if (nextavail == NULL) {
1251                        break;
1252                }
1253
1254                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1255                                packets[read_packets],
1256                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1257
1258                read_packets  ++;
1259                if (read_packets >= nb_packets) {
1260                        break;
1261                }
1262        } while (1);
1263
1264        for (i = 0; i < rt->sourcecount; i++) {
1265                streamsock_t *src = &(rt->sources[i]);
1266                src->bufavail += src->bufwaiting;
1267                src->bufwaiting = 0;
1268                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1269        }
1270
1271        return read_packets;
1272
1273}
1274
1275static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1276                libtrace_packet_t *packet) {
1277
1278
1279        libtrace_eventobj_t event = {0,0,0.0,0};
1280        int rem, i;
1281        streamsock_t *nextavail = NULL;
1282
1283        /* Only check for messages once per call */
1284        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1285        if (rem <= 0) {
1286                event.type = TRACE_EVENT_TERMINATE;
1287                return event;
1288        }
1289
1290        do {
1291                rem = receive_encap_records_nonblock(libtrace,
1292                                &(FORMAT_DATA->receivers[0]), packet);
1293
1294                if (rem < 0) {
1295                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1296                                "Received invalid nDAG records.");
1297                        event.type = TRACE_EVENT_TERMINATE;
1298                        break;
1299                }
1300
1301                if (rem == 0) {
1302                        /* Either we've been halted or we've got no packets
1303                         * right now. */
1304                        if (is_halted(libtrace) == 0) {
1305                                event.type = TRACE_EVENT_TERMINATE;
1306                                break;
1307                        }
1308                        event.type = TRACE_EVENT_SLEEP;
1309                        event.seconds = 0.0001;
1310                        break;
1311                }
1312
1313                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1314                if (nextavail == NULL) {
1315                        event.type = TRACE_EVENT_SLEEP;
1316                        event.seconds = 0.0001;
1317                        break;
1318                }
1319
1320                event.type = TRACE_EVENT_PACKET;
1321                ndag_prepare_packet_stream(libtrace,
1322                                &(FORMAT_DATA->receivers[0]), nextavail,
1323                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1324                event.size = trace_get_capture_length(packet) +
1325                                trace_get_framing_length(packet);
1326
1327                if (libtrace->filter) {
1328                        int filtret = trace_apply_filter(libtrace->filter,
1329                                        packet);
1330                        if (filtret == -1) {
1331                                trace_set_err(libtrace,
1332                                                TRACE_ERR_BAD_FILTER,
1333                                                "Bad BPF Filter");
1334                                event.type = TRACE_EVENT_TERMINATE;
1335                                break;
1336                        }
1337
1338                        if (filtret == 0) {
1339                                /* Didn't match filter, try next one */
1340                                libtrace->filtered_packets ++;
1341                                trace_clear_cache(packet);
1342                                continue;
1343                        }
1344                }
1345
1346                if (libtrace->snaplen > 0) {
1347                        trace_set_capture_length(packet, libtrace->snaplen);
1348                }
1349                libtrace->accepted_packets ++;
1350                break;
1351        } while (1);
1352
1353        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1354                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1355                src->bufavail += src->bufwaiting;
1356                src->bufwaiting = 0;
1357                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1358        }
1359
1360        return event;
1361}
1362
1363static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1364
1365        int i;
1366
1367        stat->dropped_valid = 1;
1368        stat->dropped = 0;
1369        stat->received_valid = 1;
1370        stat->received = 0;
1371        stat->missing_valid = 1;
1372        stat->missing = 0;
1373
1374        /* TODO Is this thread safe? */
1375        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1376                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1377                stat->received += FORMAT_DATA->receivers[i].received_packets;
1378                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1379        }
1380
1381}
1382
1383static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1384                libtrace_stat_t *stat) {
1385
1386        recvstream_t *recvr = (recvstream_t *)t->format_data;
1387
1388        if (libtrace == NULL)
1389                return;
1390        /* TODO Is this thread safe */
1391        stat->dropped_valid = 1;
1392        stat->dropped = recvr->dropped_upstream;
1393
1394        stat->received_valid = 1;
1395        stat->received = recvr->received_packets;
1396
1397        stat->missing_valid = 1;
1398        stat->missing = recvr->missing_records;
1399
1400}
1401
1402static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1403                bool reader) {
1404        recvstream_t *recvr;
1405
1406        if (!reader || t->type != THREAD_PERPKT) {
1407                return 0;
1408        }
1409
1410        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1411        t->format_data = recvr;
1412
1413        return 0;
1414}
1415
1416static struct libtrace_format_t ndag = {
1417
1418        "ndag",
1419        "",
1420        TRACE_FORMAT_NDAG,
1421        NULL,                   /* probe filename */
1422        NULL,                   /* probe magic */
1423        ndag_init_input,        /* init_input */
1424        NULL,                   /* config_input */
1425        ndag_start_input,       /* start_input */
1426        ndag_pause_input,       /* pause_input */
1427        NULL,                   /* init_output */
1428        NULL,                   /* config_output */
1429        NULL,                   /* start_output */
1430        ndag_fin_input,         /* fin_input */
1431        NULL,                   /* fin_output */
1432        ndag_read_packet,       /* read_packet */
1433        ndag_prepare_packet,    /* prepare_packet */
1434        NULL,                   /* fin_packet */
1435        NULL,                   /* write_packet */
1436        NULL,                   /* flush_output */
1437        erf_get_link_type,      /* get_link_type */
1438        erf_get_direction,      /* get_direction */
1439        erf_set_direction,      /* set_direction */
1440        erf_get_erf_timestamp,  /* get_erf_timestamp */
1441        NULL,                   /* get_timeval */
1442        NULL,                   /* get_seconds */
1443        NULL,                   /* get_timespec */
1444        NULL,                   /* seek_erf */
1445        NULL,                   /* seek_timeval */
1446        NULL,                   /* seek_seconds */
1447        erf_get_capture_length, /* get_capture_length */
1448        erf_get_wire_length,    /* get_wire_length */
1449        erf_get_framing_length, /* get_framing_length */
1450        erf_set_capture_length, /* set_capture_length */
1451        NULL,                   /* get_received_packets */
1452        NULL,                   /* get_filtered_packets */
1453        NULL,                   /* get_dropped_packets */
1454        ndag_get_statistics,    /* get_statistics */
1455        NULL,                   /* get_fd */
1456        trace_event_ndag,       /* trace_event */
1457        NULL,                   /* help */
1458        NULL,                   /* next pointer */
1459        {true, 0},              /* live packet capture */
1460        ndag_pstart_input,      /* parallel start */
1461        ndag_pread_packets,     /* parallel read */
1462        ndag_pause_input,       /* parallel pause */
1463        NULL,
1464        ndag_pregister_thread,  /* register thread */
1465        NULL,
1466        ndag_get_thread_stats   /* per-thread stats */
1467};
1468
1469void ndag_constructor(void) {
1470        register_format(&ndag);
1471}
Note: See TracBrowser for help on using the repository browser.