source: lib/format_ndag.c @ 25a3255

develop
Last change on this file since 25a3255 was 25a3255, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

More assertion cleanup

  • Property mode set to 100644
File size: 47.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint64_t nextts;
90        uint32_t startidle;
91        uint64_t recordcount;
92
93        int bufavail;
94        int bufwaiting;
95
96#if HAVE_DECL_RECVMMSG
97        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
98#else
99        struct msghdr singlemsg;
100#endif
101
102} streamsock_t;
103
104typedef struct recvstream {
105        streamsock_t *sources;
106        uint16_t sourcecount;
107        libtrace_message_queue_t mqueue;
108        int threadindex;
109        ndag_monitor_t *knownmonitors;
110        uint16_t monitorcount;
111
112        uint64_t dropped_upstream;
113        uint64_t missing_records;
114        uint64_t received_packets;
115
116        int maxfd;
117} recvstream_t;
118
119typedef struct ndag_format_data {
120        char *multicastgroup;
121        char *portstr;
122        char *localiface;
123        uint16_t nextthreadid;
124        recvstream_t *receivers;
125
126        pthread_t controlthread;
127        libtrace_message_queue_t controlqueue;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize, val;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        val = 1;
233        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
234                fprintf(stderr,
235                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                goto sockcreateover;
238        }
239
240        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
241        {
242                fprintf(stderr,
243                        "Failed to bind to multicast socket %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                sock = -1;
246                goto sockcreateover;
247        }
248
249        greq.gr_interface = interface;
250        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
251
252        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
253                        sizeof(greq)) < 0) {
254                fprintf(stderr,
255                        "Failed to join multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262        bufsize = 16 * 1024 * 1024;
263        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
264                                (socklen_t)sizeof(int)) < 0) {
265
266                fprintf(stderr,
267                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
268                                groupaddr, portstr, strerror(errno));
269                close(sock);
270                sock = -1;
271                goto sockcreateover;
272        }
273
274sockcreateover:
275        freeaddrinfo(group);
276        return sock;
277}
278
279
280static int ndag_init_input(libtrace_t *libtrace) {
281
282        char *scan = NULL;
283        char *next = NULL;
284
285        libtrace->format_data = (ndag_format_data_t *)malloc(
286                        sizeof(ndag_format_data_t));
287
288        if (!libtrace->format_data) {
289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory ndag_init_input()");
290                return -1;
291        }
292
293        FORMAT_DATA->multicastgroup = NULL;
294        FORMAT_DATA->portstr = NULL;
295        FORMAT_DATA->localiface = NULL;
296        FORMAT_DATA->nextthreadid = 0;
297        FORMAT_DATA->receivers = NULL;
298
299        scan = strchr(libtrace->uridata, ',');
300        if (scan == NULL) {
301                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
302                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
303                return -1;
304        }
305        FORMAT_DATA->localiface = strndup(libtrace->uridata,
306                        (size_t)(scan - libtrace->uridata));
307        next = scan + 1;
308
309        scan = strchr(next, ',');
310        if (scan == NULL) {
311                FORMAT_DATA->portstr = strdup("9001");
312                FORMAT_DATA->multicastgroup = strdup(next);
313        } else {
314                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
315
316                FORMAT_DATA->portstr = strdup(scan + 1);
317        }
318        return 0;
319}
320
321static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
322                uint16_t portnum, uint16_t monid) {
323
324        ndag_internal_message_t alert;
325
326        alert.type = NDAG_CLIENT_NEWGROUP;
327        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
328        alert.contents.localiface = FORMAT_DATA->localiface;
329        alert.contents.port = portnum;
330        alert.contents.monitor = monid;
331
332        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
333                        (void *)&alert);
334
335}
336
337static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
338                int msgsize, uint16_t *ptmap) {
339
340        int i;
341        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
342        uint8_t msgtype;
343
344        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
345        if (msgtype == 0) {
346                return -1;
347        }
348
349        msgsize -= sizeof(ndag_common_t);
350        if (msgtype == NDAG_PKT_BEACON) {
351                /* If message is a beacon, make sure every port included in the
352                 * beacon is assigned to a receive thread.
353                 */
354                uint16_t *ptr, numstreams;
355
356                if ((uint32_t)msgsize < sizeof(uint16_t)) {
357                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
358                        return -1;
359                }
360
361                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
362                numstreams = ntohs(*ptr);
363                ptr ++;
364
365                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
366                {
367                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
368                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
369                        return -1;
370                }
371
372                for (i = 0; i < numstreams; i++) {
373                        uint16_t streamport = ntohs(*ptr);
374
375                        if (ptmap[streamport] == 0xffff) {
376                                new_group_alert(libtrace,
377                                        FORMAT_DATA->nextthreadid, streamport,
378                                        ntohs(ndaghdr->monitorid));
379
380                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
381
382                                if (libtrace->perpkt_thread_count == 0) {
383                                        FORMAT_DATA->nextthreadid = 0;
384                                } else {
385                                        FORMAT_DATA->nextthreadid =
386                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
387                                }
388                        }
389
390                        ptr ++;
391                }
392        } else {
393                fprintf(stderr,
394                        "Unexpected message type on control channel: %u\n",
395                         msgtype);
396                return -1;
397        }
398
399        return 0;
400
401}
402
403static void *ndag_controller_run(void *tdata) {
404
405        libtrace_t *libtrace = (libtrace_t *)tdata;
406        uint16_t ptmap[65536];
407        int sock = -1;
408        struct addrinfo *receiveaddr = NULL;
409        fd_set listening;
410        struct timeval timeout;
411
412        /* ptmap is a dirty hack to allow us to quickly check if we've already
413         * assigned a stream to a thread.
414         */
415        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
416
417        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
418                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
419                        &receiveaddr);
420        if (sock == -1) {
421                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
422                        "Unable to join multicast group for nDAG control channel");
423                trace_interrupt();
424                pthread_exit(NULL);
425        }
426
427        ndag_paused = 0;
428        while ((is_halted(libtrace) == -1) && !ndag_paused) {
429                int ret;
430                char buf[CTRL_BUF_SIZE];
431
432                FD_ZERO(&listening);
433                FD_SET(sock, &listening);
434
435                timeout.tv_sec = 0;
436                timeout.tv_usec = 500000;
437
438                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
439                if (ret < 0) {
440                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
441                        break;
442                }
443
444                if (!FD_ISSET(sock, &listening)) {
445                        continue;
446                }
447
448                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
449                                receiveaddr->ai_addr,
450                                &(receiveaddr->ai_addrlen));
451                if (ret < 0) {
452                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
453                        break;
454                }
455
456                if (ret == 0) {
457                        break;
458                }
459
460                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
461                        fprintf(stderr, "Error while parsing nDAG control message.\n");
462                        continue;
463                }
464        }
465
466        if (sock >= 0) {
467                close(sock);
468        }
469
470        /* Control channel has fallen over, should probably encourage libtrace
471         * to halt the receiver threads as well.
472         */
473        if (!is_halted(libtrace)) {
474                trace_interrupt();
475        }
476
477        pthread_exit(NULL);
478}
479
480static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
481{
482        int ret;
483        uint32_t i;
484        /* Configure the set of receiver threads */
485
486        if (FORMAT_DATA->receivers == NULL) {
487                /* What if the number of threads changes between a pause and
488                 * a restart? Can this happen? */
489                FORMAT_DATA->receivers = (recvstream_t *)
490                                malloc(sizeof(recvstream_t) * maxthreads);
491        }
492
493        for (i = 0; i < maxthreads; i++) {
494                FORMAT_DATA->receivers[i].sources = NULL;
495                FORMAT_DATA->receivers[i].sourcecount = 0;
496                FORMAT_DATA->receivers[i].knownmonitors = NULL;
497                FORMAT_DATA->receivers[i].monitorcount = 0;
498                FORMAT_DATA->receivers[i].threadindex = i;
499                FORMAT_DATA->receivers[i].dropped_upstream = 0;
500                FORMAT_DATA->receivers[i].received_packets = 0;
501                FORMAT_DATA->receivers[i].missing_records = 0;
502                FORMAT_DATA->receivers[i].maxfd = -1;
503
504                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
505                                sizeof(ndag_internal_message_t));
506        }
507
508        /* Start the controller thread */
509        /* TODO consider affinity of this thread? */
510
511        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
512                        ndag_controller_run, libtrace);
513        if (ret != 0) {
514                return -1;
515        }
516        return maxthreads;
517}
518
519static int ndag_start_input(libtrace_t *libtrace) {
520        return ndag_start_threads(libtrace, 1);
521}
522
523static int ndag_pstart_input(libtrace_t *libtrace) {
524        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
525                        libtrace->perpkt_thread_count)
526                return 0;
527        return -1;
528}
529
530static void halt_ndag_receiver(recvstream_t *receiver) {
531        int j, i;
532        libtrace_message_queue_destroy(&(receiver->mqueue));
533
534        if (receiver->sources == NULL)
535                return;
536        for (i = 0; i < receiver->sourcecount; i++) {
537                streamsock_t src = receiver->sources[i];
538                if (src.saved) {
539                        for (j = 0; j < ENCAP_BUFFERS; j++) {
540                                if (src.saved[j]) {
541                                        free(src.saved[j]);
542                                }
543                        }
544                        free(src.saved);
545                }
546
547#if HAVE_DECL_RECVMMSG
548                for (j = 0; j < RECV_BATCH_SIZE; j++) {
549                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
550                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
551                        }
552                }
553#else
554                free(src.singlemsg.msg_iov);
555#endif
556
557                if (src.sock != -1) {
558                        close(src.sock);
559                }
560        }
561        if (receiver->knownmonitors) {
562                free(receiver->knownmonitors);
563        }
564
565        if (receiver->sources) {
566                free(receiver->sources);
567        }
568}
569
570static int ndag_pause_input(libtrace_t *libtrace) {
571        int i;
572
573        /* Close the existing receiver sockets */
574        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
575               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
576        }
577        return 0;
578}
579
580static int ndag_fin_input(libtrace_t *libtrace) {
581
582        if (FORMAT_DATA->receivers) {
583                free(FORMAT_DATA->receivers);
584        }
585        if (FORMAT_DATA->multicastgroup) {
586                free(FORMAT_DATA->multicastgroup);
587        }
588        if (FORMAT_DATA->portstr) {
589                free(FORMAT_DATA->portstr);
590        }
591        if (FORMAT_DATA->localiface) {
592                free(FORMAT_DATA->localiface);
593        }
594
595        free(libtrace->format_data);
596        return 0;
597}
598
599static int ndag_prepare_packet_stream(libtrace_t *libtrace,
600                recvstream_t *rt,
601                streamsock_t *ssock, libtrace_packet_t *packet,
602                uint32_t flags) {
603
604        dag_record_t *erfptr;
605        ndag_encap_t *encaphdr;
606        uint16_t ndag_reccount = 0;
607        int nr;
608        uint16_t rlen;
609
610        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
611                packet->buf_control = TRACE_CTRL_PACKET;
612        } else {
613                packet->buf_control = TRACE_CTRL_EXTERNAL;
614        }
615
616        packet->trace = libtrace;
617        packet->buffer = ssock->nextread;
618        packet->header = ssock->nextread;
619        packet->type = TRACE_RT_DATA_ERF;
620
621        erfptr = (dag_record_t *)packet->header;
622
623        if (erfptr->flags.rxerror == 1) {
624                packet->payload = NULL;
625                erfptr->rlen = htons(erf_get_framing_length(packet));
626        } else {
627                packet->payload = (char *)packet->buffer +
628                                erf_get_framing_length(packet);
629        }
630
631        /* Update upstream drops using lctr */
632
633        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
634                /* TODO */
635        } else {
636                if (rt->received_packets > 0) {
637                        rt->dropped_upstream += ntohs(erfptr->lctr);
638                }
639        }
640
641        rt->received_packets ++;
642        ssock->recordcount += 1;
643
644        nr = ssock->nextreadind;
645        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
646                        sizeof(ndag_common_t));
647
648        ndag_reccount = ntohs(encaphdr->recordcount);
649        if ((ndag_reccount & 0x8000) != 0) {
650                /* Record was truncated -- update rlen appropriately */
651                rlen = ssock->savedsize[nr] -
652                                (ssock->nextread - ssock->saved[nr]);
653                erfptr->rlen = htons(rlen);
654        } else {
655                rlen = ntohs(erfptr->rlen);
656        }
657        ssock->nextread += rlen;
658        ssock->nextts = 0;
659
660        /*assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);*/
661        if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) {
662                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Failed to prepare packet stream in ndag_prepare_packet_stream()");
663                return -1;
664        }
665
666        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
667                /* Read everything from this buffer, mark as empty and
668                 * move on. */
669                ssock->savedsize[nr] = 0;
670                ssock->bufwaiting ++;
671
672                nr ++;
673                if (nr == ENCAP_BUFFERS) {
674                        nr = 0;
675                }
676                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
677                                sizeof(ndag_encap_t);
678                ssock->nextreadind = nr;
679        }
680
681        packet->order = erf_get_erf_timestamp(packet);
682        packet->error = rlen;
683        return rlen;
684}
685
686static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
687                libtrace_packet_t *packet UNUSED,
688                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
689                uint32_t flags UNUSED) {
690
691        /*assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");*/
692        fprintf(stderr, "Sending nDAG records over RT doesn't make sense! Please stop\n");
693        return 0;
694
695}
696
697static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
698
699        ndag_monitor_t *mon;
700
701        if (rt->monitorcount == 0) {
702                rt->knownmonitors = (ndag_monitor_t *)
703                                malloc(sizeof(ndag_monitor_t) * 5);
704        } else {
705                rt->knownmonitors = (ndag_monitor_t *)
706                            realloc(rt->knownmonitors,
707                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
708        }
709
710        mon = &(rt->knownmonitors[rt->monitorcount]);
711        mon->monitorid = monid;
712        mon->laststart = 0;
713
714        rt->monitorcount ++;
715        return mon;
716}
717
718static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
719
720        streamsock_t *ssock = NULL;
721        ndag_monitor_t *mon = NULL;
722        int i;
723
724        /* TODO consider replacing this with a list or vector so we can
725         * easily remove sources that are no longer in use, rather than
726         * just setting the sock to -1 and having to check them every
727         * time we want to read a packet.
728         */
729        if (rt->sourcecount == 0) {
730                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
731        } else if ((rt->sourcecount % 10) == 0) {
732                rt->sources = (streamsock_t *)realloc(rt->sources,
733                        sizeof(streamsock_t) * (rt->sourcecount + 10));
734        }
735
736        ssock = &(rt->sources[rt->sourcecount]);
737
738        for (i = 0; i < rt->monitorcount; i++) {
739                if (rt->knownmonitors[i].monitorid == src.monitor) {
740                        mon = &(rt->knownmonitors[i]);
741                        break;
742                }
743        }
744
745        if (mon == NULL) {
746                mon = add_new_knownmonitor(rt, src.monitor);
747        }
748
749        ssock->port = src.port;
750        ssock->groupaddr = src.groupaddr;
751        ssock->expectedseq = 0;
752        ssock->monitorptr = mon;
753        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
754        ssock->bufavail = ENCAP_BUFFERS;
755        ssock->bufwaiting = 0;
756        ssock->startidle = 0;
757        ssock->nextts = 0;
758
759        for (i = 0; i < ENCAP_BUFFERS; i++) {
760                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
761                ssock->savedsize[i] = 0;
762        }
763
764        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
765                        NULL, src.port, &(ssock->srcaddr));
766
767        if (ssock->sock < 0) {
768                return -1;
769        }
770
771        if (ssock->sock > rt->maxfd) {
772                rt->maxfd = ssock->sock;
773        }
774
775#if HAVE_DECL_RECVMMSG
776        for (i = 0; i < RECV_BATCH_SIZE; i++) {
777                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
778                                malloc(sizeof(struct iovec));
779                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
780                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
781                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
782                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
783                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
784                ssock->mmsgbufs[i].msg_len = 0;
785        }
786#else
787        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
788#endif
789
790        ssock->nextread = NULL;;
791        ssock->nextreadind = 0;
792        ssock->nextwriteind = 0;
793        ssock->recordcount = 0;
794        rt->sourcecount += 1;
795
796        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
797                        ssock->groupaddr, ssock->port, rt->threadindex);
798
799        return ssock->port;
800}
801
802static int receiver_read_messages(recvstream_t *rt) {
803
804        ndag_internal_message_t msg;
805
806        while (libtrace_message_queue_try_get(&(rt->mqueue),
807                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
808                switch(msg.type) {
809                        case NDAG_CLIENT_NEWGROUP:
810                                if (add_new_streamsock(rt, msg.contents) < 0) {
811                                        return -1;
812                                }
813                                break;
814                        case NDAG_CLIENT_HALT:
815                                return 0;
816                }
817        }
818        return 1;
819
820}
821
822static inline int readable_data(streamsock_t *ssock) {
823
824        if (ssock->sock == -1) {
825                return 0;
826        }
827        if (ssock->savedsize[ssock->nextreadind] == 0) {
828                return 0;
829        }
830        /*
831        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
832                        ssock->savedsize[ssock->nextreadind]) {
833                return 0;
834        }
835        */
836        return 1;
837
838
839}
840
841static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
842
843        int i;
844        for (i = 0; i < rt->sourcecount; i++) {
845                if (rt->sources[i].monitorptr == mon) {
846                        rt->sources[i].expectedseq = 0;
847                }
848        }
849
850}
851
852static int init_receivers(streamsock_t *ssock, int required) {
853
854        int wind = ssock->nextwriteind;
855        int i = 1;
856
857#if HAVE_DECL_RECVMMSG
858        for (i = 0; i < required; i++) {
859                if (i >= RECV_BATCH_SIZE) {
860                        break;
861                }
862
863                if (wind >= ENCAP_BUFFERS) {
864                        wind = 0;
865                }
866
867                ssock->mmsgbufs[i].msg_len = 0;
868                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
869                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
870                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
871
872                wind ++;
873        }
874#else
875        /*assert(required > 0);*/
876        if (required <= 0) {
877                fprintf(stderr, "You are required to have atleast 1 receiver in init_reveivers\n");
878                return TRACE_ERR_INIT_FAILED;
879        }
880        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
881        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
882        ssock->singlemsg.msg_iovlen = 1;
883#endif
884        return i;
885}
886
887static int check_ndag_received(streamsock_t *ssock, int index,
888                unsigned int msglen, recvstream_t *rt) {
889
890        ndag_encap_t *encaphdr;
891        ndag_monitor_t *mon;
892        uint8_t rectype;
893
894        /* Check that we have a valid nDAG encap record */
895        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
896
897        if (rectype == NDAG_PKT_KEEPALIVE) {
898                /* Keep-alive, reset startidle and carry on. Don't
899                 * change nextwrite -- we want to overwrite the
900                 * keep-alive with usable content. */
901                return 0;
902        } else if (rectype != NDAG_PKT_ENCAPERF) {
903                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
904                                ssock->groupaddr, ssock->port);
905                close(ssock->sock);
906                ssock->sock = -1;
907                return -1;
908        }
909
910        ssock->savedsize[index] = msglen;
911        ssock->nextwriteind ++;
912        ssock->bufavail --;
913
914        /*assert(ssock->bufavail >= 0);*/
915        if (ssock->bufavail < 0) {
916                fprintf(stderr, "No space in buffer in check_ndag_received()\n");
917                return -1;
918        }
919        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
920                ssock->nextwriteind = 0;
921        }
922
923        /* Get the useful info from the encap header */
924        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
925
926        mon = ssock->monitorptr;
927
928        if (mon->laststart == 0) {
929                mon->laststart = bswap_be_to_host64(encaphdr->started);
930        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
931                mon->laststart = bswap_be_to_host64(encaphdr->started);
932                reset_expected_seqs(rt, mon);
933
934                /* TODO what is a good way to indicate this to clients?
935                 * set the loss counter in the ERF header? a bit rude?
936                 * use another bit in the ERF header?
937                 * add a queryable flag to libtrace_packet_t?
938                 */
939
940        }
941
942        if (ssock->expectedseq != 0) {
943                rt->missing_records += seq_cmp(
944                                ntohl(encaphdr->seqno), ssock->expectedseq);
945
946        }
947        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
948        if (ssock->expectedseq == 0) {
949                ssock->expectedseq ++;
950        }
951
952        if (ssock->nextread == NULL) {
953                /* If this is our first read, set up 'nextread'
954                 * by skipping past the nDAG headers */
955                ssock->nextread = ssock->saved[0] +
956                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
957        }
958        return 1;
959
960}
961
962static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
963                int *gottime, recvstream_t *rt) {
964
965        int ret, ndagstat, avail;
966        int toret = 0;
967
968#if HAVE_DECL_RECVMMSG
969        int i;
970#endif
971
972        avail = init_receivers(ssock, ssock->bufavail);
973
974#if HAVE_DECL_RECVMMSG
975        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
976                        MSG_DONTWAIT, NULL);
977#else
978        if (avail != 1) {
979                return 0;
980        }
981
982        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
983#endif
984        if (ret < 0) {
985                /* Nothing to receive right now, but we should still
986                 * count as 'ready' if at least one buffer is full */
987                if (errno == EAGAIN || errno == EWOULDBLOCK) {
988                        if (readable_data(ssock)) {
989                                toret = 1;
990                        }
991                        if (!(*gottime)) {
992                                gettimeofday(tv, NULL);
993                                *gottime = 1;
994                        }
995                        if (ssock->startidle == 0) {
996                                ssock->startidle = tv->tv_sec;
997                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
998                                fprintf(stderr,
999                                        "Closing channel %s:%u due to inactivity.\n",
1000                                        ssock->groupaddr,
1001                                        ssock->port);
1002
1003                                close(ssock->sock);
1004                                ssock->sock = -1;
1005                        }
1006                } else {
1007
1008                        fprintf(stderr,
1009                                "Error receiving encapsulated records from %s:%u -- %s \n",
1010                                ssock->groupaddr, ssock->port,
1011                                strerror(errno));
1012                        close(ssock->sock);
1013                        ssock->sock = -1;
1014                }
1015                return toret;
1016        }
1017
1018        ssock->startidle = 0;
1019
1020#if HAVE_DECL_RECVMMSG
1021        for (i = 0; i < ret; i++) {
1022                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1023                                ssock->mmsgbufs[i].msg_len, rt);
1024                if (ndagstat == -1) {
1025                        break;
1026                }
1027
1028                if (ndagstat == 1) {
1029                        toret = 1;
1030                }
1031        }
1032#else
1033        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1034        if (ndagstat <= 0) {
1035                toret = 0;
1036        } else {
1037                toret = 1;
1038        }
1039#endif
1040
1041        return toret;
1042}
1043
1044static int receive_from_sockets(recvstream_t *rt) {
1045
1046        int i, readybufs, gottime;
1047        struct timeval tv;
1048        fd_set fds;
1049        int maxfd = 0;
1050        struct timeval zerotv;
1051
1052        readybufs = 0;
1053        gottime = 0;
1054
1055        FD_ZERO(&fds);
1056
1057        if (rt->maxfd == -1) {
1058                return 0;
1059        }
1060
1061        zerotv.tv_sec = 0;
1062        zerotv.tv_usec = 0;
1063
1064        for (i = 0; i < rt->sourcecount; i++) {
1065                if (rt->sources[i].sock == -1) {
1066                        continue;
1067                }
1068
1069#if HAVE_DECL_RECVMMSG
1070                /* Plenty of full buffers, just use the packets in those */
1071                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1072                        readybufs ++;
1073                        continue;
1074                }
1075#else
1076                if (rt->sources[i].bufavail == 0) {
1077                        readybufs ++;
1078                        continue;
1079                }
1080#endif
1081                FD_SET(rt->sources[i].sock, &fds);
1082                if (maxfd < rt->sources[i].sock) {
1083                        maxfd = rt->sources[i].sock;
1084                }
1085        }
1086
1087
1088        if (maxfd <= 0) {
1089                return readybufs;
1090        }
1091
1092        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1093                /* log the error? XXX */
1094                return -1;
1095        }
1096
1097        for (i = 0; i < rt->sourcecount; i++) {
1098                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1099                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1100                                readybufs ++;
1101                        }
1102                        continue;
1103                }
1104                readybufs += receive_from_single_socket(&(rt->sources[i]),
1105                                &tv, &gottime, rt);
1106        }
1107
1108        return readybufs;
1109
1110}
1111
1112
1113static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1114                libtrace_packet_t *packet) {
1115
1116        int iserr = 0;
1117
1118        if (packet->buf_control == TRACE_CTRL_PACKET) {
1119                free(packet->buffer);
1120                packet->buffer = NULL;
1121        }
1122
1123        do {
1124                /* Make sure we shouldn't be halting */
1125                if ((iserr = is_halted(libtrace)) != -1) {
1126                        return iserr;
1127                }
1128
1129                /* Check for any messages from the control thread */
1130                iserr = receiver_read_messages(rt);
1131
1132                if (iserr <= 0) {
1133                        return iserr;
1134                }
1135
1136                /* If blocking and no sources, sleep for a bit and then try
1137                 * checking for messages again.
1138                 */
1139                if (rt->sourcecount == 0) {
1140                        usleep(10000);
1141                        continue;
1142                }
1143
1144                if ((iserr = receive_from_sockets(rt)) < 0) {
1145                        return iserr;
1146                } else if (iserr > 0) {
1147                        /* At least one of our input sockets has available
1148                         * data, let's go ahead and use what we have. */
1149                        break;
1150                }
1151
1152                /* None of our sources have anything available, we can take
1153                 * a short break rather than immediately trying again.
1154                 */
1155                if (iserr == 0) {
1156                        usleep(100);
1157                }
1158
1159        } while (1);
1160
1161        return iserr;
1162}
1163
1164static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1165                libtrace_packet_t *packet) {
1166
1167        int iserr = 0;
1168
1169        if (packet->buf_control == TRACE_CTRL_PACKET) {
1170                free(packet->buffer);
1171                packet->buffer = NULL;
1172        }
1173
1174        /* Make sure we shouldn't be halting */
1175        if ((iserr = is_halted(libtrace)) != -1) {
1176                return iserr;
1177        }
1178
1179        /* If non-blocking and there are no sources, just break */
1180        if (rt->sourcecount == 0) {
1181                return 0;
1182        }
1183
1184        return receive_from_sockets(rt);
1185}
1186
1187static streamsock_t *select_next_packet(recvstream_t *rt) {
1188        int i;
1189        streamsock_t *ssock = NULL;
1190        uint64_t earliest = 0;
1191        uint64_t currentts = 0;
1192        dag_record_t *daghdr;
1193
1194        /* If we only have one source, then no need to do any
1195         * timestamp parsing or byteswapping.
1196         */
1197        if (rt->sourcecount == 1) {
1198                if (readable_data(&(rt->sources[0]))) {
1199                        return &(rt->sources[0]);
1200                }
1201                return NULL;
1202        }
1203
1204
1205        for (i = 0; i < rt->sourcecount; i ++) {
1206                if (!readable_data(&(rt->sources[i]))) {
1207                        continue;
1208                }
1209
1210                if (rt->sources[i].nextts == 0) {
1211                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1212                        currentts = bswap_le_to_host64(daghdr->ts);
1213                        rt->sources[i].nextts = currentts;
1214                } else {
1215                        currentts = rt->sources[i].nextts;
1216                }
1217
1218                if (earliest == 0 || earliest > currentts) {
1219                        earliest = currentts;
1220                        ssock = &(rt->sources[i]);
1221                }
1222        }
1223        return ssock;
1224}
1225
1226static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1227
1228        int rem, ret;
1229        streamsock_t *nextavail = NULL;
1230        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1231                        packet);
1232
1233        if (rem <= 0) {
1234                return rem;
1235        }
1236
1237        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1238        if (nextavail == NULL) {
1239                return 0;
1240        }
1241
1242        /* nextread should point at an ERF header, so prepare 'packet' to be
1243         * a libtrace ERF packet. */
1244
1245        ret = ndag_prepare_packet_stream(libtrace,
1246                        &(FORMAT_DATA->receivers[0]), nextavail,
1247                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1248        nextavail->bufavail += nextavail->bufwaiting;
1249        nextavail->bufwaiting = 0;
1250        return ret;
1251}
1252
1253static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1254                libtrace_packet_t **packets, size_t nb_packets) {
1255
1256        recvstream_t *rt;
1257        int rem, i;
1258        size_t read_packets = 0;
1259        streamsock_t *nextavail = NULL;
1260
1261        rt = (recvstream_t *)t->format_data;
1262
1263
1264        do {
1265                /* Only check for messages once per batch */
1266                if (read_packets == 0) {
1267                        rem = receive_encap_records_block(libtrace, rt,
1268                                packets[read_packets]);
1269                } else {
1270                        rem = receive_encap_records_nonblock(libtrace, rt,
1271                                packets[read_packets]);
1272                }
1273
1274                if (rem < 0) {
1275                        return rem;
1276                }
1277
1278                if (rem == 0) {
1279                        break;
1280                }
1281
1282                nextavail = select_next_packet(rt);
1283                if (nextavail == NULL) {
1284                        break;
1285                }
1286
1287                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1288                                packets[read_packets],
1289                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1290
1291                read_packets  ++;
1292                if (read_packets >= nb_packets) {
1293                        break;
1294                }
1295        } while (1);
1296
1297        for (i = 0; i < rt->sourcecount; i++) {
1298                streamsock_t *src = &(rt->sources[i]);
1299                src->bufavail += src->bufwaiting;
1300                src->bufwaiting = 0;
1301                /*assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);*/
1302                if (src->bufavail < 0 && src->bufavail > ENCAP_BUFFERS) {
1303                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in ndag_pread_packets()");
1304                        return -1;
1305                }
1306        }
1307
1308        return read_packets;
1309
1310}
1311
1312static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1313                libtrace_packet_t *packet) {
1314
1315
1316        libtrace_eventobj_t event = {0,0,0.0,0};
1317        int rem, i;
1318        streamsock_t *nextavail = NULL;
1319
1320        /* Only check for messages once per call */
1321        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1322        if (rem <= 0) {
1323                event.type = TRACE_EVENT_TERMINATE;
1324                return event;
1325        }
1326
1327        do {
1328                rem = receive_encap_records_nonblock(libtrace,
1329                                &(FORMAT_DATA->receivers[0]), packet);
1330
1331                if (rem < 0) {
1332                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1333                                "Received invalid nDAG records.");
1334                        event.type = TRACE_EVENT_TERMINATE;
1335                        break;
1336                }
1337
1338                if (rem == 0) {
1339                        /* Either we've been halted or we've got no packets
1340                         * right now. */
1341                        if (is_halted(libtrace) == 0) {
1342                                event.type = TRACE_EVENT_TERMINATE;
1343                                break;
1344                        }
1345                        event.type = TRACE_EVENT_SLEEP;
1346                        event.seconds = 0.0001;
1347                        break;
1348                }
1349
1350                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1351                if (nextavail == NULL) {
1352                        event.type = TRACE_EVENT_SLEEP;
1353                        event.seconds = 0.0001;
1354                        break;
1355                }
1356
1357                event.type = TRACE_EVENT_PACKET;
1358                ndag_prepare_packet_stream(libtrace,
1359                                &(FORMAT_DATA->receivers[0]), nextavail,
1360                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1361                event.size = trace_get_capture_length(packet) +
1362                                trace_get_framing_length(packet);
1363
1364                if (libtrace->filter) {
1365                        int filtret = trace_apply_filter(libtrace->filter,
1366                                        packet);
1367                        if (filtret == -1) {
1368                                trace_set_err(libtrace,
1369                                                TRACE_ERR_BAD_FILTER,
1370                                                "Bad BPF Filter");
1371                                event.type = TRACE_EVENT_TERMINATE;
1372                                break;
1373                        }
1374
1375                        if (filtret == 0) {
1376                                /* Didn't match filter, try next one */
1377                                libtrace->filtered_packets ++;
1378                                trace_clear_cache(packet);
1379                                continue;
1380                        }
1381                }
1382
1383                if (libtrace->snaplen > 0) {
1384                        trace_set_capture_length(packet, libtrace->snaplen);
1385                }
1386                libtrace->accepted_packets ++;
1387                break;
1388        } while (1);
1389
1390        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1391                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1392                src->bufavail += src->bufwaiting;
1393                src->bufwaiting = 0;
1394                /*assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);*/
1395                if (src->bufavail < 0 && src->bufavail > ENCAP_BUFFERS) {
1396                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in trace_event_ndag()");
1397                        break; /* breaking here cause error above also does? */
1398                }
1399        }
1400
1401        return event;
1402}
1403
1404static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1405
1406        int i;
1407
1408        stat->dropped_valid = 1;
1409        stat->dropped = 0;
1410        stat->received_valid = 1;
1411        stat->received = 0;
1412        stat->missing_valid = 1;
1413        stat->missing = 0;
1414
1415        /* TODO Is this thread safe? */
1416        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1417                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1418                stat->received += FORMAT_DATA->receivers[i].received_packets;
1419                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1420        }
1421
1422}
1423
1424static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1425                libtrace_stat_t *stat) {
1426
1427        recvstream_t *recvr = (recvstream_t *)t->format_data;
1428
1429        if (libtrace == NULL)
1430                return;
1431        /* TODO Is this thread safe */
1432        stat->dropped_valid = 1;
1433        stat->dropped = recvr->dropped_upstream;
1434
1435        stat->received_valid = 1;
1436        stat->received = recvr->received_packets;
1437
1438        stat->missing_valid = 1;
1439        stat->missing = recvr->missing_records;
1440
1441}
1442
1443static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1444                bool reader) {
1445        recvstream_t *recvr;
1446
1447        if (!reader || t->type != THREAD_PERPKT) {
1448                return 0;
1449        }
1450
1451        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1452        t->format_data = recvr;
1453
1454        return 0;
1455}
1456
1457static struct libtrace_format_t ndag = {
1458
1459        "ndag",
1460        "",
1461        TRACE_FORMAT_NDAG,
1462        NULL,                   /* probe filename */
1463        NULL,                   /* probe magic */
1464        ndag_init_input,        /* init_input */
1465        NULL,                   /* config_input */
1466        ndag_start_input,       /* start_input */
1467        ndag_pause_input,       /* pause_input */
1468        NULL,                   /* init_output */
1469        NULL,                   /* config_output */
1470        NULL,                   /* start_output */
1471        ndag_fin_input,         /* fin_input */
1472        NULL,                   /* fin_output */
1473        ndag_read_packet,       /* read_packet */
1474        ndag_prepare_packet,    /* prepare_packet */
1475        NULL,                   /* fin_packet */
1476        NULL,                   /* write_packet */
1477        NULL,                   /* flush_output */
1478        erf_get_link_type,      /* get_link_type */
1479        erf_get_direction,      /* get_direction */
1480        erf_set_direction,      /* set_direction */
1481        erf_get_erf_timestamp,  /* get_erf_timestamp */
1482        NULL,                   /* get_timeval */
1483        NULL,                   /* get_seconds */
1484        NULL,                   /* get_timespec */
1485        NULL,                   /* seek_erf */
1486        NULL,                   /* seek_timeval */
1487        NULL,                   /* seek_seconds */
1488        erf_get_capture_length, /* get_capture_length */
1489        erf_get_wire_length,    /* get_wire_length */
1490        erf_get_framing_length, /* get_framing_length */
1491        erf_set_capture_length, /* set_capture_length */
1492        NULL,                   /* get_received_packets */
1493        NULL,                   /* get_filtered_packets */
1494        NULL,                   /* get_dropped_packets */
1495        ndag_get_statistics,    /* get_statistics */
1496        NULL,                   /* get_fd */
1497        trace_event_ndag,       /* trace_event */
1498        NULL,                   /* help */
1499        NULL,                   /* next pointer */
1500        {true, 0},              /* live packet capture */
1501        ndag_pstart_input,      /* parallel start */
1502        ndag_pread_packets,     /* parallel read */
1503        ndag_pause_input,       /* parallel pause */
1504        NULL,
1505        ndag_pregister_thread,  /* register thread */
1506        NULL,
1507        ndag_get_thread_stats   /* per-thread stats */
1508};
1509
1510void ndag_constructor(void) {
1511        register_format(&ndag);
1512}
Note: See TracBrowser for help on using the repository browser.