source: lib/format_ndag.c @ 8a58afd

develop
Last change on this file since 8a58afd was 8a58afd, checked in by Shane Alcock <salcock@…>, 2 years ago

format_ndag: only call FD_ZERO if we're going to use the fd set

A surprising amount of our "receive" workload is calling FD_ZERO
when, most of the time, our buffers contain enough packets that we
never need to call select() anyway.

  • Property mode set to 100644
File size: 47.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <errno.h>
38#include <fcntl.h>
39#include <stdio.h>
40#include <string.h>
41#include <unistd.h>
42#include <stdlib.h>
43#include <net/if.h>
44#include <sys/types.h>
45#include <sys/socket.h>
46#include <netdb.h>
47
48#include "format_ndag.h"
49
50#define NDAG_IDLE_TIMEOUT (600)
51#define ENCAP_BUFSIZE (10000)
52#define CTRL_BUF_SIZE (10000)
53#define ENCAP_BUFFERS (1000)
54
55#define RECV_BATCH_SIZE (50)
56
57#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
58
59static struct libtrace_format_t ndag;
60
61volatile int ndag_paused = 0;
62
63typedef struct monitor {
64        uint16_t monitorid;
65        uint64_t laststart;
66} ndag_monitor_t;
67
68
69typedef struct streamsource {
70        uint16_t monitor;
71        char *groupaddr;
72        char *localiface;
73        uint16_t port;
74} streamsource_t;
75
76typedef struct streamsock {
77        char *groupaddr;
78        int sock;
79        struct addrinfo *srcaddr;
80        uint16_t port;
81        uint32_t expectedseq;
82        ndag_monitor_t *monitorptr;
83        char **saved;
84        char *nextread;
85        int nextreadind;
86        int nextwriteind;
87        int savedsize[ENCAP_BUFFERS];
88        uint64_t nextts;
89        uint32_t startidle;
90        uint64_t recordcount;
91
92        int bufavail;
93        int bufwaiting;
94
95#if HAVE_DECL_RECVMMSG
96        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
97#else
98        struct msghdr singlemsg;
99#endif
100
101} streamsock_t;
102
103typedef struct recvstream {
104        streamsock_t *sources;
105        uint16_t sourcecount;
106        libtrace_message_queue_t mqueue;
107        int threadindex;
108        ndag_monitor_t *knownmonitors;
109        uint16_t monitorcount;
110
111        uint64_t dropped_upstream;
112        uint64_t missing_records;
113        uint64_t received_packets;
114
115        int maxfd;
116} recvstream_t;
117
118typedef struct ndag_format_data {
119        char *multicastgroup;
120        char *portstr;
121        char *localiface;
122        uint16_t nextthreadid;
123        recvstream_t *receivers;
124
125        pthread_t controlthread;
126        libtrace_message_queue_t controlqueue;
127} ndag_format_data_t;
128
129enum {
130        NDAG_CLIENT_HALT = 0x01,
131        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
132        NDAG_CLIENT_NEWGROUP = 0x03
133};
134
135typedef struct ndagreadermessage {
136        uint8_t type;
137        streamsource_t contents;
138} ndag_internal_message_t;
139
140
141static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
142
143        /* Calculate seq_a - seq_b, taking wraparound into account */
144        if (seq_a == seq_b) return 0;
145
146        if (seq_a > seq_b) {
147                return (int) (seq_a - seq_b);
148        }
149
150        /* -1 for the wrap and another -1 because we don't use zero */
151        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
152}
153
154static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
155        ndag_common_t *header = (ndag_common_t *)msgbuf;
156
157        if (msgsize < sizeof(ndag_common_t)) {
158                fprintf(stderr,
159                        "nDAG message does not have a complete nDAG header.\n");
160                return 0;
161        }
162
163        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
164                fprintf(stderr,
165                        "nDAG message does not have a valid magic number.\n");
166                return 0;
167        }
168
169        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
170                fprintf(stderr,
171                        "nDAG message has an invalid header version: %u\n",
172                                header->version);
173                return 0;
174        }
175
176        return header->type;
177}
178
179static int join_multicast_group(char *groupaddr, char *localiface,
180        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
181
182        struct addrinfo hints;
183        struct addrinfo *gotten;
184        struct addrinfo *group;
185        unsigned int interface;
186        char pstr[16];
187        struct group_req greq;
188        int bufsize, val;
189
190        int sock;
191
192        if (portstr == NULL) {
193                snprintf(pstr, 15, "%u", portnum);
194                portstr = pstr;
195        }
196
197        interface = if_nametoindex(localiface);
198        if (interface == 0) {
199                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
200                                localiface, strerror(errno));
201                return -1;
202        }
203
204        hints.ai_family = PF_UNSPEC;
205        hints.ai_socktype = SOCK_DGRAM;
206        hints.ai_flags = AI_PASSIVE;
207        hints.ai_protocol = 0;
208
209        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
210                fprintf(stderr,
211                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
212                                portstr, strerror(errno));
213                return -1;
214        }
215
216        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
217                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
218                                groupaddr, strerror(errno));
219                return -1;
220        }
221
222        *srcinfo = gotten;
223        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
224        if (sock < 0) {
225                fprintf(stderr,
226                        "Failed to create multicast socket for %s:%s -- %s\n",
227                                groupaddr, portstr, strerror(errno));
228                goto sockcreateover;
229        }
230
231        val = 1;
232        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
233                fprintf(stderr,
234                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
235                                groupaddr, portstr, strerror(errno));
236                goto sockcreateover;
237        }
238
239        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
240        {
241                fprintf(stderr,
242                        "Failed to bind to multicast socket %s:%s -- %s\n",
243                                groupaddr, portstr, strerror(errno));
244                sock = -1;
245                goto sockcreateover;
246        }
247
248        greq.gr_interface = interface;
249        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
250
251        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
252                        sizeof(greq)) < 0) {
253                fprintf(stderr,
254                        "Failed to join multicast group %s:%s -- %s\n",
255                                groupaddr, portstr, strerror(errno));
256                close(sock);
257                sock = -1;
258                goto sockcreateover;
259        }
260
261        bufsize = 16 * 1024 * 1024;
262        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
263                                (socklen_t)sizeof(int)) < 0) {
264
265                fprintf(stderr,
266                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
267                                groupaddr, portstr, strerror(errno));
268                close(sock);
269                sock = -1;
270                goto sockcreateover;
271        }
272
273sockcreateover:
274        freeaddrinfo(group);
275        return sock;
276}
277
278
279static int ndag_init_input(libtrace_t *libtrace) {
280
281        char *scan = NULL;
282        char *next = NULL;
283
284        libtrace->format_data = (ndag_format_data_t *)malloc(
285                        sizeof(ndag_format_data_t));
286
287        if (!libtrace->format_data) {
288                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
289                        "format data inside ndag_init_input()");
290                return -1;
291        }
292
293        FORMAT_DATA->multicastgroup = NULL;
294        FORMAT_DATA->portstr = NULL;
295        FORMAT_DATA->localiface = NULL;
296        FORMAT_DATA->nextthreadid = 0;
297        FORMAT_DATA->receivers = NULL;
298
299        scan = strchr(libtrace->uridata, ',');
300        if (scan == NULL) {
301                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
302                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
303                return -1;
304        }
305        FORMAT_DATA->localiface = strndup(libtrace->uridata,
306                        (size_t)(scan - libtrace->uridata));
307        next = scan + 1;
308
309        scan = strchr(next, ',');
310        if (scan == NULL) {
311                FORMAT_DATA->portstr = strdup("9001");
312                FORMAT_DATA->multicastgroup = strdup(next);
313        } else {
314                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
315
316                FORMAT_DATA->portstr = strdup(scan + 1);
317        }
318        return 0;
319}
320
321static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
322                uint16_t portnum, uint16_t monid) {
323
324        ndag_internal_message_t alert;
325
326        alert.type = NDAG_CLIENT_NEWGROUP;
327        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
328        alert.contents.localiface = FORMAT_DATA->localiface;
329        alert.contents.port = portnum;
330        alert.contents.monitor = monid;
331
332        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
333                        (void *)&alert);
334
335}
336
337static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
338                int msgsize, uint16_t *ptmap) {
339
340        int i;
341        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
342        uint8_t msgtype;
343
344        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
345        if (msgtype == 0) {
346                return -1;
347        }
348
349        msgsize -= sizeof(ndag_common_t);
350        if (msgtype == NDAG_PKT_BEACON) {
351                /* If message is a beacon, make sure every port included in the
352                 * beacon is assigned to a receive thread.
353                 */
354                uint16_t *ptr, numstreams;
355
356                if ((uint32_t)msgsize < sizeof(uint16_t)) {
357                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
358                        return -1;
359                }
360
361                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
362                numstreams = ntohs(*ptr);
363                ptr ++;
364
365                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
366                {
367                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
368                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
369                        return -1;
370                }
371
372                for (i = 0; i < numstreams; i++) {
373                        uint16_t streamport = ntohs(*ptr);
374
375                        if (ptmap[streamport] == 0xffff) {
376                                new_group_alert(libtrace,
377                                        FORMAT_DATA->nextthreadid, streamport,
378                                        ntohs(ndaghdr->monitorid));
379
380                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
381
382                                if (libtrace->perpkt_thread_count == 0) {
383                                        FORMAT_DATA->nextthreadid = 0;
384                                } else {
385                                        FORMAT_DATA->nextthreadid =
386                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
387                                }
388                        }
389
390                        ptr ++;
391                }
392        } else {
393                fprintf(stderr,
394                        "Unexpected message type on control channel: %u\n",
395                         msgtype);
396                return -1;
397        }
398
399        return 0;
400
401}
402
403static void *ndag_controller_run(void *tdata) {
404
405        libtrace_t *libtrace = (libtrace_t *)tdata;
406        uint16_t ptmap[65536];
407        int sock = -1;
408        struct addrinfo *receiveaddr = NULL;
409        fd_set listening;
410        struct timeval timeout;
411
412        /* ptmap is a dirty hack to allow us to quickly check if we've already
413         * assigned a stream to a thread.
414         */
415        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
416
417        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
418                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
419                        &receiveaddr);
420        if (sock == -1) {
421                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
422                        "Unable to join multicast group for nDAG control channel");
423                trace_interrupt();
424                pthread_exit(NULL);
425        }
426
427        ndag_paused = 0;
428        while ((is_halted(libtrace) == -1) && !ndag_paused) {
429                int ret;
430                char buf[CTRL_BUF_SIZE];
431
432                FD_ZERO(&listening);
433                FD_SET(sock, &listening);
434
435                timeout.tv_sec = 0;
436                timeout.tv_usec = 500000;
437
438                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
439                if (ret < 0) {
440                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
441                        break;
442                }
443
444                if (!FD_ISSET(sock, &listening)) {
445                        continue;
446                }
447
448                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
449                                receiveaddr->ai_addr,
450                                &(receiveaddr->ai_addrlen));
451                if (ret < 0) {
452                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
453                        break;
454                }
455
456                if (ret == 0) {
457                        break;
458                }
459
460                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
461                        fprintf(stderr, "Error while parsing nDAG control message.\n");
462                        continue;
463                }
464        }
465
466        if (sock >= 0) {
467                close(sock);
468        }
469
470        /* Control channel has fallen over, should probably encourage libtrace
471         * to halt the receiver threads as well.
472         */
473        if (!is_halted(libtrace)) {
474                trace_interrupt();
475        }
476
477        pthread_exit(NULL);
478}
479
480static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
481{
482        int ret;
483        uint32_t i;
484        /* Configure the set of receiver threads */
485
486        if (FORMAT_DATA->receivers == NULL) {
487                /* What if the number of threads changes between a pause and
488                 * a restart? Can this happen? */
489                FORMAT_DATA->receivers = (recvstream_t *)
490                                malloc(sizeof(recvstream_t) * maxthreads);
491        }
492
493        for (i = 0; i < maxthreads; i++) {
494                FORMAT_DATA->receivers[i].sources = NULL;
495                FORMAT_DATA->receivers[i].sourcecount = 0;
496                FORMAT_DATA->receivers[i].knownmonitors = NULL;
497                FORMAT_DATA->receivers[i].monitorcount = 0;
498                FORMAT_DATA->receivers[i].threadindex = i;
499                FORMAT_DATA->receivers[i].dropped_upstream = 0;
500                FORMAT_DATA->receivers[i].received_packets = 0;
501                FORMAT_DATA->receivers[i].missing_records = 0;
502                FORMAT_DATA->receivers[i].maxfd = -1;
503
504                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
505                                sizeof(ndag_internal_message_t));
506        }
507
508        /* Start the controller thread */
509        /* TODO consider affinity of this thread? */
510
511        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
512                        ndag_controller_run, libtrace);
513        if (ret != 0) {
514                return -1;
515        }
516        return maxthreads;
517}
518
519static int ndag_start_input(libtrace_t *libtrace) {
520        return ndag_start_threads(libtrace, 1);
521}
522
523static int ndag_pstart_input(libtrace_t *libtrace) {
524        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
525                        libtrace->perpkt_thread_count)
526                return 0;
527        return -1;
528}
529
530static void halt_ndag_receiver(recvstream_t *receiver) {
531        int j, i;
532        libtrace_message_queue_destroy(&(receiver->mqueue));
533
534        if (receiver->sources == NULL)
535                return;
536        for (i = 0; i < receiver->sourcecount; i++) {
537                streamsock_t src = receiver->sources[i];
538                if (src.saved) {
539                        for (j = 0; j < ENCAP_BUFFERS; j++) {
540                                if (src.saved[j]) {
541                                        free(src.saved[j]);
542                                }
543                        }
544                        free(src.saved);
545                }
546
547#if HAVE_DECL_RECVMMSG
548                for (j = 0; j < RECV_BATCH_SIZE; j++) {
549                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
550                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
551                        }
552                }
553#else
554                free(src.singlemsg.msg_iov);
555#endif
556
557                if (src.sock != -1) {
558                        close(src.sock);
559                }
560        }
561        if (receiver->knownmonitors) {
562                free(receiver->knownmonitors);
563        }
564
565        if (receiver->sources) {
566                free(receiver->sources);
567        }
568}
569
570static int ndag_pause_input(libtrace_t *libtrace) {
571        int i;
572
573        /* Close the existing receiver sockets */
574        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
575               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
576        }
577        return 0;
578}
579
580static int ndag_fin_input(libtrace_t *libtrace) {
581
582        if (FORMAT_DATA->receivers) {
583                free(FORMAT_DATA->receivers);
584        }
585        if (FORMAT_DATA->multicastgroup) {
586                free(FORMAT_DATA->multicastgroup);
587        }
588        if (FORMAT_DATA->portstr) {
589                free(FORMAT_DATA->portstr);
590        }
591        if (FORMAT_DATA->localiface) {
592                free(FORMAT_DATA->localiface);
593        }
594
595        free(libtrace->format_data);
596        return 0;
597}
598
599static int ndag_prepare_packet_stream(libtrace_t *libtrace,
600                recvstream_t *rt,
601                streamsock_t *ssock, libtrace_packet_t *packet,
602                uint32_t flags) {
603
604        dag_record_t *erfptr;
605        ndag_encap_t *encaphdr;
606        uint16_t ndag_reccount = 0;
607        int nr;
608        uint16_t rlen;
609
610        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
611                packet->buf_control = TRACE_CTRL_PACKET;
612        } else {
613                packet->buf_control = TRACE_CTRL_EXTERNAL;
614        }
615
616        packet->trace = libtrace;
617        packet->buffer = ssock->nextread;
618        packet->header = ssock->nextread;
619        packet->type = TRACE_RT_DATA_ERF;
620
621        erfptr = (dag_record_t *)packet->header;
622
623        if (erfptr->flags.rxerror == 1) {
624                packet->payload = NULL;
625                erfptr->rlen = htons(erf_get_framing_length(packet));
626        } else {
627                packet->payload = (char *)packet->buffer +
628                                erf_get_framing_length(packet);
629        }
630
631        /* Update upstream drops using lctr */
632
633        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
634                /* TODO */
635        } else {
636                if (rt->received_packets > 0) {
637                        rt->dropped_upstream += ntohs(erfptr->lctr);
638                }
639        }
640
641        rt->received_packets ++;
642        ssock->recordcount += 1;
643
644        nr = ssock->nextreadind;
645        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
646                        sizeof(ndag_common_t));
647
648        ndag_reccount = ntohs(encaphdr->recordcount);
649        if ((ndag_reccount & 0x8000) != 0) {
650                /* Record was truncated -- update rlen appropriately */
651                rlen = ssock->savedsize[nr] -
652                                (ssock->nextread - ssock->saved[nr]);
653                erfptr->rlen = htons(rlen);
654        } else {
655                rlen = ntohs(erfptr->rlen);
656        }
657        ssock->nextread += rlen;
658        ssock->nextts = 0;
659
660        if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) {
661                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the "
662                        "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()");
663                return -1;
664        }
665
666        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
667                /* Read everything from this buffer, mark as empty and
668                 * move on. */
669                ssock->savedsize[nr] = 0;
670                ssock->bufwaiting ++;
671
672                nr ++;
673                if (nr == ENCAP_BUFFERS) {
674                        nr = 0;
675                }
676                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
677                                sizeof(ndag_encap_t);
678                ssock->nextreadind = nr;
679        }
680
681        packet->order = erf_get_erf_timestamp(packet);
682        packet->error = rlen;
683        return rlen;
684}
685
686static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
687                libtrace_packet_t *packet UNUSED,
688                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
689                uint32_t flags UNUSED) {
690
691        fprintf(stderr, "Sending nDAG records over RT doesn't make sense! Please stop\n");
692        return 0;
693
694}
695
696static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
697
698        ndag_monitor_t *mon;
699
700        if (rt->monitorcount == 0) {
701                rt->knownmonitors = (ndag_monitor_t *)
702                                malloc(sizeof(ndag_monitor_t) * 5);
703        } else {
704                rt->knownmonitors = (ndag_monitor_t *)
705                            realloc(rt->knownmonitors,
706                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
707        }
708
709        mon = &(rt->knownmonitors[rt->monitorcount]);
710        mon->monitorid = monid;
711        mon->laststart = 0;
712
713        rt->monitorcount ++;
714        return mon;
715}
716
717static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
718
719        streamsock_t *ssock = NULL;
720        ndag_monitor_t *mon = NULL;
721        int i;
722
723        /* TODO consider replacing this with a list or vector so we can
724         * easily remove sources that are no longer in use, rather than
725         * just setting the sock to -1 and having to check them every
726         * time we want to read a packet.
727         */
728        if (rt->sourcecount == 0) {
729                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
730        } else if ((rt->sourcecount % 10) == 0) {
731                rt->sources = (streamsock_t *)realloc(rt->sources,
732                        sizeof(streamsock_t) * (rt->sourcecount + 10));
733        }
734
735        ssock = &(rt->sources[rt->sourcecount]);
736
737        for (i = 0; i < rt->monitorcount; i++) {
738                if (rt->knownmonitors[i].monitorid == src.monitor) {
739                        mon = &(rt->knownmonitors[i]);
740                        break;
741                }
742        }
743
744        if (mon == NULL) {
745                mon = add_new_knownmonitor(rt, src.monitor);
746        }
747
748        ssock->port = src.port;
749        ssock->groupaddr = src.groupaddr;
750        ssock->expectedseq = 0;
751        ssock->monitorptr = mon;
752        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
753        ssock->bufavail = ENCAP_BUFFERS;
754        ssock->bufwaiting = 0;
755        ssock->startidle = 0;
756        ssock->nextts = 0;
757
758        for (i = 0; i < ENCAP_BUFFERS; i++) {
759                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
760                ssock->savedsize[i] = 0;
761        }
762
763        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
764                        NULL, src.port, &(ssock->srcaddr));
765
766        if (ssock->sock < 0) {
767                return -1;
768        }
769
770        if (ssock->sock > rt->maxfd) {
771                rt->maxfd = ssock->sock;
772        }
773
774#if HAVE_DECL_RECVMMSG
775        for (i = 0; i < RECV_BATCH_SIZE; i++) {
776                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
777                                malloc(sizeof(struct iovec));
778                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
779                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
780                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
781                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
782                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
783                ssock->mmsgbufs[i].msg_len = 0;
784        }
785#else
786        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
787#endif
788
789        ssock->nextread = NULL;;
790        ssock->nextreadind = 0;
791        ssock->nextwriteind = 0;
792        ssock->recordcount = 0;
793        rt->sourcecount += 1;
794
795        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
796                        ssock->groupaddr, ssock->port, rt->threadindex);
797
798        return ssock->port;
799}
800
801static int receiver_read_messages(recvstream_t *rt) {
802
803        ndag_internal_message_t msg;
804
805        while (libtrace_message_queue_try_get(&(rt->mqueue),
806                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
807                switch(msg.type) {
808                        case NDAG_CLIENT_NEWGROUP:
809                                if (add_new_streamsock(rt, msg.contents) < 0) {
810                                        return -1;
811                                }
812                                break;
813                        case NDAG_CLIENT_HALT:
814                                return 0;
815                }
816        }
817        return 1;
818
819}
820
821static inline int readable_data(streamsock_t *ssock) {
822
823        if (ssock->sock == -1) {
824                return 0;
825        }
826        if (ssock->savedsize[ssock->nextreadind] == 0) {
827                return 0;
828        }
829        /*
830        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
831                        ssock->savedsize[ssock->nextreadind]) {
832                return 0;
833        }
834        */
835        return 1;
836
837
838}
839
840static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
841
842        int i;
843        for (i = 0; i < rt->sourcecount; i++) {
844                if (rt->sources[i].monitorptr == mon) {
845                        rt->sources[i].expectedseq = 0;
846                }
847        }
848
849}
850
851static int init_receivers(streamsock_t *ssock, int required) {
852
853        int wind = ssock->nextwriteind;
854        int i = 1;
855
856#if HAVE_DECL_RECVMMSG
857        for (i = 0; i < required; i++) {
858                if (i >= RECV_BATCH_SIZE) {
859                        break;
860                }
861
862                if (wind >= ENCAP_BUFFERS) {
863                        wind = 0;
864                }
865
866                ssock->mmsgbufs[i].msg_len = 0;
867                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
868                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
869                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
870
871                wind ++;
872        }
873#else
874        if (required <= 0) {
875                fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n");
876                return TRACE_ERR_INIT_FAILED;
877        }
878        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
879        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
880        ssock->singlemsg.msg_iovlen = 1;
881#endif
882        return i;
883}
884
885static int check_ndag_received(streamsock_t *ssock, int index,
886                unsigned int msglen, recvstream_t *rt) {
887
888        ndag_encap_t *encaphdr;
889        ndag_monitor_t *mon;
890        uint8_t rectype;
891
892        /* Check that we have a valid nDAG encap record */
893        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
894
895        if (rectype == NDAG_PKT_KEEPALIVE) {
896                /* Keep-alive, reset startidle and carry on. Don't
897                 * change nextwrite -- we want to overwrite the
898                 * keep-alive with usable content. */
899                return 0;
900        } else if (rectype != NDAG_PKT_ENCAPERF) {
901                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
902                                ssock->groupaddr, ssock->port);
903                close(ssock->sock);
904                ssock->sock = -1;
905                return -1;
906        }
907
908        ssock->savedsize[index] = msglen;
909        ssock->nextwriteind ++;
910        ssock->bufavail --;
911
912        if (ssock->bufavail < 0) {
913                fprintf(stderr, "No space in buffer in check_ndag_received()\n");
914                return -1;
915        }
916        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
917                ssock->nextwriteind = 0;
918        }
919
920        /* Get the useful info from the encap header */
921        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
922
923        mon = ssock->monitorptr;
924
925        if (mon->laststart == 0) {
926                mon->laststart = bswap_be_to_host64(encaphdr->started);
927        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
928                mon->laststart = bswap_be_to_host64(encaphdr->started);
929                reset_expected_seqs(rt, mon);
930
931                /* TODO what is a good way to indicate this to clients?
932                 * set the loss counter in the ERF header? a bit rude?
933                 * use another bit in the ERF header?
934                 * add a queryable flag to libtrace_packet_t?
935                 */
936
937        }
938
939        if (ssock->expectedseq != 0) {
940                rt->missing_records += seq_cmp(
941                                ntohl(encaphdr->seqno), ssock->expectedseq);
942
943        }
944        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
945        if (ssock->expectedseq == 0) {
946                ssock->expectedseq ++;
947        }
948
949        if (ssock->nextread == NULL) {
950                /* If this is our first read, set up 'nextread'
951                 * by skipping past the nDAG headers */
952                ssock->nextread = ssock->saved[0] +
953                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
954        }
955        return 1;
956
957}
958
959static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
960                int *gottime, recvstream_t *rt) {
961
962        int ret, ndagstat, avail;
963        int toret = 0;
964
965#if HAVE_DECL_RECVMMSG
966        int i;
967#endif
968
969        avail = init_receivers(ssock, ssock->bufavail);
970
971#if HAVE_DECL_RECVMMSG
972        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
973                        MSG_DONTWAIT, NULL);
974#else
975        if (avail != 1) {
976                return 0;
977        }
978
979        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
980#endif
981        if (ret < 0) {
982                /* Nothing to receive right now, but we should still
983                 * count as 'ready' if at least one buffer is full */
984                if (errno == EAGAIN || errno == EWOULDBLOCK) {
985                        if (readable_data(ssock)) {
986                                toret = 1;
987                        }
988                        if (!(*gottime)) {
989                                gettimeofday(tv, NULL);
990                                *gottime = 1;
991                        }
992                        if (ssock->startidle == 0) {
993                                ssock->startidle = tv->tv_sec;
994                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
995                                fprintf(stderr,
996                                        "Closing channel %s:%u due to inactivity.\n",
997                                        ssock->groupaddr,
998                                        ssock->port);
999
1000                                close(ssock->sock);
1001                                ssock->sock = -1;
1002                        }
1003                } else {
1004
1005                        fprintf(stderr,
1006                                "Error receiving encapsulated records from %s:%u -- %s \n",
1007                                ssock->groupaddr, ssock->port,
1008                                strerror(errno));
1009                        close(ssock->sock);
1010                        ssock->sock = -1;
1011                }
1012                return toret;
1013        }
1014
1015        ssock->startidle = 0;
1016
1017#if HAVE_DECL_RECVMMSG
1018        for (i = 0; i < ret; i++) {
1019                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1020                                ssock->mmsgbufs[i].msg_len, rt);
1021                if (ndagstat == -1) {
1022                        break;
1023                }
1024
1025                if (ndagstat == 1) {
1026                        toret = 1;
1027                }
1028        }
1029#else
1030        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1031        if (ndagstat <= 0) {
1032                toret = 0;
1033        } else {
1034                toret = 1;
1035        }
1036#endif
1037
1038        return toret;
1039}
1040
1041static int receive_from_sockets(recvstream_t *rt) {
1042
1043        int i, readybufs, gottime;
1044        struct timeval tv;
1045        fd_set fds;
1046        int maxfd = 0;
1047        struct timeval zerotv;
1048
1049        readybufs = 0;
1050        gottime = 0;
1051
1052        if (rt->maxfd == -1) {
1053                return 0;
1054        }
1055
1056        zerotv.tv_sec = 0;
1057        zerotv.tv_usec = 0;
1058
1059        for (i = 0; i < rt->sourcecount; i++) {
1060                if (rt->sources[i].sock == -1) {
1061                        continue;
1062                }
1063
1064#if HAVE_DECL_RECVMMSG
1065                /* Plenty of full buffers, just use the packets in those */
1066                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1067                        readybufs ++;
1068                        continue;
1069                }
1070#else
1071                if (rt->sources[i].bufavail == 0) {
1072                        readybufs ++;
1073                        continue;
1074                }
1075#endif
1076                if (maxfd == 0) {
1077                        FD_ZERO(&fds);
1078                }
1079                FD_SET(rt->sources[i].sock, &fds);
1080                if (maxfd < rt->sources[i].sock) {
1081                        maxfd = rt->sources[i].sock;
1082                }
1083        }
1084
1085
1086        if (maxfd <= 0) {
1087                return readybufs;
1088        }
1089
1090        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1091                /* log the error? XXX */
1092                return -1;
1093        }
1094
1095        for (i = 0; i < rt->sourcecount; i++) {
1096                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1097                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1098                                readybufs ++;
1099                        }
1100                        continue;
1101                }
1102                readybufs += receive_from_single_socket(&(rt->sources[i]),
1103                                &tv, &gottime, rt);
1104        }
1105
1106        return readybufs;
1107
1108}
1109
1110
1111static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1112                libtrace_packet_t *packet) {
1113
1114        int iserr = 0;
1115
1116        if (packet->buf_control == TRACE_CTRL_PACKET) {
1117                free(packet->buffer);
1118                packet->buffer = NULL;
1119        }
1120
1121        do {
1122                /* Make sure we shouldn't be halting */
1123                if ((iserr = is_halted(libtrace)) != -1) {
1124                        return iserr;
1125                }
1126
1127                /* Check for any messages from the control thread */
1128                iserr = receiver_read_messages(rt);
1129
1130                if (iserr <= 0) {
1131                        return iserr;
1132                }
1133
1134                /* If blocking and no sources, sleep for a bit and then try
1135                 * checking for messages again.
1136                 */
1137                if (rt->sourcecount == 0) {
1138                        usleep(10000);
1139                        continue;
1140                }
1141
1142                if ((iserr = receive_from_sockets(rt)) < 0) {
1143                        return iserr;
1144                } else if (iserr > 0) {
1145                        /* At least one of our input sockets has available
1146                         * data, let's go ahead and use what we have. */
1147                        break;
1148                }
1149
1150                /* None of our sources have anything available, we can take
1151                 * a short break rather than immediately trying again.
1152                 */
1153                if (iserr == 0) {
1154                        usleep(100);
1155                }
1156
1157        } while (1);
1158
1159        return iserr;
1160}
1161
1162static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1163                libtrace_packet_t *packet) {
1164
1165        int iserr = 0;
1166
1167        if (packet->buf_control == TRACE_CTRL_PACKET) {
1168                free(packet->buffer);
1169                packet->buffer = NULL;
1170        }
1171
1172        /* Make sure we shouldn't be halting */
1173        if ((iserr = is_halted(libtrace)) != -1) {
1174                return iserr;
1175        }
1176
1177        /* If non-blocking and there are no sources, just break */
1178        if (rt->sourcecount == 0) {
1179                return 0;
1180        }
1181
1182        return receive_from_sockets(rt);
1183}
1184
1185static streamsock_t *select_next_packet(recvstream_t *rt) {
1186        int i;
1187        streamsock_t *ssock = NULL;
1188        uint64_t earliest = 0;
1189        uint64_t currentts = 0;
1190        dag_record_t *daghdr;
1191
1192        /* If we only have one source, then no need to do any
1193         * timestamp parsing or byteswapping.
1194         */
1195        if (rt->sourcecount == 1) {
1196                if (readable_data(&(rt->sources[0]))) {
1197                        return &(rt->sources[0]);
1198                }
1199                return NULL;
1200        }
1201
1202
1203        for (i = 0; i < rt->sourcecount; i ++) {
1204                if (!readable_data(&(rt->sources[i]))) {
1205                        continue;
1206                }
1207
1208                if (rt->sources[i].nextts == 0) {
1209                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1210                        currentts = bswap_le_to_host64(daghdr->ts);
1211                        rt->sources[i].nextts = currentts;
1212                } else {
1213                        currentts = rt->sources[i].nextts;
1214                }
1215
1216                if (earliest == 0 || earliest > currentts) {
1217                        earliest = currentts;
1218                        ssock = &(rt->sources[i]);
1219                }
1220        }
1221        return ssock;
1222}
1223
1224static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1225
1226        int rem, ret;
1227        streamsock_t *nextavail = NULL;
1228        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1229                        packet);
1230
1231        if (rem <= 0) {
1232                return rem;
1233        }
1234
1235        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1236        if (nextavail == NULL) {
1237                return 0;
1238        }
1239
1240        /* nextread should point at an ERF header, so prepare 'packet' to be
1241         * a libtrace ERF packet. */
1242
1243        ret = ndag_prepare_packet_stream(libtrace,
1244                        &(FORMAT_DATA->receivers[0]), nextavail,
1245                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1246        nextavail->bufavail += nextavail->bufwaiting;
1247        nextavail->bufwaiting = 0;
1248        return ret;
1249}
1250
1251static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1252                libtrace_packet_t **packets, size_t nb_packets) {
1253
1254        recvstream_t *rt;
1255        int rem, i;
1256        size_t read_packets = 0;
1257        streamsock_t *nextavail = NULL;
1258
1259        rt = (recvstream_t *)t->format_data;
1260
1261
1262        do {
1263                /* Only check for messages once per batch */
1264                if (read_packets == 0) {
1265                        rem = receive_encap_records_block(libtrace, rt,
1266                                packets[read_packets]);
1267                } else {
1268                        rem = receive_encap_records_nonblock(libtrace, rt,
1269                                packets[read_packets]);
1270                }
1271
1272                if (rem < 0) {
1273                        return rem;
1274                }
1275
1276                if (rem == 0) {
1277                        break;
1278                }
1279
1280                nextavail = select_next_packet(rt);
1281                if (nextavail == NULL) {
1282                        break;
1283                }
1284
1285                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1286                                packets[read_packets],
1287                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1288
1289                read_packets  ++;
1290                if (read_packets >= nb_packets) {
1291                        break;
1292                }
1293        } while (1);
1294
1295        for (i = 0; i < rt->sourcecount; i++) {
1296                streamsock_t *src = &(rt->sources[i]);
1297                src->bufavail += src->bufwaiting;
1298                src->bufwaiting = 0;
1299                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1300                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in ndag_pread_packets()");
1301                        return -1;
1302                }
1303        }
1304
1305        return read_packets;
1306
1307}
1308
1309static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1310                libtrace_packet_t *packet) {
1311
1312
1313        libtrace_eventobj_t event = {0,0,0.0,0};
1314        int rem, i;
1315        streamsock_t *nextavail = NULL;
1316
1317        /* Only check for messages once per call */
1318        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1319        if (rem <= 0) {
1320                event.type = TRACE_EVENT_TERMINATE;
1321                return event;
1322        }
1323
1324        do {
1325                rem = receive_encap_records_nonblock(libtrace,
1326                                &(FORMAT_DATA->receivers[0]), packet);
1327
1328                if (rem < 0) {
1329                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1330                                "Received invalid nDAG records.");
1331                        event.type = TRACE_EVENT_TERMINATE;
1332                        break;
1333                }
1334
1335                if (rem == 0) {
1336                        /* Either we've been halted or we've got no packets
1337                         * right now. */
1338                        if (is_halted(libtrace) == 0) {
1339                                event.type = TRACE_EVENT_TERMINATE;
1340                                break;
1341                        }
1342                        event.type = TRACE_EVENT_SLEEP;
1343                        event.seconds = 0.0001;
1344                        break;
1345                }
1346
1347                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1348                if (nextavail == NULL) {
1349                        event.type = TRACE_EVENT_SLEEP;
1350                        event.seconds = 0.0001;
1351                        break;
1352                }
1353
1354                event.type = TRACE_EVENT_PACKET;
1355                ndag_prepare_packet_stream(libtrace,
1356                                &(FORMAT_DATA->receivers[0]), nextavail,
1357                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1358                event.size = trace_get_capture_length(packet) +
1359                                trace_get_framing_length(packet);
1360
1361                if (libtrace->filter) {
1362                        int filtret = trace_apply_filter(libtrace->filter,
1363                                        packet);
1364                        if (filtret == -1) {
1365                                trace_set_err(libtrace,
1366                                                TRACE_ERR_BAD_FILTER,
1367                                                "Bad BPF Filter");
1368                                event.type = TRACE_EVENT_TERMINATE;
1369                                break;
1370                        }
1371
1372                        if (filtret == 0) {
1373                                /* Didn't match filter, try next one */
1374                                libtrace->filtered_packets ++;
1375                                trace_clear_cache(packet);
1376                                continue;
1377                        }
1378                }
1379
1380                if (libtrace->snaplen > 0) {
1381                        trace_set_capture_length(packet, libtrace->snaplen);
1382                }
1383                libtrace->accepted_packets ++;
1384                break;
1385        } while (1);
1386
1387        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1388                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1389                src->bufavail += src->bufwaiting;
1390                src->bufwaiting = 0;
1391                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1392                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in trace_event_ndag()");
1393                        break;
1394                }
1395        }
1396
1397        return event;
1398}
1399
1400static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1401
1402        int i;
1403
1404        stat->dropped_valid = 1;
1405        stat->dropped = 0;
1406        stat->received_valid = 1;
1407        stat->received = 0;
1408        stat->missing_valid = 1;
1409        stat->missing = 0;
1410
1411        /* TODO Is this thread safe? */
1412        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1413                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1414                stat->received += FORMAT_DATA->receivers[i].received_packets;
1415                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1416        }
1417
1418}
1419
1420static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1421                libtrace_stat_t *stat) {
1422
1423        recvstream_t *recvr = (recvstream_t *)t->format_data;
1424
1425        if (libtrace == NULL)
1426                return;
1427        /* TODO Is this thread safe */
1428        stat->dropped_valid = 1;
1429        stat->dropped = recvr->dropped_upstream;
1430
1431        stat->received_valid = 1;
1432        stat->received = recvr->received_packets;
1433
1434        stat->missing_valid = 1;
1435        stat->missing = recvr->missing_records;
1436
1437}
1438
1439static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1440                bool reader) {
1441        recvstream_t *recvr;
1442
1443        if (!reader || t->type != THREAD_PERPKT) {
1444                return 0;
1445        }
1446
1447        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1448        t->format_data = recvr;
1449
1450        return 0;
1451}
1452
1453static struct libtrace_format_t ndag = {
1454
1455        "ndag",
1456        "",
1457        TRACE_FORMAT_NDAG,
1458        NULL,                   /* probe filename */
1459        NULL,                   /* probe magic */
1460        ndag_init_input,        /* init_input */
1461        NULL,                   /* config_input */
1462        ndag_start_input,       /* start_input */
1463        ndag_pause_input,       /* pause_input */
1464        NULL,                   /* init_output */
1465        NULL,                   /* config_output */
1466        NULL,                   /* start_output */
1467        ndag_fin_input,         /* fin_input */
1468        NULL,                   /* fin_output */
1469        ndag_read_packet,       /* read_packet */
1470        ndag_prepare_packet,    /* prepare_packet */
1471        NULL,                   /* fin_packet */
1472        NULL,                   /* write_packet */
1473        NULL,                   /* flush_output */
1474        erf_get_link_type,      /* get_link_type */
1475        erf_get_direction,      /* get_direction */
1476        erf_set_direction,      /* set_direction */
1477        erf_get_erf_timestamp,  /* get_erf_timestamp */
1478        NULL,                   /* get_timeval */
1479        NULL,                   /* get_seconds */
1480        NULL,                   /* get_timespec */
1481        NULL,                   /* seek_erf */
1482        NULL,                   /* seek_timeval */
1483        NULL,                   /* seek_seconds */
1484        erf_get_capture_length, /* get_capture_length */
1485        erf_get_wire_length,    /* get_wire_length */
1486        erf_get_framing_length, /* get_framing_length */
1487        erf_set_capture_length, /* set_capture_length */
1488        NULL,                   /* get_received_packets */
1489        NULL,                   /* get_filtered_packets */
1490        NULL,                   /* get_dropped_packets */
1491        ndag_get_statistics,    /* get_statistics */
1492        NULL,                   /* get_fd */
1493        trace_event_ndag,       /* trace_event */
1494        NULL,                   /* help */
1495        NULL,                   /* next pointer */
1496        {true, 0},              /* live packet capture */
1497        ndag_pstart_input,      /* parallel start */
1498        ndag_pread_packets,     /* parallel read */
1499        ndag_pause_input,       /* parallel pause */
1500        NULL,
1501        ndag_pregister_thread,  /* register thread */
1502        NULL,
1503        ndag_get_thread_stats   /* per-thread stats */
1504};
1505
1506void ndag_constructor(void) {
1507        register_format(&ndag);
1508}
Note: See TracBrowser for help on using the repository browser.