source: lib/format_ndag.c @ 6a2f037

developringperformance
Last change on this file since 6a2f037 was 6a2f037, checked in by Shane Alcock <salcock@…>, 2 years ago

ndag: join the stream socket multicast group at the last minute

Joining earlier means that we have packets queuing up while we're
still allocating buffers etc. This means we run the risk of
dropping packets during our init phase.

Instead, we now do as much initialisation as possible before
joining the group -- it means that our first packet might end up
being a later one than we would have read previously, but we are
less likely to have a gap in our incoming stream.

Using the old method, our first batch of packets might have been
as follows:

1, 2, 3, 4, 5, 8, 9, 10

Now we might miss 1 and 2, but avoid having the gap from 5 to 8,
i.e.:

3, 4, 5, 6, 7, 8, 9, 10

I think generally the latter is a better sequence of packets for
us to provide to the user. It also means that our reported missing
count is zero, which is less likely to make users think that there
is a performance issue in their own code.

  • Property mode set to 100644
File size: 46.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_DECL_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        int maxfd;
117} recvstream_t;
118
119typedef struct ndag_format_data {
120        char *multicastgroup;
121        char *portstr;
122        char *localiface;
123        uint16_t nextthreadid;
124        recvstream_t *receivers;
125
126        pthread_t controlthread;
127        libtrace_message_queue_t controlqueue;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize, val;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        val = 1;
233        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
234                fprintf(stderr,
235                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                goto sockcreateover;
238        }
239
240        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
241        {
242                fprintf(stderr,
243                        "Failed to bind to multicast socket %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                sock = -1;
246                goto sockcreateover;
247        }
248
249        greq.gr_interface = interface;
250        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
251
252        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
253                        sizeof(greq)) < 0) {
254                fprintf(stderr,
255                        "Failed to join multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262        bufsize = 16 * 1024 * 1024;
263        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
264                                (socklen_t)sizeof(int)) < 0) {
265
266                fprintf(stderr,
267                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
268                                groupaddr, portstr, strerror(errno));
269                close(sock);
270                sock = -1;
271                goto sockcreateover;
272        }
273
274sockcreateover:
275        freeaddrinfo(group);
276        return sock;
277}
278
279
280static int ndag_init_input(libtrace_t *libtrace) {
281
282        char *scan = NULL;
283        char *next = NULL;
284
285        libtrace->format_data = (ndag_format_data_t *)malloc(
286                        sizeof(ndag_format_data_t));
287
288        FORMAT_DATA->multicastgroup = NULL;
289        FORMAT_DATA->portstr = NULL;
290        FORMAT_DATA->localiface = NULL;
291        FORMAT_DATA->nextthreadid = 0;
292        FORMAT_DATA->receivers = NULL;
293
294        scan = strchr(libtrace->uridata, ',');
295        if (scan == NULL) {
296                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
297                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
298                return -1;
299        }
300        FORMAT_DATA->localiface = strndup(libtrace->uridata,
301                        (size_t)(scan - libtrace->uridata));
302        next = scan + 1;
303
304        scan = strchr(next, ',');
305        if (scan == NULL) {
306                FORMAT_DATA->portstr = strdup("9001");
307                FORMAT_DATA->multicastgroup = strdup(next);
308        } else {
309                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
310
311                FORMAT_DATA->portstr = strdup(scan + 1);
312        }
313        return 0;
314}
315
316static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
317                uint16_t portnum, uint16_t monid) {
318
319        ndag_internal_message_t alert;
320
321        alert.type = NDAG_CLIENT_NEWGROUP;
322        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
323        alert.contents.localiface = FORMAT_DATA->localiface;
324        alert.contents.port = portnum;
325        alert.contents.monitor = monid;
326
327        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
328                        (void *)&alert);
329
330}
331
332static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
333                int msgsize, uint16_t *ptmap) {
334
335        int i;
336        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
337        uint8_t msgtype;
338
339        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
340        if (msgtype == 0) {
341                return -1;
342        }
343
344        msgsize -= sizeof(ndag_common_t);
345        if (msgtype == NDAG_PKT_BEACON) {
346                /* If message is a beacon, make sure every port included in the
347                 * beacon is assigned to a receive thread.
348                 */
349                uint16_t *ptr, numstreams;
350
351                if ((uint32_t)msgsize < sizeof(uint16_t)) {
352                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
353                        return -1;
354                }
355
356                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
357                numstreams = ntohs(*ptr);
358                ptr ++;
359
360                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
361                {
362                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
363                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
364                        return -1;
365                }
366
367                for (i = 0; i < numstreams; i++) {
368                        uint16_t streamport = ntohs(*ptr);
369
370                        if (ptmap[streamport] == 0xffff) {
371                                new_group_alert(libtrace,
372                                        FORMAT_DATA->nextthreadid, streamport,
373                                        ntohs(ndaghdr->monitorid));
374
375                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
376
377                                if (libtrace->perpkt_thread_count == 0) {
378                                        FORMAT_DATA->nextthreadid = 0;
379                                } else {
380                                        FORMAT_DATA->nextthreadid =
381                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
382                                }
383                        }
384
385                        ptr ++;
386                }
387        } else {
388                fprintf(stderr,
389                        "Unexpected message type on control channel: %u\n",
390                         msgtype);
391                return -1;
392        }
393
394        return 0;
395
396}
397
398static void *ndag_controller_run(void *tdata) {
399
400        libtrace_t *libtrace = (libtrace_t *)tdata;
401        uint16_t ptmap[65536];
402        int sock = -1;
403        struct addrinfo *receiveaddr = NULL;
404        fd_set listening;
405        struct timeval timeout;
406
407        /* ptmap is a dirty hack to allow us to quickly check if we've already
408         * assigned a stream to a thread.
409         */
410        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
411
412        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
413                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
414                        &receiveaddr);
415        if (sock == -1) {
416                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
417                        "Unable to join multicast group for nDAG control channel");
418                trace_interrupt();
419                pthread_exit(NULL);
420        }
421
422        ndag_paused = 0;
423        while ((is_halted(libtrace) == -1) && !ndag_paused) {
424                int ret;
425                char buf[CTRL_BUF_SIZE];
426
427                FD_ZERO(&listening);
428                FD_SET(sock, &listening);
429
430                timeout.tv_sec = 0;
431                timeout.tv_usec = 500000;
432
433                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
434                if (ret < 0) {
435                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
436                        break;
437                }
438
439                if (!FD_ISSET(sock, &listening)) {
440                        continue;
441                }
442
443                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
444                                receiveaddr->ai_addr,
445                                &(receiveaddr->ai_addrlen));
446                if (ret < 0) {
447                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
448                        break;
449                }
450
451                if (ret == 0) {
452                        break;
453                }
454
455                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
456                        fprintf(stderr, "Error while parsing nDAG control message.\n");
457                        continue;
458                }
459        }
460
461        if (sock >= 0) {
462                close(sock);
463        }
464
465        /* Control channel has fallen over, should probably encourage libtrace
466         * to halt the receiver threads as well.
467         */
468        if (!is_halted(libtrace)) {
469                trace_interrupt();
470        }
471
472        pthread_exit(NULL);
473}
474
475static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
476{
477        int ret;
478        uint32_t i;
479        /* Configure the set of receiver threads */
480
481        if (FORMAT_DATA->receivers == NULL) {
482                /* What if the number of threads changes between a pause and
483                 * a restart? Can this happen? */
484                FORMAT_DATA->receivers = (recvstream_t *)
485                                malloc(sizeof(recvstream_t) * maxthreads);
486        }
487
488        for (i = 0; i < maxthreads; i++) {
489                FORMAT_DATA->receivers[i].sources = NULL;
490                FORMAT_DATA->receivers[i].sourcecount = 0;
491                FORMAT_DATA->receivers[i].knownmonitors = NULL;
492                FORMAT_DATA->receivers[i].monitorcount = 0;
493                FORMAT_DATA->receivers[i].threadindex = i;
494                FORMAT_DATA->receivers[i].dropped_upstream = 0;
495                FORMAT_DATA->receivers[i].received_packets = 0;
496                FORMAT_DATA->receivers[i].missing_records = 0;
497                FORMAT_DATA->receivers[i].maxfd = -1;
498
499                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
500                                sizeof(ndag_internal_message_t));
501        }
502
503        /* Start the controller thread */
504        /* TODO consider affinity of this thread? */
505
506        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
507                        ndag_controller_run, libtrace);
508        if (ret != 0) {
509                return -1;
510        }
511        return maxthreads;
512}
513
514static int ndag_start_input(libtrace_t *libtrace) {
515        return ndag_start_threads(libtrace, 1);
516}
517
518static int ndag_pstart_input(libtrace_t *libtrace) {
519        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
520                        libtrace->perpkt_thread_count)
521                return 0;
522        return -1;
523}
524
525static void halt_ndag_receiver(recvstream_t *receiver) {
526        int j, i;
527        libtrace_message_queue_destroy(&(receiver->mqueue));
528
529        if (receiver->sources == NULL)
530                return;
531        for (i = 0; i < receiver->sourcecount; i++) {
532                streamsock_t src = receiver->sources[i];
533                if (src.saved) {
534                        for (j = 0; j < ENCAP_BUFFERS; j++) {
535                                if (src.saved[j]) {
536                                        free(src.saved[j]);
537                                }
538                        }
539                        free(src.saved);
540                }
541
542#if HAVE_DECL_RECVMMSG
543                for (j = 0; j < RECV_BATCH_SIZE; j++) {
544                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
545                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
546                        }
547                }
548#else
549                free(src.singlemsg.msg_iov);
550#endif
551
552                if (src.sock != -1) {
553                        close(src.sock);
554                }
555        }
556        if (receiver->knownmonitors) {
557                free(receiver->knownmonitors);
558        }
559
560        if (receiver->sources) {
561                free(receiver->sources);
562        }
563}
564
565static int ndag_pause_input(libtrace_t *libtrace) {
566        int i;
567
568        /* Close the existing receiver sockets */
569        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
570               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
571        }
572        return 0;
573}
574
575static int ndag_fin_input(libtrace_t *libtrace) {
576
577        if (FORMAT_DATA->receivers) {
578                free(FORMAT_DATA->receivers);
579        }
580        if (FORMAT_DATA->multicastgroup) {
581                free(FORMAT_DATA->multicastgroup);
582        }
583        if (FORMAT_DATA->portstr) {
584                free(FORMAT_DATA->portstr);
585        }
586        if (FORMAT_DATA->localiface) {
587                free(FORMAT_DATA->localiface);
588        }
589
590        free(libtrace->format_data);
591        return 0;
592}
593
594static int ndag_prepare_packet_stream(libtrace_t *libtrace,
595                recvstream_t *rt,
596                streamsock_t *ssock, libtrace_packet_t *packet,
597                uint32_t flags) {
598
599        dag_record_t *erfptr;
600        ndag_encap_t *encaphdr;
601        uint16_t ndag_reccount = 0;
602        int nr;
603        uint16_t rlen;
604
605        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
606                packet->buf_control = TRACE_CTRL_PACKET;
607        } else {
608                packet->buf_control = TRACE_CTRL_EXTERNAL;
609        }
610
611        packet->trace = libtrace;
612        packet->buffer = ssock->nextread;
613        packet->header = ssock->nextread;
614        packet->type = TRACE_RT_DATA_ERF;
615
616        erfptr = (dag_record_t *)packet->header;
617
618        if (erfptr->flags.rxerror == 1) {
619                packet->payload = NULL;
620                erfptr->rlen = htons(erf_get_framing_length(packet));
621        } else {
622                packet->payload = (char *)packet->buffer +
623                                erf_get_framing_length(packet);
624        }
625
626        /* Update upstream drops using lctr */
627
628        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
629                /* TODO */
630        } else {
631                if (rt->received_packets > 0) {
632                        rt->dropped_upstream += ntohs(erfptr->lctr);
633                }
634        }
635
636        rt->received_packets ++;
637        ssock->recordcount += 1;
638
639        nr = ssock->nextreadind;
640        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
641                        sizeof(ndag_common_t));
642
643        ndag_reccount = ntohs(encaphdr->recordcount);
644        if ((ndag_reccount & 0x8000) != 0) {
645                /* Record was truncated -- update rlen appropriately */
646                rlen = ssock->savedsize[nr] -
647                                (ssock->nextread - ssock->saved[nr]);
648                erfptr->rlen = htons(rlen);
649        } else {
650                rlen = ntohs(erfptr->rlen);
651        }
652        ssock->nextread += rlen;
653        ssock->nextts = 0;
654
655        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
656
657        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
658                /* Read everything from this buffer, mark as empty and
659                 * move on. */
660                ssock->savedsize[nr] = 0;
661                ssock->bufwaiting ++;
662
663                nr ++;
664                if (nr == ENCAP_BUFFERS) {
665                        nr = 0;
666                }
667                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
668                                sizeof(ndag_encap_t);
669                ssock->nextreadind = nr;
670        }
671
672        packet->order = erf_get_erf_timestamp(packet);
673        packet->error = rlen;
674        return rlen;
675}
676
677static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
678                libtrace_packet_t *packet UNUSED,
679                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
680                uint32_t flags UNUSED) {
681
682        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
683        return 0;
684
685}
686
687static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
688
689        ndag_monitor_t *mon;
690
691        if (rt->monitorcount == 0) {
692                rt->knownmonitors = (ndag_monitor_t *)
693                                malloc(sizeof(ndag_monitor_t) * 5);
694        } else {
695                rt->knownmonitors = (ndag_monitor_t *)
696                            realloc(rt->knownmonitors,
697                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
698        }
699
700        mon = &(rt->knownmonitors[rt->monitorcount]);
701        mon->monitorid = monid;
702        mon->laststart = 0;
703
704        rt->monitorcount ++;
705        return mon;
706}
707
708static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
709
710        streamsock_t *ssock = NULL;
711        ndag_monitor_t *mon = NULL;
712        int i;
713
714        /* TODO consider replacing this with a list or vector so we can
715         * easily remove sources that are no longer in use, rather than
716         * just setting the sock to -1 and having to check them every
717         * time we want to read a packet.
718         */
719        if (rt->sourcecount == 0) {
720                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
721        } else if ((rt->sourcecount % 10) == 0) {
722                rt->sources = (streamsock_t *)realloc(rt->sources,
723                        sizeof(streamsock_t) * (rt->sourcecount + 10));
724        }
725
726        ssock = &(rt->sources[rt->sourcecount]);
727
728        for (i = 0; i < rt->monitorcount; i++) {
729                if (rt->knownmonitors[i].monitorid == src.monitor) {
730                        mon = &(rt->knownmonitors[i]);
731                        break;
732                }
733        }
734
735        if (mon == NULL) {
736                mon = add_new_knownmonitor(rt, src.monitor);
737        }
738
739        ssock->port = src.port;
740        ssock->groupaddr = src.groupaddr;
741        ssock->expectedseq = 0;
742        ssock->monitorptr = mon;
743        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
744        ssock->bufavail = ENCAP_BUFFERS;
745        ssock->bufwaiting = 0;
746        ssock->startidle = 0;
747        ssock->nextts = 0;
748
749        for (i = 0; i < ENCAP_BUFFERS; i++) {
750                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
751                ssock->savedsize[i] = 0;
752        }
753
754#if HAVE_DECL_RECVMMSG
755        for (i = 0; i < RECV_BATCH_SIZE; i++) {
756                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
757                                malloc(sizeof(struct iovec));
758                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
759                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
760                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
761                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
762                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
763                ssock->mmsgbufs[i].msg_len = 0;
764        }
765#else
766        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
767#endif
768
769        ssock->nextread = NULL;;
770        ssock->nextreadind = 0;
771        ssock->nextwriteind = 0;
772        ssock->recordcount = 0;
773        rt->sourcecount += 1;
774
775        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
776                        NULL, src.port, &(ssock->srcaddr));
777
778        if (ssock->sock < 0) {
779                return -1;
780        }
781
782        if (ssock->sock > rt->maxfd) {
783                rt->maxfd = ssock->sock;
784        }
785
786        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
787                        ssock->groupaddr, ssock->port, rt->threadindex);
788
789        return ssock->port;
790}
791
792static int receiver_read_messages(recvstream_t *rt) {
793
794        ndag_internal_message_t msg;
795
796        while (libtrace_message_queue_try_get(&(rt->mqueue),
797                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
798                switch(msg.type) {
799                        case NDAG_CLIENT_NEWGROUP:
800                                if (add_new_streamsock(rt, msg.contents) < 0) {
801                                        return -1;
802                                }
803                                break;
804                        case NDAG_CLIENT_HALT:
805                                return 0;
806                }
807        }
808        return 1;
809
810}
811
812static inline int readable_data(streamsock_t *ssock) {
813
814        if (ssock->sock == -1) {
815                return 0;
816        }
817        if (ssock->savedsize[ssock->nextreadind] == 0) {
818                return 0;
819        }
820        /*
821        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
822                        ssock->savedsize[ssock->nextreadind]) {
823                return 0;
824        }
825        */
826        return 1;
827
828
829}
830
831static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
832
833        int i;
834        for (i = 0; i < rt->sourcecount; i++) {
835                if (rt->sources[i].monitorptr == mon) {
836                        rt->sources[i].expectedseq = 0;
837                }
838        }
839
840}
841
842static int init_receivers(streamsock_t *ssock, int required) {
843
844        int wind = ssock->nextwriteind;
845        int i = 1;
846
847#if HAVE_DECL_RECVMMSG
848        for (i = 0; i < required; i++) {
849                if (i >= RECV_BATCH_SIZE) {
850                        break;
851                }
852
853                if (wind >= ENCAP_BUFFERS) {
854                        wind = 0;
855                }
856
857                ssock->mmsgbufs[i].msg_len = 0;
858                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
859                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
860                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
861
862                wind ++;
863        }
864#else
865        assert(required > 0);
866        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
867        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
868        ssock->singlemsg.msg_iovlen = 1;
869#endif
870        return i;
871}
872
873static int check_ndag_received(streamsock_t *ssock, int index,
874                unsigned int msglen, recvstream_t *rt) {
875
876        ndag_encap_t *encaphdr;
877        ndag_monitor_t *mon;
878        uint8_t rectype;
879
880        /* Check that we have a valid nDAG encap record */
881        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
882
883        if (rectype == NDAG_PKT_KEEPALIVE) {
884                /* Keep-alive, reset startidle and carry on. Don't
885                 * change nextwrite -- we want to overwrite the
886                 * keep-alive with usable content. */
887                return 0;
888        } else if (rectype != NDAG_PKT_ENCAPERF) {
889                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
890                                ssock->groupaddr, ssock->port);
891                close(ssock->sock);
892                ssock->sock = -1;
893                return -1;
894        }
895
896        ssock->savedsize[index] = msglen;
897        ssock->nextwriteind ++;
898        ssock->bufavail --;
899
900        assert(ssock->bufavail >= 0);
901        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
902                ssock->nextwriteind = 0;
903        }
904
905        /* Get the useful info from the encap header */
906        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
907
908        mon = ssock->monitorptr;
909
910        if (mon->laststart == 0) {
911                mon->laststart = bswap_be_to_host64(encaphdr->started);
912        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
913                mon->laststart = bswap_be_to_host64(encaphdr->started);
914                reset_expected_seqs(rt, mon);
915
916                /* TODO what is a good way to indicate this to clients?
917                 * set the loss counter in the ERF header? a bit rude?
918                 * use another bit in the ERF header?
919                 * add a queryable flag to libtrace_packet_t?
920                 */
921
922        }
923
924        if (ssock->expectedseq != 0) {
925                rt->missing_records += seq_cmp(
926                                ntohl(encaphdr->seqno), ssock->expectedseq);
927
928        }
929        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
930        if (ssock->expectedseq == 0) {
931                ssock->expectedseq ++;
932        }
933
934        if (ssock->nextread == NULL) {
935                /* If this is our first read, set up 'nextread'
936                 * by skipping past the nDAG headers */
937                ssock->nextread = ssock->saved[0] +
938                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
939        }
940        return 1;
941
942}
943
944static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
945                int *gottime, recvstream_t *rt) {
946
947        int ret, ndagstat, avail;
948        int toret = 0;
949
950#if HAVE_DECL_RECVMMSG
951        int i;
952#endif
953
954        avail = init_receivers(ssock, ssock->bufavail);
955
956#if HAVE_DECL_RECVMMSG
957        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
958                        MSG_DONTWAIT, NULL);
959#else
960        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
961#endif
962        if (ret < 0) {
963                /* Nothing to receive right now, but we should still
964                 * count as 'ready' if at least one buffer is full */
965                if (errno == EAGAIN || errno == EWOULDBLOCK) {
966                        if (readable_data(ssock)) {
967                                toret = 1;
968                        }
969                        if (!(*gottime)) {
970                                gettimeofday(tv, NULL);
971                                *gottime = 1;
972                        }
973                        if (ssock->startidle == 0) {
974                                ssock->startidle = tv->tv_sec;
975                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
976                                fprintf(stderr,
977                                        "Closing channel %s:%u due to inactivity.\n",
978                                        ssock->groupaddr,
979                                        ssock->port);
980
981                                close(ssock->sock);
982                                ssock->sock = -1;
983                        }
984                } else {
985
986                        fprintf(stderr,
987                                "Error receiving encapsulated records from %s:%u -- %s \n",
988                                ssock->groupaddr, ssock->port,
989                                strerror(errno));
990                        close(ssock->sock);
991                        ssock->sock = -1;
992                }
993                return toret;
994        }
995
996        ssock->startidle = 0;
997
998#if HAVE_DECL_RECVMMSG
999        for (i = 0; i < ret; i++) {
1000                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1001                                ssock->mmsgbufs[i].msg_len, rt);
1002                if (ndagstat == -1) {
1003                        break;
1004                }
1005
1006                if (ndagstat == 1) {
1007                        toret = 1;
1008                }
1009        }
1010#else
1011        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1012        if (ndagstat <= 0) {
1013                toret = 0;
1014        } else {
1015                toret = 1;
1016        }
1017#endif
1018
1019        return toret;
1020}
1021
1022static int receive_from_sockets(recvstream_t *rt) {
1023
1024        int i, readybufs, gottime;
1025        struct timeval tv;
1026        fd_set fds;
1027        int maxfd = 0;
1028        struct timeval zerotv;
1029
1030        readybufs = 0;
1031        gottime = 0;
1032
1033        FD_ZERO(&fds);
1034
1035        if (rt->maxfd == -1) {
1036                return 0;
1037        }
1038
1039        zerotv.tv_sec = 0;
1040        zerotv.tv_usec = 0;
1041
1042        for (i = 0; i < rt->sourcecount; i++) {
1043                if (rt->sources[i].sock == -1) {
1044                        continue;
1045                }
1046
1047#if HAVE_DECL_RECVMMSG
1048                /* Plenty of full buffers, just use the packets in those */
1049                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1050                        readybufs ++;
1051                        continue;
1052                }
1053#else
1054                if (rt->sources[i].bufavail == 0) {
1055                        readybufs ++;
1056                        continue;
1057                }
1058#endif
1059                FD_SET(rt->sources[i].sock, &fds);
1060                if (maxfd < rt->sources[i].sock) {
1061                        maxfd = rt->sources[i].sock;
1062                }
1063        }
1064
1065
1066        if (maxfd <= 0) {
1067                return readybufs;
1068        }
1069
1070        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1071                /* log the error? XXX */
1072                return -1;
1073        }
1074
1075        for (i = 0; i < rt->sourcecount; i++) {
1076                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1077                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1078                                readybufs ++;
1079                        }
1080                        continue;
1081                }
1082                readybufs += receive_from_single_socket(&(rt->sources[i]),
1083                                &tv, &gottime, rt);
1084        }
1085
1086        return readybufs;
1087
1088}
1089
1090
1091static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1092                libtrace_packet_t *packet) {
1093
1094        int iserr = 0;
1095
1096        if (packet->buf_control == TRACE_CTRL_PACKET) {
1097                free(packet->buffer);
1098                packet->buffer = NULL;
1099        }
1100
1101        do {
1102                /* Make sure we shouldn't be halting */
1103                if ((iserr = is_halted(libtrace)) != -1) {
1104                        return iserr;
1105                }
1106
1107                /* Check for any messages from the control thread */
1108                iserr = receiver_read_messages(rt);
1109
1110                if (iserr <= 0) {
1111                        return iserr;
1112                }
1113
1114                /* If blocking and no sources, sleep for a bit and then try
1115                 * checking for messages again.
1116                 */
1117                if (rt->sourcecount == 0) {
1118                        usleep(10000);
1119                        continue;
1120                }
1121
1122                if ((iserr = receive_from_sockets(rt)) < 0) {
1123                        return iserr;
1124                } else if (iserr > 0) {
1125                        /* At least one of our input sockets has available
1126                         * data, let's go ahead and use what we have. */
1127                        break;
1128                }
1129
1130                /* None of our sources have anything available, we can take
1131                 * a short break rather than immediately trying again.
1132                 */
1133                if (iserr == 0) {
1134                        usleep(100);
1135                }
1136
1137        } while (1);
1138
1139        return iserr;
1140}
1141
1142static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1143                libtrace_packet_t *packet) {
1144
1145        int iserr = 0;
1146
1147        if (packet->buf_control == TRACE_CTRL_PACKET) {
1148                free(packet->buffer);
1149                packet->buffer = NULL;
1150        }
1151
1152        /* Make sure we shouldn't be halting */
1153        if ((iserr = is_halted(libtrace)) != -1) {
1154                return iserr;
1155        }
1156
1157        /* If non-blocking and there are no sources, just break */
1158        if (rt->sourcecount == 0) {
1159                return 0;
1160        }
1161
1162        return receive_from_sockets(rt);
1163}
1164
1165static streamsock_t *select_next_packet(recvstream_t *rt) {
1166        int i;
1167        streamsock_t *ssock = NULL;
1168        uint64_t earliest = 0;
1169        uint64_t currentts = 0;
1170        dag_record_t *daghdr;
1171
1172        /* If we only have one source, then no need to do any
1173         * timestamp parsing or byteswapping.
1174         */
1175        if (rt->sourcecount == 1) {
1176                if (readable_data(&(rt->sources[0]))) {
1177                        return &(rt->sources[0]);
1178                }
1179                return NULL;
1180        }
1181
1182
1183        for (i = 0; i < rt->sourcecount; i ++) {
1184                if (!readable_data(&(rt->sources[i]))) {
1185                        continue;
1186                }
1187
1188                if (rt->sources[i].nextts == 0) {
1189                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1190                        currentts = bswap_le_to_host64(daghdr->ts);
1191                        rt->sources[i].nextts = currentts;
1192                } else {
1193                        currentts = rt->sources[i].nextts;
1194                }
1195
1196                if (earliest == 0 || earliest > currentts) {
1197                        earliest = currentts;
1198                        ssock = &(rt->sources[i]);
1199                }
1200        }
1201        return ssock;
1202}
1203
1204static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1205
1206        int rem, ret;
1207        streamsock_t *nextavail = NULL;
1208        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1209                        packet);
1210
1211        if (rem <= 0) {
1212                return rem;
1213        }
1214
1215        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1216        if (nextavail == NULL) {
1217                return 0;
1218        }
1219
1220        /* nextread should point at an ERF header, so prepare 'packet' to be
1221         * a libtrace ERF packet. */
1222
1223        ret = ndag_prepare_packet_stream(libtrace,
1224                        &(FORMAT_DATA->receivers[0]), nextavail,
1225                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1226        nextavail->bufavail += nextavail->bufwaiting;
1227        nextavail->bufwaiting = 0;
1228        return ret;
1229}
1230
1231static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1232                libtrace_packet_t **packets, size_t nb_packets) {
1233
1234        recvstream_t *rt;
1235        int rem, i;
1236        size_t read_packets = 0;
1237        streamsock_t *nextavail = NULL;
1238
1239        rt = (recvstream_t *)t->format_data;
1240
1241
1242        do {
1243                /* Only check for messages once per batch */
1244                if (read_packets == 0) {
1245                        rem = receive_encap_records_block(libtrace, rt,
1246                                packets[read_packets]);
1247                } else {
1248                        rem = receive_encap_records_nonblock(libtrace, rt,
1249                                packets[read_packets]);
1250                }
1251
1252                if (rem < 0) {
1253                        return rem;
1254                }
1255
1256                if (rem == 0) {
1257                        break;
1258                }
1259
1260                nextavail = select_next_packet(rt);
1261                if (nextavail == NULL) {
1262                        break;
1263                }
1264
1265                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1266                                packets[read_packets],
1267                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1268
1269                read_packets  ++;
1270                if (read_packets >= nb_packets) {
1271                        break;
1272                }
1273        } while (1);
1274
1275        for (i = 0; i < rt->sourcecount; i++) {
1276                streamsock_t *src = &(rt->sources[i]);
1277                src->bufavail += src->bufwaiting;
1278                src->bufwaiting = 0;
1279                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1280        }
1281
1282        return read_packets;
1283
1284}
1285
1286static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1287                libtrace_packet_t *packet) {
1288
1289
1290        libtrace_eventobj_t event = {0,0,0.0,0};
1291        int rem, i;
1292        streamsock_t *nextavail = NULL;
1293
1294        /* Only check for messages once per call */
1295        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1296        if (rem <= 0) {
1297                event.type = TRACE_EVENT_TERMINATE;
1298                return event;
1299        }
1300
1301        do {
1302                rem = receive_encap_records_nonblock(libtrace,
1303                                &(FORMAT_DATA->receivers[0]), packet);
1304
1305                if (rem < 0) {
1306                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1307                                "Received invalid nDAG records.");
1308                        event.type = TRACE_EVENT_TERMINATE;
1309                        break;
1310                }
1311
1312                if (rem == 0) {
1313                        /* Either we've been halted or we've got no packets
1314                         * right now. */
1315                        if (is_halted(libtrace) == 0) {
1316                                event.type = TRACE_EVENT_TERMINATE;
1317                                break;
1318                        }
1319                        event.type = TRACE_EVENT_SLEEP;
1320                        event.seconds = 0.0001;
1321                        break;
1322                }
1323
1324                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1325                if (nextavail == NULL) {
1326                        event.type = TRACE_EVENT_SLEEP;
1327                        event.seconds = 0.0001;
1328                        break;
1329                }
1330
1331                event.type = TRACE_EVENT_PACKET;
1332                ndag_prepare_packet_stream(libtrace,
1333                                &(FORMAT_DATA->receivers[0]), nextavail,
1334                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1335                event.size = trace_get_capture_length(packet) +
1336                                trace_get_framing_length(packet);
1337
1338                if (libtrace->filter) {
1339                        int filtret = trace_apply_filter(libtrace->filter,
1340                                        packet);
1341                        if (filtret == -1) {
1342                                trace_set_err(libtrace,
1343                                                TRACE_ERR_BAD_FILTER,
1344                                                "Bad BPF Filter");
1345                                event.type = TRACE_EVENT_TERMINATE;
1346                                break;
1347                        }
1348
1349                        if (filtret == 0) {
1350                                /* Didn't match filter, try next one */
1351                                libtrace->filtered_packets ++;
1352                                trace_clear_cache(packet);
1353                                continue;
1354                        }
1355                }
1356
1357                if (libtrace->snaplen > 0) {
1358                        trace_set_capture_length(packet, libtrace->snaplen);
1359                }
1360                libtrace->accepted_packets ++;
1361                break;
1362        } while (1);
1363
1364        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1365                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1366                src->bufavail += src->bufwaiting;
1367                src->bufwaiting = 0;
1368                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1369        }
1370
1371        return event;
1372}
1373
1374static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1375
1376        int i;
1377
1378        stat->dropped_valid = 1;
1379        stat->dropped = 0;
1380        stat->received_valid = 1;
1381        stat->received = 0;
1382        stat->missing_valid = 1;
1383        stat->missing = 0;
1384
1385        /* TODO Is this thread safe? */
1386        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1387                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1388                stat->received += FORMAT_DATA->receivers[i].received_packets;
1389                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1390        }
1391
1392}
1393
1394static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1395                libtrace_stat_t *stat) {
1396
1397        recvstream_t *recvr = (recvstream_t *)t->format_data;
1398
1399        if (libtrace == NULL)
1400                return;
1401        /* TODO Is this thread safe */
1402        stat->dropped_valid = 1;
1403        stat->dropped = recvr->dropped_upstream;
1404
1405        stat->received_valid = 1;
1406        stat->received = recvr->received_packets;
1407
1408        stat->missing_valid = 1;
1409        stat->missing = recvr->missing_records;
1410
1411}
1412
1413static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1414                bool reader) {
1415        recvstream_t *recvr;
1416
1417        if (!reader || t->type != THREAD_PERPKT) {
1418                return 0;
1419        }
1420
1421        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1422        t->format_data = recvr;
1423
1424        return 0;
1425}
1426
1427static struct libtrace_format_t ndag = {
1428
1429        "ndag",
1430        "",
1431        TRACE_FORMAT_NDAG,
1432        NULL,                   /* probe filename */
1433        NULL,                   /* probe magic */
1434        ndag_init_input,        /* init_input */
1435        NULL,                   /* config_input */
1436        ndag_start_input,       /* start_input */
1437        ndag_pause_input,       /* pause_input */
1438        NULL,                   /* init_output */
1439        NULL,                   /* config_output */
1440        NULL,                   /* start_output */
1441        ndag_fin_input,         /* fin_input */
1442        NULL,                   /* fin_output */
1443        ndag_read_packet,       /* read_packet */
1444        ndag_prepare_packet,    /* prepare_packet */
1445        NULL,                   /* fin_packet */
1446        NULL,                   /* write_packet */
1447        NULL,                   /* flush_output */
1448        erf_get_link_type,      /* get_link_type */
1449        erf_get_direction,      /* get_direction */
1450        erf_set_direction,      /* set_direction */
1451        erf_get_erf_timestamp,  /* get_erf_timestamp */
1452        NULL,                   /* get_timeval */
1453        NULL,                   /* get_seconds */
1454        NULL,                   /* get_timespec */
1455        NULL,                   /* seek_erf */
1456        NULL,                   /* seek_timeval */
1457        NULL,                   /* seek_seconds */
1458        erf_get_capture_length, /* get_capture_length */
1459        erf_get_wire_length,    /* get_wire_length */
1460        erf_get_framing_length, /* get_framing_length */
1461        erf_set_capture_length, /* set_capture_length */
1462        NULL,                   /* get_received_packets */
1463        NULL,                   /* get_filtered_packets */
1464        NULL,                   /* get_dropped_packets */
1465        ndag_get_statistics,    /* get_statistics */
1466        NULL,                   /* get_fd */
1467        trace_event_ndag,       /* trace_event */
1468        NULL,                   /* help */
1469        NULL,                   /* next pointer */
1470        {true, 0},              /* live packet capture */
1471        ndag_pstart_input,      /* parallel start */
1472        ndag_pread_packets,     /* parallel read */
1473        ndag_pause_input,       /* parallel pause */
1474        NULL,
1475        ndag_pregister_thread,  /* register thread */
1476        NULL,
1477        ndag_get_thread_stats   /* per-thread stats */
1478};
1479
1480void ndag_constructor(void) {
1481        register_format(&ndag);
1482}
Note: See TracBrowser for help on using the repository browser.