source: lib/format_ndag.c @ 32ee9b2

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 32ee9b2 was 32ee9b2, checked in by Shane Alcock <salcock@…>, 2 years ago

Add new trace_flush_output() to public API

Can be used to force a libtrace output to dump any buffered output
to disk immediately.

Note that if the file is compressed or the output trace format
requires a trailer, the flushed file will still not be properly
readable afterwards as this will not result in any trailers
being written. You'll still have to close the file for that.

Mainly this is useful for ensuring that output file sizes grow
over time in situations where the amount of output is relatively
small, rather than staying stuck at 0 bytes until we either reach
1MB of output or the file is closed. For instance, you could have
a timer that calls trace_flush_output() every 30 seconds so that
the output file size will grow if any packets were written in the
last 30 seconds.

  • Property mode set to 100644
File size: 45.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        int maxfd;
117} recvstream_t;
118
119typedef struct ndag_format_data {
120        char *multicastgroup;
121        char *portstr;
122        char *localiface;
123        uint16_t nextthreadid;
124        recvstream_t *receivers;
125
126        pthread_t controlthread;
127        libtrace_message_queue_t controlqueue;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
233        {
234                fprintf(stderr,
235                        "Failed to bind to multicast socket %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                sock = -1;
238                goto sockcreateover;
239        }
240
241        greq.gr_interface = interface;
242        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
243
244        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
245                        sizeof(greq)) < 0) {
246                fprintf(stderr,
247                        "Failed to join multicast group %s:%s -- %s\n",
248                                groupaddr, portstr, strerror(errno));
249                close(sock);
250                sock = -1;
251                goto sockcreateover;
252        }
253
254        bufsize = 16 * 1024 * 1024;
255        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
256                                (socklen_t)sizeof(int)) < 0) {
257
258                fprintf(stderr,
259                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
260                                groupaddr, portstr, strerror(errno));
261                close(sock);
262                sock = -1;
263                goto sockcreateover;
264        }
265
266sockcreateover:
267        freeaddrinfo(group);
268        return sock;
269}
270
271
272static int ndag_init_input(libtrace_t *libtrace) {
273
274        char *scan = NULL;
275        char *next = NULL;
276
277        libtrace->format_data = (ndag_format_data_t *)malloc(
278                        sizeof(ndag_format_data_t));
279
280        FORMAT_DATA->multicastgroup = NULL;
281        FORMAT_DATA->portstr = NULL;
282        FORMAT_DATA->localiface = NULL;
283        FORMAT_DATA->nextthreadid = 0;
284        FORMAT_DATA->receivers = NULL;
285
286        scan = strchr(libtrace->uridata, ',');
287        if (scan == NULL) {
288                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
289                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
290                return -1;
291        }
292        FORMAT_DATA->localiface = strndup(libtrace->uridata,
293                        (size_t)(scan - libtrace->uridata));
294        next = scan + 1;
295
296        scan = strchr(next, ',');
297        if (scan == NULL) {
298                FORMAT_DATA->portstr = strdup("9001");
299                FORMAT_DATA->multicastgroup = strdup(next);
300        } else {
301                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
302
303                FORMAT_DATA->portstr = strdup(scan + 1);
304        }
305        return 0;
306}
307
308static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
309                uint16_t portnum, uint16_t monid) {
310
311        ndag_internal_message_t alert;
312
313        alert.type = NDAG_CLIENT_NEWGROUP;
314        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
315        alert.contents.localiface = FORMAT_DATA->localiface;
316        alert.contents.port = portnum;
317        alert.contents.monitor = monid;
318
319        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
320                        (void *)&alert);
321
322}
323
324static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
325                int msgsize, uint16_t *ptmap) {
326
327        int i;
328        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
329        uint8_t msgtype;
330
331        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
332        if (msgtype == 0) {
333                return -1;
334        }
335
336        msgsize -= sizeof(ndag_common_t);
337        if (msgtype == NDAG_PKT_BEACON) {
338                /* If message is a beacon, make sure every port included in the
339                 * beacon is assigned to a receive thread.
340                 */
341                uint16_t *ptr, numstreams;
342
343                if ((uint32_t)msgsize < sizeof(uint16_t)) {
344                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
345                        return -1;
346                }
347
348                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
349                numstreams = ntohs(*ptr);
350                ptr ++;
351
352                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
353                {
354                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
355                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
356                        return -1;
357                }
358
359                for (i = 0; i < numstreams; i++) {
360                        uint16_t streamport = ntohs(*ptr);
361
362                        if (ptmap[streamport] == 0xffff) {
363                                new_group_alert(libtrace,
364                                        FORMAT_DATA->nextthreadid, streamport,
365                                        ntohs(ndaghdr->monitorid));
366
367                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
368
369                                if (libtrace->perpkt_thread_count == 0) {
370                                        FORMAT_DATA->nextthreadid = 0;
371                                } else {
372                                        FORMAT_DATA->nextthreadid =
373                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
374                                }
375                        }
376
377                        ptr ++;
378                }
379        } else {
380                fprintf(stderr,
381                        "Unexpected message type on control channel: %u\n",
382                         msgtype);
383                return -1;
384        }
385
386        return 0;
387
388}
389
390static void *ndag_controller_run(void *tdata) {
391
392        libtrace_t *libtrace = (libtrace_t *)tdata;
393        uint16_t ptmap[65536];
394        int sock = -1;
395        struct addrinfo *receiveaddr = NULL;
396        fd_set listening;
397        struct timeval timeout;
398
399        /* ptmap is a dirty hack to allow us to quickly check if we've already
400         * assigned a stream to a thread.
401         */
402        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
403
404        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
405                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
406                        &receiveaddr);
407        if (sock == -1) {
408                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
409                        "Unable to join multicast group for nDAG control channel");
410                trace_interrupt();
411                pthread_exit(NULL);
412        }
413
414        ndag_paused = 0;
415        while ((is_halted(libtrace) == -1) && !ndag_paused) {
416                int ret;
417                char buf[CTRL_BUF_SIZE];
418
419                FD_ZERO(&listening);
420                FD_SET(sock, &listening);
421
422                timeout.tv_sec = 0;
423                timeout.tv_usec = 500000;
424
425                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
426                if (ret < 0) {
427                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
428                        break;
429                }
430
431                if (!FD_ISSET(sock, &listening)) {
432                        continue;
433                }
434
435                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
436                                receiveaddr->ai_addr,
437                                &(receiveaddr->ai_addrlen));
438                if (ret < 0) {
439                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
440                        break;
441                }
442
443                if (ret == 0) {
444                        break;
445                }
446
447                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
448                        fprintf(stderr, "Error while parsing nDAG control message.\n");
449                        continue;
450                }
451        }
452
453        if (sock >= 0) {
454                close(sock);
455        }
456
457        /* Control channel has fallen over, should probably encourage libtrace
458         * to halt the receiver threads as well.
459         */
460        if (!is_halted(libtrace)) {
461                trace_interrupt();
462        }
463
464        pthread_exit(NULL);
465}
466
467static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
468{
469        int ret;
470        uint32_t i;
471        /* Configure the set of receiver threads */
472
473        if (FORMAT_DATA->receivers == NULL) {
474                /* What if the number of threads changes between a pause and
475                 * a restart? Can this happen? */
476                FORMAT_DATA->receivers = (recvstream_t *)
477                                malloc(sizeof(recvstream_t) * maxthreads);
478        }
479
480        for (i = 0; i < maxthreads; i++) {
481                FORMAT_DATA->receivers[i].sources = NULL;
482                FORMAT_DATA->receivers[i].sourcecount = 0;
483                FORMAT_DATA->receivers[i].knownmonitors = NULL;
484                FORMAT_DATA->receivers[i].monitorcount = 0;
485                FORMAT_DATA->receivers[i].threadindex = i;
486                FORMAT_DATA->receivers[i].dropped_upstream = 0;
487                FORMAT_DATA->receivers[i].received_packets = 0;
488                FORMAT_DATA->receivers[i].missing_records = 0;
489                FORMAT_DATA->receivers[i].maxfd = -1;
490
491                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
492                                sizeof(ndag_internal_message_t));
493        }
494
495        /* Start the controller thread */
496        /* TODO consider affinity of this thread? */
497
498        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
499                        ndag_controller_run, libtrace);
500        if (ret != 0) {
501                return -1;
502        }
503        return maxthreads;
504}
505
506static int ndag_start_input(libtrace_t *libtrace) {
507        return ndag_start_threads(libtrace, 1);
508}
509
510static int ndag_pstart_input(libtrace_t *libtrace) {
511        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
512                        libtrace->perpkt_thread_count)
513                return 0;
514        return -1;
515}
516
517static void halt_ndag_receiver(recvstream_t *receiver) {
518        int j, i;
519        libtrace_message_queue_destroy(&(receiver->mqueue));
520
521        if (receiver->sources == NULL)
522                return;
523        for (i = 0; i < receiver->sourcecount; i++) {
524                streamsock_t src = receiver->sources[i];
525                if (src.saved) {
526                        for (j = 0; j < ENCAP_BUFFERS; j++) {
527                                if (src.saved[j]) {
528                                        free(src.saved[j]);
529                                }
530                        }
531                        free(src.saved);
532                }
533
534#if HAVE_RECVMMSG
535                for (j = 0; j < RECV_BATCH_SIZE; j++) {
536                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
537                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
538                        }
539                }
540#else
541                free(src.singlemsg.msg_iov);
542#endif
543
544                close(src.sock);
545        }
546        if (receiver->knownmonitors) {
547                free(receiver->knownmonitors);
548        }
549
550        if (receiver->sources) {
551                free(receiver->sources);
552        }
553}
554
555static int ndag_pause_input(libtrace_t *libtrace) {
556        int i;
557
558        /* Close the existing receiver sockets */
559        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
560               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
561        }
562        return 0;
563}
564
565static int ndag_fin_input(libtrace_t *libtrace) {
566
567        if (FORMAT_DATA->receivers) {
568                free(FORMAT_DATA->receivers);
569        }
570        if (FORMAT_DATA->multicastgroup) {
571                free(FORMAT_DATA->multicastgroup);
572        }
573        if (FORMAT_DATA->portstr) {
574                free(FORMAT_DATA->portstr);
575        }
576        if (FORMAT_DATA->localiface) {
577                free(FORMAT_DATA->localiface);
578        }
579
580        free(libtrace->format_data);
581        return 0;
582}
583
584static int ndag_prepare_packet_stream(libtrace_t *libtrace,
585                recvstream_t *rt,
586                streamsock_t *ssock, libtrace_packet_t *packet,
587                uint32_t flags) {
588
589        dag_record_t *erfptr;
590        ndag_encap_t *encaphdr;
591        uint16_t ndag_reccount = 0;
592        int nr;
593        uint16_t rlen;
594
595        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
596                packet->buf_control = TRACE_CTRL_PACKET;
597        } else {
598                packet->buf_control = TRACE_CTRL_EXTERNAL;
599        }
600
601        packet->trace = libtrace;
602        packet->buffer = ssock->nextread;
603        packet->header = ssock->nextread;
604        packet->type = TRACE_RT_DATA_ERF;
605
606        erfptr = (dag_record_t *)packet->header;
607
608        if (erfptr->flags.rxerror == 1) {
609                packet->payload = NULL;
610                erfptr->rlen = htons(erf_get_framing_length(packet));
611        } else {
612                packet->payload = (char *)packet->buffer +
613                                erf_get_framing_length(packet);
614        }
615
616        /* Update upstream drops using lctr */
617
618        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
619                /* TODO */
620        } else {
621                if (rt->received_packets > 0) {
622                        rt->dropped_upstream += ntohs(erfptr->lctr);
623                }
624        }
625
626        rt->received_packets ++;
627        ssock->recordcount += 1;
628
629        nr = ssock->nextreadind;
630        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
631                        sizeof(ndag_common_t));
632
633        ndag_reccount = ntohs(encaphdr->recordcount);
634        if ((ndag_reccount & 0x8000) != 0) {
635                /* Record was truncated -- update rlen appropriately */
636                rlen = ssock->savedsize[nr] -
637                                (ssock->nextread - ssock->saved[nr]);
638                erfptr->rlen = htons(rlen);
639        } else {
640                rlen = ntohs(erfptr->rlen);
641        }
642        ssock->nextread += rlen;
643        ssock->nextts = 0;
644
645        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
646
647        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
648                /* Read everything from this buffer, mark as empty and
649                 * move on. */
650                ssock->savedsize[nr] = 0;
651                ssock->bufwaiting ++;
652
653                nr ++;
654                if (nr == ENCAP_BUFFERS) {
655                        nr = 0;
656                }
657                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
658                                sizeof(ndag_encap_t);
659                ssock->nextreadind = nr;
660        }
661
662        packet->order = erf_get_erf_timestamp(packet);
663        packet->error = rlen;
664        return rlen;
665}
666
667static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
668                libtrace_packet_t *packet UNUSED,
669                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
670                uint32_t flags UNUSED) {
671
672        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
673        return 0;
674
675}
676
677static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
678
679        ndag_monitor_t *mon;
680
681        if (rt->monitorcount == 0) {
682                rt->knownmonitors = (ndag_monitor_t *)
683                                malloc(sizeof(ndag_monitor_t) * 5);
684        } else {
685                rt->knownmonitors = (ndag_monitor_t *)
686                            realloc(rt->knownmonitors,
687                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
688        }
689
690        mon = &(rt->knownmonitors[rt->monitorcount]);
691        mon->monitorid = monid;
692        mon->laststart = 0;
693
694        rt->monitorcount ++;
695        return mon;
696}
697
698static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
699
700        streamsock_t *ssock = NULL;
701        ndag_monitor_t *mon = NULL;
702        int i;
703
704        /* TODO consider replacing this with a list or vector so we can
705         * easily remove sources that are no longer in use, rather than
706         * just setting the sock to -1 and having to check them every
707         * time we want to read a packet.
708         */
709        if (rt->sourcecount == 0) {
710                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
711        } else if ((rt->sourcecount % 10) == 0) {
712                rt->sources = (streamsock_t *)realloc(rt->sources,
713                        sizeof(streamsock_t) * (rt->sourcecount + 10));
714        }
715
716        ssock = &(rt->sources[rt->sourcecount]);
717
718        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
719                        NULL, src.port, &(ssock->srcaddr));
720
721        if (ssock->sock < 0) {
722                return -1;
723        }
724
725        for (i = 0; i < rt->monitorcount; i++) {
726                if (rt->knownmonitors[i].monitorid == src.monitor) {
727                        mon = &(rt->knownmonitors[i]);
728                        break;
729                }
730        }
731
732        if (mon == NULL) {
733                mon = add_new_knownmonitor(rt, src.monitor);
734        }
735
736        ssock->port = src.port;
737        ssock->groupaddr = src.groupaddr;
738        ssock->expectedseq = 0;
739        ssock->monitorptr = mon;
740        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
741        ssock->bufavail = ENCAP_BUFFERS;
742        ssock->bufwaiting = 0;
743        ssock->startidle = 0;
744        ssock->nextts = 0;
745
746        for (i = 0; i < ENCAP_BUFFERS; i++) {
747                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
748                ssock->savedsize[i] = 0;
749        }
750
751#if HAVE_RECVMMSG
752        for (i = 0; i < RECV_BATCH_SIZE; i++) {
753                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
754                                malloc(sizeof(struct iovec));
755                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
756                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
757                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
758                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
759                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
760                ssock->mmsgbufs[i].msg_len = 0;
761        }
762#else
763        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
764#endif
765
766        ssock->nextread = NULL;;
767        ssock->nextreadind = 0;
768        ssock->nextwriteind = 0;
769        ssock->recordcount = 0;
770        rt->sourcecount += 1;
771        if (ssock->sock > rt->maxfd) {
772                rt->maxfd = ssock->sock;
773        }
774
775        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
776                        ssock->groupaddr, ssock->port, rt->threadindex);
777
778        return ssock->port;
779}
780
781static int receiver_read_messages(recvstream_t *rt) {
782
783        ndag_internal_message_t msg;
784
785        while (libtrace_message_queue_try_get(&(rt->mqueue),
786                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
787                switch(msg.type) {
788                        case NDAG_CLIENT_NEWGROUP:
789                                if (add_new_streamsock(rt, msg.contents) < 0) {
790                                        return -1;
791                                }
792                                break;
793                        case NDAG_CLIENT_HALT:
794                                return 0;
795                }
796        }
797        return 1;
798
799}
800
801static inline int readable_data(streamsock_t *ssock) {
802
803        if (ssock->sock == -1) {
804                return 0;
805        }
806        if (ssock->savedsize[ssock->nextreadind] == 0) {
807                return 0;
808        }
809        /*
810        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
811                        ssock->savedsize[ssock->nextreadind]) {
812                return 0;
813        }
814        */
815        return 1;
816
817
818}
819
820static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
821
822        int i;
823        for (i = 0; i < rt->sourcecount; i++) {
824                if (rt->sources[i].monitorptr == mon) {
825                        rt->sources[i].expectedseq = 0;
826                }
827        }
828
829}
830
831static int init_receivers(streamsock_t *ssock, int required) {
832
833        int wind = ssock->nextwriteind;
834        int i = 1;
835
836#if HAVE_RECVMMSG
837        for (i = 0; i < required; i++) {
838                if (i >= RECV_BATCH_SIZE) {
839                        break;
840                }
841
842                if (wind >= ENCAP_BUFFERS) {
843                        wind = 0;
844                }
845
846                ssock->mmsgbufs[i].msg_len = 0;
847                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
848                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
849                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
850
851                wind ++;
852        }
853#else
854        assert(required > 0);
855        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
856        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
857        ssock->singlemsg.msg_iovlen = 1;
858#endif
859        return i;
860}
861
862static int check_ndag_received(streamsock_t *ssock, int index,
863                unsigned int msglen, recvstream_t *rt) {
864
865        ndag_encap_t *encaphdr;
866        ndag_monitor_t *mon;
867        uint8_t rectype;
868
869        /* Check that we have a valid nDAG encap record */
870        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
871
872        if (rectype == NDAG_PKT_KEEPALIVE) {
873                /* Keep-alive, reset startidle and carry on. Don't
874                 * change nextwrite -- we want to overwrite the
875                 * keep-alive with usable content. */
876                return 0;
877        } else if (rectype != NDAG_PKT_ENCAPERF) {
878                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
879                                ssock->groupaddr, ssock->port);
880                close(ssock->sock);
881                ssock->sock = -1;
882                return -1;
883        }
884
885        ssock->savedsize[index] = msglen;
886        ssock->nextwriteind ++;
887        ssock->bufavail --;
888
889        assert(ssock->bufavail >= 0);
890        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
891                ssock->nextwriteind = 0;
892        }
893
894        /* Get the useful info from the encap header */
895        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
896
897        mon = ssock->monitorptr;
898
899        if (mon->laststart == 0) {
900                mon->laststart = bswap_be_to_host64(encaphdr->started);
901        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
902                mon->laststart = bswap_be_to_host64(encaphdr->started);
903                reset_expected_seqs(rt, mon);
904
905                /* TODO what is a good way to indicate this to clients?
906                 * set the loss counter in the ERF header? a bit rude?
907                 * use another bit in the ERF header?
908                 * add a queryable flag to libtrace_packet_t?
909                 */
910
911        }
912
913        if (ssock->expectedseq != 0) {
914                rt->missing_records += seq_cmp(
915                                ntohl(encaphdr->seqno), ssock->expectedseq);
916
917        }
918        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
919        if (ssock->expectedseq == 0) {
920                ssock->expectedseq ++;
921        }
922
923        if (ssock->nextread == NULL) {
924                /* If this is our first read, set up 'nextread'
925                 * by skipping past the nDAG headers */
926                ssock->nextread = ssock->saved[0] +
927                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
928        }
929        return 1;
930
931}
932
933static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
934                int *gottime, recvstream_t *rt) {
935
936        int ret, ndagstat, avail;
937        int toret = 0;
938
939#if HAVE_RECVMMSG
940        int i;
941#endif
942
943        avail = init_receivers(ssock, ssock->bufavail);
944
945#if HAVE_RECVMMSG
946        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
947                        MSG_DONTWAIT, NULL);
948#else
949        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
950#endif
951        if (ret < 0) {
952                /* Nothing to receive right now, but we should still
953                 * count as 'ready' if at least one buffer is full */
954                if (errno == EAGAIN || errno == EWOULDBLOCK) {
955                        if (readable_data(ssock)) {
956                                toret = 1;
957                        }
958                        if (!(*gottime)) {
959                                gettimeofday(tv, NULL);
960                                *gottime = 1;
961                        }
962                        if (ssock->startidle == 0) {
963                                ssock->startidle = tv->tv_sec;
964                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
965                                fprintf(stderr,
966                                        "Closing channel %s:%u due to inactivity.\n",
967                                        ssock->groupaddr,
968                                        ssock->port);
969
970                                close(ssock->sock);
971                                ssock->sock = -1;
972                        }
973                } else {
974
975                        fprintf(stderr,
976                                "Error receiving encapsulated records from %s:%u -- %s \n",
977                                ssock->groupaddr, ssock->port,
978                                strerror(errno));
979                        close(ssock->sock);
980                        ssock->sock = -1;
981                }
982                return toret;
983        }
984
985        ssock->startidle = 0;
986
987#if HAVE_RECVMMSG
988        for (i = 0; i < ret; i++) {
989                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
990                                ssock->mmsgbufs[i].msg_len, rt);
991                if (ndagstat == -1) {
992                        break;
993                }
994
995                if (ndagstat == 1) {
996                        toret = 1;
997                }
998        }
999#else
1000        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1001        if (ndagstat <= 0) {
1002                toret = 0;
1003        } else {
1004                toret = 1;
1005        }
1006#endif
1007
1008        return toret;
1009}
1010
1011static int receive_from_sockets(recvstream_t *rt) {
1012
1013        int i, readybufs, gottime;
1014        struct timeval tv;
1015        fd_set fds;
1016        int maxfd = 0;
1017        struct timeval zerotv;
1018
1019        readybufs = 0;
1020        gottime = 0;
1021
1022        FD_ZERO(&fds);
1023
1024        if (rt->maxfd == -1) {
1025                return 0;
1026        }
1027
1028        zerotv.tv_sec = 0;
1029        zerotv.tv_usec = 0;
1030
1031        for (i = 0; i < rt->sourcecount; i++) {
1032                if (rt->sources[i].sock == -1) {
1033                        continue;
1034                }
1035
1036#if HAVE_RECVMMSG
1037                /* Plenty of full buffers, just use the packets in those */
1038                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1039                        readybufs ++;
1040                        continue;
1041                }
1042#else
1043                if (rt->sources[i].bufavail == 0) {
1044                        readybufs ++;
1045                        continue;
1046                }
1047#endif
1048                FD_SET(rt->sources[i].sock, &fds);
1049                if (maxfd < rt->sources[i].sock) {
1050                        maxfd = rt->sources[i].sock;
1051                }
1052        }
1053
1054
1055        if (maxfd <= 0) {
1056                return readybufs;
1057        }
1058
1059        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1060                /* log the error? XXX */
1061                return -1;
1062        }
1063
1064        for (i = 0; i < rt->sourcecount; i++) {
1065                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1066                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1067                                readybufs ++;
1068                        }
1069                        continue;
1070                }
1071                readybufs += receive_from_single_socket(&(rt->sources[i]),
1072                                &tv, &gottime, rt);
1073        }
1074
1075        return readybufs;
1076
1077}
1078
1079
1080static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1081                libtrace_packet_t *packet) {
1082
1083        int iserr = 0;
1084
1085        if (packet->buf_control == TRACE_CTRL_PACKET) {
1086                free(packet->buffer);
1087                packet->buffer = NULL;
1088        }
1089
1090        do {
1091                /* Make sure we shouldn't be halting */
1092                if ((iserr = is_halted(libtrace)) != -1) {
1093                        return iserr;
1094                }
1095
1096                /* Check for any messages from the control thread */
1097                iserr = receiver_read_messages(rt);
1098
1099                if (iserr <= 0) {
1100                        return iserr;
1101                }
1102
1103                /* If blocking and no sources, sleep for a bit and then try
1104                 * checking for messages again.
1105                 */
1106                if (rt->sourcecount == 0) {
1107                        usleep(10000);
1108                        continue;
1109                }
1110
1111                if ((iserr = receive_from_sockets(rt)) < 0) {
1112                        return iserr;
1113                } else if (iserr > 0) {
1114                        /* At least one of our input sockets has available
1115                         * data, let's go ahead and use what we have. */
1116                        break;
1117                }
1118
1119                /* None of our sources have anything available, we can take
1120                 * a short break rather than immediately trying again.
1121                 */
1122                if (iserr == 0) {
1123                        usleep(100);
1124                }
1125
1126        } while (1);
1127
1128        return iserr;
1129}
1130
1131static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1132                libtrace_packet_t *packet) {
1133
1134        int iserr = 0;
1135
1136        if (packet->buf_control == TRACE_CTRL_PACKET) {
1137                free(packet->buffer);
1138                packet->buffer = NULL;
1139        }
1140
1141        /* Make sure we shouldn't be halting */
1142        if ((iserr = is_halted(libtrace)) != -1) {
1143                return iserr;
1144        }
1145
1146        /* If non-blocking and there are no sources, just break */
1147        if (rt->sourcecount == 0) {
1148                return 0;
1149        }
1150
1151        return receive_from_sockets(rt);
1152}
1153
1154static streamsock_t *select_next_packet(recvstream_t *rt) {
1155        int i;
1156        streamsock_t *ssock = NULL;
1157        uint64_t earliest = 0;
1158        uint64_t currentts = 0;
1159        dag_record_t *daghdr;
1160
1161        /* If we only have one source, then no need to do any
1162         * timestamp parsing or byteswapping.
1163         */
1164        if (rt->sourcecount == 1) {
1165                if (readable_data(&(rt->sources[0]))) {
1166                        return &(rt->sources[0]);
1167                }
1168                return NULL;
1169        }
1170
1171
1172        for (i = 0; i < rt->sourcecount; i ++) {
1173                if (!readable_data(&(rt->sources[i]))) {
1174                        continue;
1175                }
1176
1177                if (rt->sources[i].nextts == 0) {
1178                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1179                        currentts = bswap_le_to_host64(daghdr->ts);
1180                        rt->sources[i].nextts = currentts;
1181                } else {
1182                        currentts = rt->sources[i].nextts;
1183                }
1184
1185                if (earliest == 0 || earliest > currentts) {
1186                        earliest = currentts;
1187                        ssock = &(rt->sources[i]);
1188                }
1189        }
1190        return ssock;
1191}
1192
1193static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1194
1195        int rem, ret;
1196        streamsock_t *nextavail = NULL;
1197        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1198                        packet);
1199
1200        if (rem <= 0) {
1201                return rem;
1202        }
1203
1204        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1205        if (nextavail == NULL) {
1206                return 0;
1207        }
1208
1209        /* nextread should point at an ERF header, so prepare 'packet' to be
1210         * a libtrace ERF packet. */
1211
1212        ret = ndag_prepare_packet_stream(libtrace,
1213                        &(FORMAT_DATA->receivers[0]), nextavail,
1214                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1215        nextavail->bufavail += nextavail->bufwaiting;
1216        nextavail->bufwaiting = 0;
1217        return ret;
1218}
1219
1220static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1221                libtrace_packet_t **packets, size_t nb_packets) {
1222
1223        recvstream_t *rt;
1224        int rem, i;
1225        size_t read_packets = 0;
1226        streamsock_t *nextavail = NULL;
1227
1228        rt = (recvstream_t *)t->format_data;
1229
1230
1231        do {
1232                /* Only check for messages once per batch */
1233                if (read_packets == 0) {
1234                        rem = receive_encap_records_block(libtrace, rt,
1235                                packets[read_packets]);
1236                } else {
1237                        rem = receive_encap_records_nonblock(libtrace, rt,
1238                                packets[read_packets]);
1239                }
1240
1241                if (rem < 0) {
1242                        return rem;
1243                }
1244
1245                if (rem == 0) {
1246                        break;
1247                }
1248
1249                nextavail = select_next_packet(rt);
1250                if (nextavail == NULL) {
1251                        break;
1252                }
1253
1254                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1255                                packets[read_packets],
1256                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1257
1258                read_packets  ++;
1259                if (read_packets >= nb_packets) {
1260                        break;
1261                }
1262        } while (1);
1263
1264        for (i = 0; i < rt->sourcecount; i++) {
1265                streamsock_t *src = &(rt->sources[i]);
1266                src->bufavail += src->bufwaiting;
1267                src->bufwaiting = 0;
1268                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1269        }
1270
1271        return read_packets;
1272
1273}
1274
1275static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1276                libtrace_packet_t *packet) {
1277
1278
1279        libtrace_eventobj_t event = {0,0,0.0,0};
1280        int rem, i;
1281        streamsock_t *nextavail = NULL;
1282
1283        /* Only check for messages once per call */
1284        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1285        if (rem <= 0) {
1286                event.type = TRACE_EVENT_TERMINATE;
1287                return event;
1288        }
1289
1290        do {
1291                rem = receive_encap_records_nonblock(libtrace,
1292                                &(FORMAT_DATA->receivers[0]), packet);
1293
1294                if (rem < 0) {
1295                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1296                                "Received invalid nDAG records.");
1297                        event.type = TRACE_EVENT_TERMINATE;
1298                        break;
1299                }
1300
1301                if (rem == 0) {
1302                        /* Either we've been halted or we've got no packets
1303                         * right now. */
1304                        if (is_halted(libtrace) == 0) {
1305                                event.type = TRACE_EVENT_TERMINATE;
1306                                break;
1307                        }
1308                        event.type = TRACE_EVENT_SLEEP;
1309                        event.seconds = 0.0001;
1310                        break;
1311                }
1312
1313                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1314                if (nextavail == NULL) {
1315                        event.type = TRACE_EVENT_SLEEP;
1316                        event.seconds = 0.0001;
1317                        break;
1318                }
1319
1320                event.type = TRACE_EVENT_PACKET;
1321                ndag_prepare_packet_stream(libtrace,
1322                                &(FORMAT_DATA->receivers[0]), nextavail,
1323                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1324                event.size = trace_get_capture_length(packet) +
1325                                trace_get_framing_length(packet);
1326
1327                if (libtrace->filter) {
1328                        int filtret = trace_apply_filter(libtrace->filter,
1329                                        packet);
1330                        if (filtret == -1) {
1331                                trace_set_err(libtrace,
1332                                                TRACE_ERR_BAD_FILTER,
1333                                                "Bad BPF Filter");
1334                                event.type = TRACE_EVENT_TERMINATE;
1335                                break;
1336                        }
1337
1338                        if (filtret == 0) {
1339                                /* Didn't match filter, try next one */
1340                                libtrace->filtered_packets ++;
1341                                trace_clear_cache(packet);
1342                                continue;
1343                        }
1344                }
1345
1346                if (libtrace->snaplen > 0) {
1347                        trace_set_capture_length(packet, libtrace->snaplen);
1348                }
1349                libtrace->accepted_packets ++;
1350                break;
1351        } while (1);
1352
1353        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1354                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1355                src->bufavail += src->bufwaiting;
1356                src->bufwaiting = 0;
1357                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
1358        }
1359
1360        return event;
1361}
1362
1363static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1364
1365        int i;
1366
1367        stat->dropped_valid = 1;
1368        stat->dropped = 0;
1369        stat->received_valid = 1;
1370        stat->received = 0;
1371        stat->missing_valid = 1;
1372        stat->missing = 0;
1373
1374        /* TODO Is this thread safe? */
1375        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1376                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1377                stat->received += FORMAT_DATA->receivers[i].received_packets;
1378                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1379        }
1380
1381}
1382
1383static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1384                libtrace_stat_t *stat) {
1385
1386        recvstream_t *recvr = (recvstream_t *)t->format_data;
1387
1388        if (libtrace == NULL)
1389                return;
1390        /* TODO Is this thread safe */
1391        stat->dropped_valid = 1;
1392        stat->dropped = recvr->dropped_upstream;
1393
1394        stat->received_valid = 1;
1395        stat->received = recvr->received_packets;
1396
1397        stat->missing_valid = 1;
1398        stat->missing = recvr->missing_records;
1399
1400}
1401
1402static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1403                bool reader) {
1404        recvstream_t *recvr;
1405
1406        if (!reader || t->type != THREAD_PERPKT) {
1407                return 0;
1408        }
1409
1410        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1411        t->format_data = recvr;
1412
1413        return 0;
1414}
1415
1416static struct libtrace_format_t ndag = {
1417
1418        "ndag",
1419        "",
1420        TRACE_FORMAT_NDAG,
1421        NULL,                   /* probe filename */
1422        NULL,                   /* probe magic */
1423        ndag_init_input,        /* init_input */
1424        NULL,                   /* config_input */
1425        ndag_start_input,       /* start_input */
1426        ndag_pause_input,       /* pause_input */
1427        NULL,                   /* init_output */
1428        NULL,                   /* config_output */
1429        NULL,                   /* start_output */
1430        ndag_fin_input,         /* fin_input */
1431        NULL,                   /* fin_output */
1432        ndag_read_packet,       /* read_packet */
1433        ndag_prepare_packet,    /* prepare_packet */
1434        NULL,                   /* fin_packet */
1435        NULL,                   /* write_packet */
1436        NULL,                   /* flush_output */
1437        erf_get_link_type,      /* get_link_type */
1438        erf_get_direction,      /* get_direction */
1439        erf_set_direction,      /* set_direction */
1440        erf_get_erf_timestamp,  /* get_erf_timestamp */
1441        NULL,                   /* get_timeval */
1442        NULL,                   /* get_seconds */
1443        NULL,                   /* get_timespec */
1444        NULL,                   /* seek_erf */
1445        NULL,                   /* seek_timeval */
1446        NULL,                   /* seek_seconds */
1447        erf_get_capture_length, /* get_capture_length */
1448        erf_get_wire_length,    /* get_wire_length */
1449        erf_get_framing_length, /* get_framing_length */
1450        erf_set_capture_length, /* set_capture_length */
1451        NULL,                   /* get_received_packets */
1452        NULL,                   /* get_filtered_packets */
1453        NULL,                   /* get_dropped_packets */
1454        ndag_get_statistics,    /* get_statistics */
1455        NULL,                   /* get_fd */
1456        trace_event_ndag,       /* trace_event */
1457        NULL,                   /* help */
1458        NULL,                   /* next pointer */
1459        {true, 0},              /* live packet capture */
1460        ndag_pstart_input,      /* parallel start */
1461        ndag_pread_packets,     /* parallel read */
1462        ndag_pause_input,       /* parallel pause */
1463        NULL,
1464        ndag_pregister_thread,  /* register thread */
1465        NULL,
1466        ndag_get_thread_stats   /* per-thread stats */
1467};
1468
1469void ndag_constructor(void) {
1470        register_format(&ndag);
1471}
Note: See TracBrowser for help on using the repository browser.