source: lib/format_ndag.c @ 3004d6c

cachetimestampsdevelopetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 3004d6c was 3004d6c, checked in by Shane Alcock <salcock@…>, 3 years ago

format_ndag: use select to skip sockets with no data

A non-blocking call to recvmmsg still requires a lot of initial
effort, e.g. setting up buffers etc., but there is no need to do
so if we use select() to tell us in advance which sockets are
worth trying.

  • Property mode set to 100644
File size: 45.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        fd_set allsocks;
117        int maxfd;
118} recvstream_t;
119
120typedef struct ndag_format_data {
121        char *multicastgroup;
122        char *portstr;
123        char *localiface;
124        uint16_t nextthreadid;
125        recvstream_t *receivers;
126
127        pthread_t controlthread;
128        libtrace_message_queue_t controlqueue;
129} ndag_format_data_t;
130
131enum {
132        NDAG_CLIENT_HALT = 0x01,
133        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
134        NDAG_CLIENT_NEWGROUP = 0x03
135};
136
137typedef struct ndagreadermessage {
138        uint8_t type;
139        streamsource_t contents;
140} ndag_internal_message_t;
141
142
143static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
144
145        /* Calculate seq_a - seq_b, taking wraparound into account */
146        if (seq_a == seq_b) return 0;
147
148        if (seq_a > seq_b) {
149                return (int) (seq_a - seq_b);
150        }
151
152        /* -1 for the wrap and another -1 because we don't use zero */
153        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
154}
155
156static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
157        ndag_common_t *header = (ndag_common_t *)msgbuf;
158
159        if (msgsize < sizeof(ndag_common_t)) {
160                fprintf(stderr,
161                        "nDAG message does not have a complete nDAG header.\n");
162                return 0;
163        }
164
165        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
166                fprintf(stderr,
167                        "nDAG message does not have a valid magic number.\n");
168                return 0;
169        }
170
171        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
172                fprintf(stderr,
173                        "nDAG message has an invalid header version: %u\n",
174                                header->version);
175                return 0;
176        }
177
178        return header->type;
179}
180
181static int join_multicast_group(char *groupaddr, char *localiface,
182        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
183
184        struct addrinfo hints;
185        struct addrinfo *gotten;
186        struct addrinfo *group;
187        unsigned int interface;
188        char pstr[16];
189        struct group_req greq;
190        int bufsize;
191
192        int sock;
193
194        if (portstr == NULL) {
195                snprintf(pstr, 15, "%u", portnum);
196                portstr = pstr;
197        }
198
199        interface = if_nametoindex(localiface);
200        if (interface == 0) {
201                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
202                                localiface, strerror(errno));
203                return -1;
204        }
205
206        hints.ai_family = PF_UNSPEC;
207        hints.ai_socktype = SOCK_DGRAM;
208        hints.ai_flags = AI_PASSIVE;
209        hints.ai_protocol = 0;
210
211        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
212                fprintf(stderr,
213                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
214                                portstr, strerror(errno));
215                return -1;
216        }
217
218        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
219                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
220                                groupaddr, strerror(errno));
221                return -1;
222        }
223
224        *srcinfo = gotten;
225        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
226        if (sock < 0) {
227                fprintf(stderr,
228                        "Failed to create multicast socket for %s:%s -- %s\n",
229                                groupaddr, portstr, strerror(errno));
230                goto sockcreateover;
231        }
232
233        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
234        {
235                fprintf(stderr,
236                        "Failed to bind to multicast socket %s:%s -- %s\n",
237                                groupaddr, portstr, strerror(errno));
238                sock = -1;
239                goto sockcreateover;
240        }
241
242        greq.gr_interface = interface;
243        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
244
245        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
246                        sizeof(greq)) < 0) {
247                fprintf(stderr,
248                        "Failed to join multicast group %s:%s -- %s\n",
249                                groupaddr, portstr, strerror(errno));
250                close(sock);
251                sock = -1;
252                goto sockcreateover;
253        }
254
255        bufsize = 16 * 1024 * 1024;
256        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
257                                (socklen_t)sizeof(int)) < 0) {
258
259                fprintf(stderr,
260                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
261                                groupaddr, portstr, strerror(errno));
262                close(sock);
263                sock = -1;
264                goto sockcreateover;
265        }
266
267sockcreateover:
268        freeaddrinfo(group);
269        return sock;
270}
271
272
273static int ndag_init_input(libtrace_t *libtrace) {
274
275        char *scan = NULL;
276        char *next = NULL;
277
278        libtrace->format_data = (ndag_format_data_t *)malloc(
279                        sizeof(ndag_format_data_t));
280
281        FORMAT_DATA->multicastgroup = NULL;
282        FORMAT_DATA->portstr = NULL;
283        FORMAT_DATA->localiface = NULL;
284        FORMAT_DATA->nextthreadid = 0;
285        FORMAT_DATA->receivers = NULL;
286
287        scan = strchr(libtrace->uridata, ',');
288        if (scan == NULL) {
289                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
290                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
291                return -1;
292        }
293        FORMAT_DATA->localiface = strndup(libtrace->uridata,
294                        (size_t)(scan - libtrace->uridata));
295        next = scan + 1;
296
297        scan = strchr(next, ',');
298        if (scan == NULL) {
299                FORMAT_DATA->portstr = strdup("9001");
300                FORMAT_DATA->multicastgroup = strdup(next);
301        } else {
302                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
303
304                FORMAT_DATA->portstr = strdup(scan + 1);
305        }
306        return 0;
307}
308
309static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
310                uint16_t portnum, uint16_t monid) {
311
312        ndag_internal_message_t alert;
313
314        alert.type = NDAG_CLIENT_NEWGROUP;
315        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
316        alert.contents.localiface = FORMAT_DATA->localiface;
317        alert.contents.port = portnum;
318        alert.contents.monitor = monid;
319
320        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
321                        (void *)&alert);
322
323}
324
325static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
326                int msgsize, uint16_t *ptmap) {
327
328        int i;
329        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
330        uint8_t msgtype;
331
332        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
333        if (msgtype == 0) {
334                return -1;
335        }
336
337        msgsize -= sizeof(ndag_common_t);
338        if (msgtype == NDAG_PKT_BEACON) {
339                /* If message is a beacon, make sure every port included in the
340                 * beacon is assigned to a receive thread.
341                 */
342                uint16_t *ptr, numstreams;
343
344                if ((uint32_t)msgsize < sizeof(uint16_t)) {
345                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
346                        return -1;
347                }
348
349                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
350                numstreams = ntohs(*ptr);
351                ptr ++;
352
353                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
354                {
355                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
356                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
357                        return -1;
358                }
359
360                for (i = 0; i < numstreams; i++) {
361                        uint16_t streamport = ntohs(*ptr);
362
363                        if (ptmap[streamport] == 0xffff) {
364                                new_group_alert(libtrace,
365                                        FORMAT_DATA->nextthreadid, streamport,
366                                        ntohs(ndaghdr->monitorid));
367
368                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
369
370                                if (libtrace->perpkt_thread_count == 0) {
371                                        FORMAT_DATA->nextthreadid = 0;
372                                } else {
373                                        FORMAT_DATA->nextthreadid =
374                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
375                                }
376                        }
377
378                        ptr ++;
379                }
380        } else {
381                fprintf(stderr,
382                        "Unexpected message type on control channel: %u\n",
383                         msgtype);
384                return -1;
385        }
386
387        return 0;
388
389}
390
391static void *ndag_controller_run(void *tdata) {
392
393        libtrace_t *libtrace = (libtrace_t *)tdata;
394        uint16_t ptmap[65536];
395        int sock = -1;
396        struct addrinfo *receiveaddr = NULL;
397        fd_set listening;
398        struct timeval timeout;
399
400        /* ptmap is a dirty hack to allow us to quickly check if we've already
401         * assigned a stream to a thread.
402         */
403        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
404
405        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
406                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
407                        &receiveaddr);
408        if (sock == -1) {
409                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
410                        "Unable to join multicast group for nDAG control channel");
411                trace_interrupt();
412                pthread_exit(NULL);
413        }
414
415        ndag_paused = 0;
416        while ((is_halted(libtrace) == -1) && !ndag_paused) {
417                int ret;
418                char buf[CTRL_BUF_SIZE];
419
420                FD_ZERO(&listening);
421                FD_SET(sock, &listening);
422
423                timeout.tv_sec = 0;
424                timeout.tv_usec = 500000;
425
426                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
427                if (ret < 0) {
428                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
429                        break;
430                }
431
432                if (!FD_ISSET(sock, &listening)) {
433                        continue;
434                }
435
436                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
437                                receiveaddr->ai_addr,
438                                &(receiveaddr->ai_addrlen));
439                if (ret < 0) {
440                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
441                        break;
442                }
443
444                if (ret == 0) {
445                        break;
446                }
447
448                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
449                        fprintf(stderr, "Error while parsing nDAG control message.\n");
450                        continue;
451                }
452        }
453
454        if (sock >= 0) {
455                close(sock);
456        }
457
458        /* Control channel has fallen over, should probably encourage libtrace
459         * to halt the receiver threads as well.
460         */
461        if (!is_halted(libtrace)) {
462                trace_interrupt();
463        }
464
465        pthread_exit(NULL);
466}
467
468static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
469{
470        int ret;
471        uint32_t i;
472        /* Configure the set of receiver threads */
473
474        if (FORMAT_DATA->receivers == NULL) {
475                /* What if the number of threads changes between a pause and
476                 * a restart? Can this happen? */
477                FORMAT_DATA->receivers = (recvstream_t *)
478                                malloc(sizeof(recvstream_t) * maxthreads);
479        }
480
481        for (i = 0; i < maxthreads; i++) {
482                FORMAT_DATA->receivers[i].sources = NULL;
483                FORMAT_DATA->receivers[i].sourcecount = 0;
484                FORMAT_DATA->receivers[i].knownmonitors = NULL;
485                FORMAT_DATA->receivers[i].monitorcount = 0;
486                FORMAT_DATA->receivers[i].threadindex = i;
487                FORMAT_DATA->receivers[i].dropped_upstream = 0;
488                FORMAT_DATA->receivers[i].received_packets = 0;
489                FORMAT_DATA->receivers[i].missing_records = 0;
490                FD_ZERO(&(FORMAT_DATA->receivers[i].allsocks));
491                FORMAT_DATA->receivers[i].maxfd = -1;
492
493                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
494                                sizeof(ndag_internal_message_t));
495        }
496
497        /* Start the controller thread */
498        /* TODO consider affinity of this thread? */
499
500        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
501                        ndag_controller_run, libtrace);
502        if (ret != 0) {
503                return -1;
504        }
505        return maxthreads;
506}
507
508static int ndag_start_input(libtrace_t *libtrace) {
509        return ndag_start_threads(libtrace, 1);
510}
511
512static int ndag_pstart_input(libtrace_t *libtrace) {
513        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
514                        libtrace->perpkt_thread_count)
515                return 0;
516        return -1;
517}
518
519static void halt_ndag_receiver(recvstream_t *receiver) {
520        int j, i;
521        libtrace_message_queue_destroy(&(receiver->mqueue));
522
523        if (receiver->sources == NULL)
524                return;
525        for (i = 0; i < receiver->sourcecount; i++) {
526                streamsock_t src = receiver->sources[i];
527                if (src.saved) {
528                        for (j = 0; j < ENCAP_BUFFERS; j++) {
529                                if (src.saved[j]) {
530                                        free(src.saved[j]);
531                                }
532                        }
533                        free(src.saved);
534                }
535
536#if HAVE_RECVMMSG
537                for (j = 0; j < RECV_BATCH_SIZE; j++) {
538                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
539                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
540                        }
541                }
542#else
543                free(src.singlemsg.msg_iov);
544#endif
545
546                close(src.sock);
547        }
548        if (receiver->knownmonitors) {
549                free(receiver->knownmonitors);
550        }
551
552        if (receiver->sources) {
553                free(receiver->sources);
554        }
555}
556
557static int ndag_pause_input(libtrace_t *libtrace) {
558        int i;
559
560        /* Close the existing receiver sockets */
561        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
562               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
563        }
564        return 0;
565}
566
567static int ndag_fin_input(libtrace_t *libtrace) {
568
569        if (FORMAT_DATA->receivers) {
570                free(FORMAT_DATA->receivers);
571        }
572        if (FORMAT_DATA->multicastgroup) {
573                free(FORMAT_DATA->multicastgroup);
574        }
575        if (FORMAT_DATA->portstr) {
576                free(FORMAT_DATA->portstr);
577        }
578        if (FORMAT_DATA->localiface) {
579                free(FORMAT_DATA->localiface);
580        }
581
582        free(libtrace->format_data);
583        return 0;
584}
585
586static int ndag_prepare_packet_stream(libtrace_t *libtrace,
587                recvstream_t *rt,
588                streamsock_t *ssock, libtrace_packet_t *packet,
589                uint32_t flags) {
590
591        dag_record_t *erfptr;
592        ndag_encap_t *encaphdr;
593        uint16_t ndag_reccount = 0;
594        int nr;
595        uint16_t rlen;
596
597        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
598                packet->buf_control = TRACE_CTRL_PACKET;
599        } else {
600                packet->buf_control = TRACE_CTRL_EXTERNAL;
601        }
602
603        packet->trace = libtrace;
604        packet->buffer = ssock->nextread;
605        packet->header = ssock->nextread;
606        packet->type = TRACE_RT_DATA_ERF;
607
608        erfptr = (dag_record_t *)packet->header;
609
610        if (erfptr->flags.rxerror == 1) {
611                packet->payload = NULL;
612                erfptr->rlen = htons(erf_get_framing_length(packet));
613        } else {
614                packet->payload = (char *)packet->buffer +
615                                erf_get_framing_length(packet);
616        }
617
618        /* Update upstream drops using lctr */
619
620        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
621                /* TODO */
622        } else {
623                if (rt->received_packets > 0) {
624                        rt->dropped_upstream += ntohs(erfptr->lctr);
625                }
626        }
627
628        rt->received_packets ++;
629        ssock->recordcount += 1;
630
631        nr = ssock->nextreadind;
632        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
633                        sizeof(ndag_common_t));
634
635        ndag_reccount = ntohs(encaphdr->recordcount);
636        if ((ndag_reccount & 0x8000) != 0) {
637                /* Record was truncated -- update rlen appropriately */
638                rlen = ssock->savedsize[nr] -
639                                (ssock->nextread - ssock->saved[nr]);
640                erfptr->rlen = htons(rlen);
641        } else {
642                rlen = ntohs(erfptr->rlen);
643        }
644        ssock->nextread += rlen;
645        ssock->nextts = 0;
646
647        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
648
649        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
650                /* Read everything from this buffer, mark as empty and
651                 * move on. */
652                ssock->savedsize[nr] = 0;
653                ssock->bufwaiting ++;
654
655                nr ++;
656                if (nr == ENCAP_BUFFERS) {
657                        nr = 0;
658                }
659                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
660                                sizeof(ndag_encap_t);
661                ssock->nextreadind = nr;
662        }
663
664        packet->order = erf_get_erf_timestamp(packet);
665        packet->error = rlen;
666        return rlen;
667}
668
669static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
670                libtrace_packet_t *packet UNUSED,
671                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
672                uint32_t flags UNUSED) {
673
674        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
675        return 0;
676
677}
678
679static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
680
681        ndag_monitor_t *mon;
682
683        if (rt->monitorcount == 0) {
684                rt->knownmonitors = (ndag_monitor_t *)
685                                malloc(sizeof(ndag_monitor_t) * 5);
686        } else {
687                rt->knownmonitors = (ndag_monitor_t *)
688                            realloc(rt->knownmonitors,
689                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
690        }
691
692        mon = &(rt->knownmonitors[rt->monitorcount]);
693        mon->monitorid = monid;
694        mon->laststart = 0;
695
696        rt->monitorcount ++;
697        return mon;
698}
699
700static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
701
702        streamsock_t *ssock = NULL;
703        ndag_monitor_t *mon = NULL;
704        int i;
705
706        /* TODO consider replacing this with a list or vector so we can
707         * easily remove sources that are no longer in use, rather than
708         * just setting the sock to -1 and having to check them every
709         * time we want to read a packet.
710         */
711        if (rt->sourcecount == 0) {
712                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
713        } else if ((rt->sourcecount % 10) == 0) {
714                rt->sources = (streamsock_t *)realloc(rt->sources,
715                        sizeof(streamsock_t) * (rt->sourcecount + 10));
716        }
717
718        ssock = &(rt->sources[rt->sourcecount]);
719
720        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
721                        NULL, src.port, &(ssock->srcaddr));
722
723        if (ssock->sock < 0) {
724                return -1;
725        }
726
727        for (i = 0; i < rt->monitorcount; i++) {
728                if (rt->knownmonitors[i].monitorid == src.monitor) {
729                        mon = &(rt->knownmonitors[i]);
730                        break;
731                }
732        }
733
734        if (mon == NULL) {
735                mon = add_new_knownmonitor(rt, src.monitor);
736        }
737
738        ssock->port = src.port;
739        ssock->groupaddr = src.groupaddr;
740        ssock->expectedseq = 0;
741        ssock->monitorptr = mon;
742        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
743        ssock->bufavail = ENCAP_BUFFERS;
744        ssock->bufwaiting = 0;
745        ssock->startidle = 0;
746        ssock->nextts = 0;
747
748        for (i = 0; i < ENCAP_BUFFERS; i++) {
749                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
750                ssock->savedsize[i] = 0;
751        }
752
753#if HAVE_RECVMMSG
754        for (i = 0; i < RECV_BATCH_SIZE; i++) {
755                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
756                                malloc(sizeof(struct iovec));
757                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
758                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
759                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
760                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
761                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
762                ssock->mmsgbufs[i].msg_len = 0;
763        }
764#else
765        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
766#endif
767
768        ssock->nextread = NULL;;
769        ssock->nextreadind = 0;
770        ssock->nextwriteind = 0;
771        ssock->recordcount = 0;
772        rt->sourcecount += 1;
773        FD_SET(ssock->sock, &(rt->allsocks));
774        if (ssock->sock > rt->maxfd) {
775                rt->maxfd = ssock->sock;
776        }
777
778        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
779                        ssock->groupaddr, ssock->port, rt->threadindex);
780
781        return ssock->port;
782}
783
784static int receiver_read_messages(recvstream_t *rt) {
785
786        ndag_internal_message_t msg;
787
788        while (libtrace_message_queue_try_get(&(rt->mqueue),
789                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
790                switch(msg.type) {
791                        case NDAG_CLIENT_NEWGROUP:
792                                if (add_new_streamsock(rt, msg.contents) < 0) {
793                                        return -1;
794                                }
795                                break;
796                        case NDAG_CLIENT_HALT:
797                                return 0;
798                }
799        }
800        return 1;
801
802}
803
804static inline int readable_data(streamsock_t *ssock) {
805
806        if (ssock->sock == -1) {
807                return 0;
808        }
809        if (ssock->savedsize[ssock->nextreadind] == 0) {
810                return 0;
811        }
812        /*
813        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
814                        ssock->savedsize[ssock->nextreadind]) {
815                return 0;
816        }
817        */
818        return 1;
819
820
821}
822
823static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
824
825        int i;
826        for (i = 0; i < rt->sourcecount; i++) {
827                if (rt->sources[i].monitorptr == mon) {
828                        rt->sources[i].expectedseq = 0;
829                }
830        }
831
832}
833
834static int init_receivers(streamsock_t *ssock, int required) {
835
836        int wind = ssock->nextwriteind;
837        int i = 1;
838
839#if HAVE_RECVMMSG
840        for (i = 0; i < required; i++) {
841                if (i >= RECV_BATCH_SIZE) {
842                        break;
843                }
844
845                if (wind >= ENCAP_BUFFERS) {
846                        wind = 0;
847                }
848
849                ssock->mmsgbufs[i].msg_len = 0;
850                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
851                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
852                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
853
854                wind ++;
855        }
856#else
857        assert(required > 0);
858        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
859        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
860        ssock->singlemsg.msg_iovlen = 1;
861#endif
862        return i;
863}
864
865static int check_ndag_received(streamsock_t *ssock, int index,
866                unsigned int msglen, recvstream_t *rt) {
867
868        ndag_encap_t *encaphdr;
869        ndag_monitor_t *mon;
870        uint8_t rectype;
871
872        /* Check that we have a valid nDAG encap record */
873        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
874
875        if (rectype == NDAG_PKT_KEEPALIVE) {
876                /* Keep-alive, reset startidle and carry on. Don't
877                 * change nextwrite -- we want to overwrite the
878                 * keep-alive with usable content. */
879                return 0;
880        } else if (rectype != NDAG_PKT_ENCAPERF) {
881                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
882                                ssock->groupaddr, ssock->port);
883                FD_CLR(ssock->sock, &(rt->allsocks));
884                close(ssock->sock);
885                ssock->sock = -1;
886                return -1;
887        }
888
889        ssock->savedsize[index] = msglen;
890        ssock->nextwriteind ++;
891        ssock->bufavail --;
892
893        assert(ssock->bufavail >= 0);
894        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
895                ssock->nextwriteind = 0;
896        }
897
898        /* Get the useful info from the encap header */
899        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
900
901        mon = ssock->monitorptr;
902
903        if (mon->laststart == 0) {
904                mon->laststart = bswap_be_to_host64(encaphdr->started);
905        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
906                mon->laststart = bswap_be_to_host64(encaphdr->started);
907                reset_expected_seqs(rt, mon);
908
909                /* TODO what is a good way to indicate this to clients?
910                 * set the loss counter in the ERF header? a bit rude?
911                 * use another bit in the ERF header?
912                 * add a queryable flag to libtrace_packet_t?
913                 */
914
915        }
916
917        if (ssock->expectedseq != 0) {
918                rt->missing_records += seq_cmp(
919                                ntohl(encaphdr->seqno), ssock->expectedseq);
920
921        }
922        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
923        if (ssock->expectedseq == 0) {
924                ssock->expectedseq ++;
925        }
926
927        if (ssock->nextread == NULL) {
928                /* If this is our first read, set up 'nextread'
929                 * by skipping past the nDAG headers */
930                ssock->nextread = ssock->saved[0] +
931                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
932        }
933        return 1;
934
935}
936
937static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
938                int *gottime, recvstream_t *rt) {
939
940        int ret, ndagstat, avail;
941        int toret = 0;
942
943#if HAVE_RECVMMSG
944        int i;
945#endif
946
947        if (ssock->sock == -1) {
948                return 0;
949        }
950
951#if HAVE_RECVMMSG
952        /* Plenty of full buffers, just use the packets in those */
953        if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
954                return 1;
955        }
956#else
957        if (ssock->bufavail == 0) {
958                return 1;
959        }
960#endif
961
962        avail = init_receivers(ssock, ssock->bufavail);
963
964#if HAVE_RECVMMSG
965        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
966                        MSG_DONTWAIT, NULL);
967#else
968        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
969#endif
970        if (ret < 0) {
971                /* Nothing to receive right now, but we should still
972                 * count as 'ready' if at least one buffer is full */
973                if (errno == EAGAIN || errno == EWOULDBLOCK) {
974                        if (readable_data(ssock)) {
975                                toret = 1;
976                        }
977                        if (!(*gottime)) {
978                                gettimeofday(tv, NULL);
979                                *gottime = 1;
980                        }
981                        if (ssock->startidle == 0) {
982                                ssock->startidle = tv->tv_sec;
983                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
984                                fprintf(stderr,
985                                        "Closing channel %s:%u due to inactivity.\n",
986                                        ssock->groupaddr,
987                                        ssock->port);
988
989                                FD_CLR(ssock->sock, &(rt->allsocks));
990                                close(ssock->sock);
991                                ssock->sock = -1;
992                        }
993                } else {
994
995                        fprintf(stderr,
996                                "Error receiving encapsulated records from %s:%u -- %s \n",
997                                ssock->groupaddr, ssock->port,
998                                strerror(errno));
999                        FD_CLR(ssock->sock, &(rt->allsocks));
1000                        close(ssock->sock);
1001                        ssock->sock = -1;
1002                }
1003                return toret;
1004        }
1005
1006        ssock->startidle = 0;
1007
1008#if HAVE_RECVMMSG
1009        for (i = 0; i < ret; i++) {
1010                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1011                                ssock->mmsgbufs[i].msg_len, rt);
1012                if (ndagstat == -1) {
1013                        break;
1014                }
1015
1016                if (ndagstat == 1) {
1017                        toret = 1;
1018                }
1019        }
1020#else
1021        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1022        if (ndagstat <= 0) {
1023                toret = 0;
1024        } else {
1025                toret = 1;
1026        }
1027#endif
1028
1029        return toret;
1030}
1031
1032static int receive_from_sockets(recvstream_t *rt) {
1033
1034        int i, readybufs, gottime;
1035        struct timeval tv;
1036        fd_set fds;
1037        struct timeval zerotv;
1038
1039        readybufs = 0;
1040        gottime = 0;
1041
1042        fds = rt->allsocks;
1043
1044        if (rt->maxfd == -1) {
1045                return 0;
1046        }
1047
1048        zerotv.tv_sec = 0;
1049        zerotv.tv_usec = 0;
1050
1051        if (select(rt->maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1052                /* log the error? XXX */
1053                return -1;
1054        }
1055
1056        for (i = 0; i < rt->sourcecount; i++) {
1057                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1058                        continue;
1059                }
1060                readybufs += receive_from_single_socket(&(rt->sources[i]),
1061                                &tv, &gottime, rt);
1062        }
1063
1064        return readybufs;
1065
1066}
1067
1068
1069static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1070                libtrace_packet_t *packet) {
1071
1072        int iserr = 0;
1073
1074        if (packet->buf_control == TRACE_CTRL_PACKET) {
1075                free(packet->buffer);
1076                packet->buffer = NULL;
1077        }
1078
1079        do {
1080                /* Make sure we shouldn't be halting */
1081                if ((iserr = is_halted(libtrace)) != -1) {
1082                        return iserr;
1083                }
1084
1085                /* Check for any messages from the control thread */
1086                iserr = receiver_read_messages(rt);
1087
1088                if (iserr <= 0) {
1089                        return iserr;
1090                }
1091
1092                /* If blocking and no sources, sleep for a bit and then try
1093                 * checking for messages again.
1094                 */
1095                if (rt->sourcecount == 0) {
1096                        usleep(10000);
1097                        continue;
1098                }
1099
1100                if ((iserr = receive_from_sockets(rt)) < 0) {
1101                        return iserr;
1102                } else if (iserr > 0) {
1103                        /* At least one of our input sockets has available
1104                         * data, let's go ahead and use what we have. */
1105                        break;
1106                }
1107
1108                /* None of our sources have anything available, we can take
1109                 * a short break rather than immediately trying again.
1110                 */
1111                if (iserr == 0) {
1112                        usleep(100);
1113                }
1114
1115        } while (1);
1116
1117        return iserr;
1118}
1119
1120static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1121                libtrace_packet_t *packet) {
1122
1123        int iserr = 0;
1124
1125        if (packet->buf_control == TRACE_CTRL_PACKET) {
1126                free(packet->buffer);
1127                packet->buffer = NULL;
1128        }
1129
1130        /* Make sure we shouldn't be halting */
1131        if ((iserr = is_halted(libtrace)) != -1) {
1132                return iserr;
1133        }
1134
1135        /* If non-blocking and there are no sources, just break */
1136        if (rt->sourcecount == 0) {
1137                return 0;
1138        }
1139
1140        return receive_from_sockets(rt);
1141}
1142
1143static streamsock_t *select_next_packet(recvstream_t *rt) {
1144        int i;
1145        streamsock_t *ssock = NULL;
1146        uint64_t earliest = 0;
1147        uint64_t currentts = 0;
1148        dag_record_t *daghdr;
1149
1150        /* If we only have one source, then no need to do any
1151         * timestamp parsing or byteswapping.
1152         */
1153        if (rt->sourcecount == 1) {
1154                if (readable_data(&(rt->sources[0]))) {
1155                        return &(rt->sources[0]);
1156                }
1157                return NULL;
1158        }
1159
1160
1161        for (i = 0; i < rt->sourcecount; i ++) {
1162                if (!readable_data(&(rt->sources[i]))) {
1163                        continue;
1164                }
1165
1166                if (rt->sources[i].nextts == 0) {
1167                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1168                        currentts = bswap_le_to_host64(daghdr->ts);
1169                        rt->sources[i].nextts = currentts;
1170                } else {
1171                        currentts = rt->sources[i].nextts;
1172                }
1173
1174                if (earliest == 0 || earliest > currentts) {
1175                        earliest = currentts;
1176                        ssock = &(rt->sources[i]);
1177                }
1178        }
1179        return ssock;
1180}
1181
1182static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1183
1184        int rem, ret;
1185        streamsock_t *nextavail = NULL;
1186        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1187                        packet);
1188
1189        if (rem <= 0) {
1190                return rem;
1191        }
1192
1193        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1194        if (nextavail == NULL) {
1195                return 0;
1196        }
1197
1198        /* nextread should point at an ERF header, so prepare 'packet' to be
1199         * a libtrace ERF packet. */
1200
1201        ret = ndag_prepare_packet_stream(libtrace,
1202                        &(FORMAT_DATA->receivers[0]), nextavail,
1203                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1204        nextavail->bufavail += nextavail->bufwaiting;
1205        nextavail->bufwaiting = 0;
1206        return ret;
1207}
1208
1209static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1210                libtrace_packet_t **packets, size_t nb_packets) {
1211
1212        recvstream_t *rt;
1213        int rem, i;
1214        size_t read_packets = 0;
1215        streamsock_t *nextavail = NULL;
1216
1217        rt = (recvstream_t *)t->format_data;
1218
1219
1220        do {
1221                /* Only check for messages once per batch */
1222                if (read_packets == 0) {
1223                        rem = receive_encap_records_block(libtrace, rt,
1224                                packets[read_packets]);
1225                } else {
1226                        rem = receive_encap_records_nonblock(libtrace, rt,
1227                                packets[read_packets]);
1228                }
1229
1230                if (rem < 0) {
1231                        return rem;
1232                }
1233
1234                if (rem == 0) {
1235                        break;
1236                }
1237
1238                nextavail = select_next_packet(rt);
1239                if (nextavail == NULL) {
1240                        break;
1241                }
1242
1243                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1244                                packets[read_packets],
1245                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1246
1247                read_packets  ++;
1248                if (read_packets >= nb_packets) {
1249                        break;
1250                }
1251        } while (1);
1252
1253        for (i = 0; i < rt->sourcecount; i++) {
1254                streamsock_t *src = &(rt->sources[i]);
1255                src->bufavail += src->bufwaiting;
1256                src->bufwaiting = 0;
1257                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1258        }
1259
1260        return read_packets;
1261
1262}
1263
1264static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1265                libtrace_packet_t *packet) {
1266
1267
1268        libtrace_eventobj_t event = {0,0,0.0,0};
1269        int rem, i;
1270        streamsock_t *nextavail = NULL;
1271
1272        /* Only check for messages once per call */
1273        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1274        if (rem <= 0) {
1275                event.type = TRACE_EVENT_TERMINATE;
1276                return event;
1277        }
1278
1279        do {
1280                rem = receive_encap_records_nonblock(libtrace,
1281                                &(FORMAT_DATA->receivers[0]), packet);
1282
1283                if (rem < 0) {
1284                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1285                                "Received invalid nDAG records.");
1286                        event.type = TRACE_EVENT_TERMINATE;
1287                        break;
1288                }
1289
1290                if (rem == 0) {
1291                        /* Either we've been halted or we've got no packets
1292                         * right now. */
1293                        if (is_halted(libtrace) == 0) {
1294                                event.type = TRACE_EVENT_TERMINATE;
1295                                break;
1296                        }
1297                        event.type = TRACE_EVENT_SLEEP;
1298                        event.seconds = 0.0001;
1299                        break;
1300                }
1301
1302                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1303                if (nextavail == NULL) {
1304                        event.type = TRACE_EVENT_SLEEP;
1305                        event.seconds = 0.0001;
1306                        break;
1307                }
1308
1309                event.type = TRACE_EVENT_PACKET;
1310                ndag_prepare_packet_stream(libtrace,
1311                                &(FORMAT_DATA->receivers[0]), nextavail,
1312                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1313                event.size = trace_get_capture_length(packet) +
1314                                trace_get_framing_length(packet);
1315
1316                if (libtrace->filter) {
1317                        int filtret = trace_apply_filter(libtrace->filter,
1318                                        packet);
1319                        if (filtret == -1) {
1320                                trace_set_err(libtrace,
1321                                                TRACE_ERR_BAD_FILTER,
1322                                                "Bad BPF Filter");
1323                                event.type = TRACE_EVENT_TERMINATE;
1324                                break;
1325                        }
1326
1327                        if (filtret == 0) {
1328                                /* Didn't match filter, try next one */
1329                                libtrace->filtered_packets ++;
1330                                trace_clear_cache(packet);
1331                                continue;
1332                        }
1333                }
1334
1335                if (libtrace->snaplen > 0) {
1336                        trace_set_capture_length(packet, libtrace->snaplen);
1337                }
1338                libtrace->accepted_packets ++;
1339                break;
1340        } while (1);
1341
1342        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1343                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1344                src->bufavail += src->bufwaiting;
1345                src->bufwaiting = 0;
1346                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1347        }
1348
1349        return event;
1350}
1351
1352static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1353
1354        int i;
1355
1356        stat->dropped_valid = 1;
1357        stat->dropped = 0;
1358        stat->received_valid = 1;
1359        stat->received = 0;
1360        stat->missing_valid = 1;
1361        stat->missing = 0;
1362
1363        /* TODO Is this thread safe? */
1364        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1365                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1366                stat->received += FORMAT_DATA->receivers[i].received_packets;
1367                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1368        }
1369
1370}
1371
1372static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1373                libtrace_stat_t *stat) {
1374
1375        recvstream_t *recvr = (recvstream_t *)t->format_data;
1376
1377        if (libtrace == NULL)
1378                return;
1379        /* TODO Is this thread safe */
1380        stat->dropped_valid = 1;
1381        stat->dropped = recvr->dropped_upstream;
1382
1383        stat->received_valid = 1;
1384        stat->received = recvr->received_packets;
1385
1386        stat->missing_valid = 1;
1387        stat->missing = recvr->missing_records;
1388
1389}
1390
1391static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1392                bool reader) {
1393        recvstream_t *recvr;
1394
1395        if (!reader || t->type != THREAD_PERPKT) {
1396                return 0;
1397        }
1398
1399        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1400        t->format_data = recvr;
1401
1402        return 0;
1403}
1404
1405static struct libtrace_format_t ndag = {
1406
1407        "ndag",
1408        "",
1409        TRACE_FORMAT_NDAG,
1410        NULL,                   /* probe filename */
1411        NULL,                   /* probe magic */
1412        ndag_init_input,        /* init_input */
1413        NULL,                   /* config_input */
1414        ndag_start_input,       /* start_input */
1415        ndag_pause_input,       /* pause_input */
1416        NULL,                   /* init_output */
1417        NULL,                   /* config_output */
1418        NULL,                   /* start_output */
1419        ndag_fin_input,         /* fin_input */
1420        NULL,                   /* fin_output */
1421        ndag_read_packet,       /* read_packet */
1422        ndag_prepare_packet,    /* prepare_packet */
1423        NULL,                   /* fin_packet */
1424        NULL,                   /* write_packet */
1425        erf_get_link_type,      /* get_link_type */
1426        erf_get_direction,      /* get_direction */
1427        erf_set_direction,      /* set_direction */
1428        erf_get_erf_timestamp,  /* get_erf_timestamp */
1429        NULL,                   /* get_timeval */
1430        NULL,                   /* get_seconds */
1431        NULL,                   /* get_timespec */
1432        NULL,                   /* seek_erf */
1433        NULL,                   /* seek_timeval */
1434        NULL,                   /* seek_seconds */
1435        erf_get_capture_length, /* get_capture_length */
1436        erf_get_wire_length,    /* get_wire_length */
1437        erf_get_framing_length, /* get_framing_length */
1438        erf_set_capture_length, /* set_capture_length */
1439        NULL,                   /* get_received_packets */
1440        NULL,                   /* get_filtered_packets */
1441        NULL,                   /* get_dropped_packets */
1442        ndag_get_statistics,    /* get_statistics */
1443        NULL,                   /* get_fd */
1444        trace_event_ndag,       /* trace_event */
1445        NULL,                   /* help */
1446        NULL,                   /* next pointer */
1447        {true, 0},              /* live packet capture */
1448        ndag_pstart_input,      /* parallel start */
1449        ndag_pread_packets,     /* parallel read */
1450        ndag_pause_input,       /* parallel pause */
1451        NULL,
1452        ndag_pregister_thread,  /* register thread */
1453        NULL,
1454        ndag_get_thread_stats   /* per-thread stats */
1455};
1456
1457void ndag_constructor(void) {
1458        register_format(&ndag);
1459}
Note: See TracBrowser for help on using the repository browser.