source: lib/format_ndag.c @ 509ee47

develop
Last change on this file since 509ee47 was 509ee47, checked in by Shane Alcock <salcock@…>, 23 months ago

Add option to allow users to specify a constant ERF framing length.

This can be useful for situations where

a) the input uses ERF (or some derivative thereof)
b) the link type and ERF record type are constant for the

duration of the capture

c) performance is critical

This option allows users to simply tell libtrace what the ERF
framing length on every packet is going to be, so libtrace
doesn't have to repeatedly derive the framing length for each
packet it processes. At high packet rates, the time taken to
do this calculation can really add up and it makes no sense
to risk dropping packets because you're busy calculating a value
that is always a single constant value.

  • Property mode set to 100644
File size: 48.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <errno.h>
38#include <fcntl.h>
39#include <stdio.h>
40#include <string.h>
41#include <unistd.h>
42#include <stdlib.h>
43#include <net/if.h>
44#include <sys/types.h>
45#include <sys/socket.h>
46#include <netdb.h>
47
48#include "format_ndag.h"
49
50#define NDAG_IDLE_TIMEOUT (600)
51#define ENCAP_BUFSIZE (10000)
52#define CTRL_BUF_SIZE (10000)
53#define ENCAP_BUFFERS (1000)
54
55#define RECV_BATCH_SIZE (50)
56
57#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
58
59static struct libtrace_format_t ndag;
60
61volatile int ndag_paused = 0;
62
63typedef struct monitor {
64        uint16_t monitorid;
65        uint64_t laststart;
66} ndag_monitor_t;
67
68
69typedef struct streamsource {
70        uint16_t monitor;
71        char *groupaddr;
72        char *localiface;
73        uint16_t port;
74} streamsource_t;
75
76typedef struct streamsock {
77        char *groupaddr;
78        int sock;
79        struct addrinfo *srcaddr;
80        uint16_t port;
81        uint32_t expectedseq;
82        ndag_monitor_t *monitorptr;
83        char **saved;
84        char *nextread;
85        int nextreadind;
86        int nextwriteind;
87        int savedsize[ENCAP_BUFFERS];
88        uint64_t nextts;
89        uint32_t startidle;
90        uint64_t recordcount;
91
92        int bufavail;
93        int bufwaiting;
94
95#if HAVE_DECL_RECVMMSG
96        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
97#else
98        struct msghdr singlemsg;
99#endif
100
101} streamsock_t;
102
103typedef struct recvstream {
104        streamsock_t *sources;
105        uint16_t sourcecount;
106        libtrace_message_queue_t mqueue;
107        int threadindex;
108        ndag_monitor_t *knownmonitors;
109        uint16_t monitorcount;
110
111        uint64_t dropped_upstream;
112        uint64_t missing_records;
113        uint64_t received_packets;
114
115        int maxfd;
116} recvstream_t;
117
118typedef struct ndag_format_data {
119        char *multicastgroup;
120        char *portstr;
121        char *localiface;
122        uint16_t nextthreadid;
123        recvstream_t *receivers;
124
125        pthread_t controlthread;
126        libtrace_message_queue_t controlqueue;
127        int consterfframing;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize, val;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        val = 1;
233        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
234                fprintf(stderr,
235                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                goto sockcreateover;
238        }
239
240        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
241        {
242                fprintf(stderr,
243                        "Failed to bind to multicast socket %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                sock = -1;
246                goto sockcreateover;
247        }
248
249        greq.gr_interface = interface;
250        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
251
252        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
253                        sizeof(greq)) < 0) {
254                fprintf(stderr,
255                        "Failed to join multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262        bufsize = 16 * 1024 * 1024;
263        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
264                                (socklen_t)sizeof(int)) < 0) {
265
266                fprintf(stderr,
267                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
268                                groupaddr, portstr, strerror(errno));
269                close(sock);
270                sock = -1;
271                goto sockcreateover;
272        }
273
274sockcreateover:
275        freeaddrinfo(group);
276        return sock;
277}
278
279
280static int ndag_init_input(libtrace_t *libtrace) {
281
282        char *scan = NULL;
283        char *next = NULL;
284
285        libtrace->format_data = (ndag_format_data_t *)malloc(
286                        sizeof(ndag_format_data_t));
287
288        if (!libtrace->format_data) {
289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
290                        "format data inside ndag_init_input()");
291                return -1;
292        }
293
294        FORMAT_DATA->multicastgroup = NULL;
295        FORMAT_DATA->portstr = NULL;
296        FORMAT_DATA->localiface = NULL;
297        FORMAT_DATA->nextthreadid = 0;
298        FORMAT_DATA->receivers = NULL;
299        FORMAT_DATA->consterfframing = -1;
300
301        scan = strchr(libtrace->uridata, ',');
302        if (scan == NULL) {
303                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
304                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
305                return -1;
306        }
307        FORMAT_DATA->localiface = strndup(libtrace->uridata,
308                        (size_t)(scan - libtrace->uridata));
309        next = scan + 1;
310
311        scan = strchr(next, ',');
312        if (scan == NULL) {
313                FORMAT_DATA->portstr = strdup("9001");
314                FORMAT_DATA->multicastgroup = strdup(next);
315        } else {
316                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
317
318                FORMAT_DATA->portstr = strdup(scan + 1);
319        }
320        return 0;
321}
322
323static int ndag_config_input(libtrace_t *libtrace, trace_option_t option,
324                void *value) {
325
326        switch(option) {
327                case TRACE_OPTION_CONSTANT_ERF_FRAMING:
328                        FORMAT_DATA->consterfframing = *(int *)value;
329                        break;
330                case TRACE_OPTION_EVENT_REALTIME:
331                case TRACE_OPTION_SNAPLEN:
332                case TRACE_OPTION_PROMISC:
333                case TRACE_OPTION_FILTER:
334                case TRACE_OPTION_META_FREQ:
335                default:
336                        trace_set_err(libtrace, TRACE_ERR_OPTION_UNAVAIL,
337                                        "Unsupported option");
338                        return -1;
339        }
340
341        return 0;
342}
343
344static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
345                uint16_t portnum, uint16_t monid) {
346
347        ndag_internal_message_t alert;
348
349        alert.type = NDAG_CLIENT_NEWGROUP;
350        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
351        alert.contents.localiface = FORMAT_DATA->localiface;
352        alert.contents.port = portnum;
353        alert.contents.monitor = monid;
354
355        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
356                        (void *)&alert);
357
358}
359
360static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
361                int msgsize, uint16_t *ptmap) {
362
363        int i;
364        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
365        uint8_t msgtype;
366
367        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
368        if (msgtype == 0) {
369                return -1;
370        }
371
372        msgsize -= sizeof(ndag_common_t);
373        if (msgtype == NDAG_PKT_BEACON) {
374                /* If message is a beacon, make sure every port included in the
375                 * beacon is assigned to a receive thread.
376                 */
377                uint16_t *ptr, numstreams;
378
379                if ((uint32_t)msgsize < sizeof(uint16_t)) {
380                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
381                        return -1;
382                }
383
384                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
385                numstreams = ntohs(*ptr);
386                ptr ++;
387
388                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
389                {
390                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
391                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
392                        return -1;
393                }
394
395                for (i = 0; i < numstreams; i++) {
396                        uint16_t streamport = ntohs(*ptr);
397
398                        if (ptmap[streamport] == 0xffff) {
399                                new_group_alert(libtrace,
400                                        FORMAT_DATA->nextthreadid, streamport,
401                                        ntohs(ndaghdr->monitorid));
402
403                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
404
405                                if (libtrace->perpkt_thread_count == 0) {
406                                        FORMAT_DATA->nextthreadid = 0;
407                                } else {
408                                        FORMAT_DATA->nextthreadid =
409                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
410                                }
411                        }
412
413                        ptr ++;
414                }
415        } else {
416                fprintf(stderr,
417                        "Unexpected message type on control channel: %u\n",
418                         msgtype);
419                return -1;
420        }
421
422        return 0;
423
424}
425
426static void *ndag_controller_run(void *tdata) {
427
428        libtrace_t *libtrace = (libtrace_t *)tdata;
429        uint16_t ptmap[65536];
430        int sock = -1;
431        struct addrinfo *receiveaddr = NULL;
432        fd_set listening;
433        struct timeval timeout;
434
435        /* ptmap is a dirty hack to allow us to quickly check if we've already
436         * assigned a stream to a thread.
437         */
438        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
439
440        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
441                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
442                        &receiveaddr);
443        if (sock == -1) {
444                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
445                        "Unable to join multicast group for nDAG control channel");
446                trace_interrupt();
447                pthread_exit(NULL);
448        }
449
450        ndag_paused = 0;
451        while ((is_halted(libtrace) == -1) && !ndag_paused) {
452                int ret;
453                char buf[CTRL_BUF_SIZE];
454
455                FD_ZERO(&listening);
456                FD_SET(sock, &listening);
457
458                timeout.tv_sec = 0;
459                timeout.tv_usec = 500000;
460
461                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
462                if (ret < 0) {
463                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
464                        break;
465                }
466
467                if (!FD_ISSET(sock, &listening)) {
468                        continue;
469                }
470
471                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
472                                receiveaddr->ai_addr,
473                                &(receiveaddr->ai_addrlen));
474                if (ret < 0) {
475                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
476                        break;
477                }
478
479                if (ret == 0) {
480                        break;
481                }
482
483                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
484                        fprintf(stderr, "Error while parsing nDAG control message.\n");
485                        continue;
486                }
487        }
488
489        if (sock >= 0) {
490                close(sock);
491        }
492
493        /* Control channel has fallen over, should probably encourage libtrace
494         * to halt the receiver threads as well.
495         */
496        if (!is_halted(libtrace)) {
497                trace_interrupt();
498        }
499
500        pthread_exit(NULL);
501}
502
503static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
504{
505        int ret;
506        uint32_t i;
507        /* Configure the set of receiver threads */
508
509        if (FORMAT_DATA->receivers == NULL) {
510                /* What if the number of threads changes between a pause and
511                 * a restart? Can this happen? */
512                FORMAT_DATA->receivers = (recvstream_t *)
513                                malloc(sizeof(recvstream_t) * maxthreads);
514        }
515
516        for (i = 0; i < maxthreads; i++) {
517                FORMAT_DATA->receivers[i].sources = NULL;
518                FORMAT_DATA->receivers[i].sourcecount = 0;
519                FORMAT_DATA->receivers[i].knownmonitors = NULL;
520                FORMAT_DATA->receivers[i].monitorcount = 0;
521                FORMAT_DATA->receivers[i].threadindex = i;
522                FORMAT_DATA->receivers[i].dropped_upstream = 0;
523                FORMAT_DATA->receivers[i].received_packets = 0;
524                FORMAT_DATA->receivers[i].missing_records = 0;
525                FORMAT_DATA->receivers[i].maxfd = -1;
526
527                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
528                                sizeof(ndag_internal_message_t));
529        }
530
531        /* Start the controller thread */
532        /* TODO consider affinity of this thread? */
533
534        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
535                        ndag_controller_run, libtrace);
536        if (ret != 0) {
537                return -1;
538        }
539        return maxthreads;
540}
541
542static int ndag_start_input(libtrace_t *libtrace) {
543        return ndag_start_threads(libtrace, 1);
544}
545
546static int ndag_pstart_input(libtrace_t *libtrace) {
547        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
548                        libtrace->perpkt_thread_count)
549                return 0;
550        return -1;
551}
552
553static void halt_ndag_receiver(recvstream_t *receiver) {
554        int j, i;
555        libtrace_message_queue_destroy(&(receiver->mqueue));
556
557        if (receiver->sources == NULL)
558                return;
559        for (i = 0; i < receiver->sourcecount; i++) {
560                streamsock_t src = receiver->sources[i];
561                if (src.saved) {
562                        for (j = 0; j < ENCAP_BUFFERS; j++) {
563                                if (src.saved[j]) {
564                                        free(src.saved[j]);
565                                }
566                        }
567                        free(src.saved);
568                }
569
570#if HAVE_DECL_RECVMMSG
571                for (j = 0; j < RECV_BATCH_SIZE; j++) {
572                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
573                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
574                        }
575                }
576#else
577                free(src.singlemsg.msg_iov);
578#endif
579
580                if (src.sock != -1) {
581                        close(src.sock);
582                }
583        }
584        if (receiver->knownmonitors) {
585                free(receiver->knownmonitors);
586        }
587
588        if (receiver->sources) {
589                free(receiver->sources);
590        }
591}
592
593static int ndag_pause_input(libtrace_t *libtrace) {
594        int i;
595
596        /* Close the existing receiver sockets */
597        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
598               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
599        }
600        return 0;
601}
602
603static int ndag_fin_input(libtrace_t *libtrace) {
604
605        if (FORMAT_DATA->receivers) {
606                free(FORMAT_DATA->receivers);
607        }
608        if (FORMAT_DATA->multicastgroup) {
609                free(FORMAT_DATA->multicastgroup);
610        }
611        if (FORMAT_DATA->portstr) {
612                free(FORMAT_DATA->portstr);
613        }
614        if (FORMAT_DATA->localiface) {
615                free(FORMAT_DATA->localiface);
616        }
617
618        free(libtrace->format_data);
619        return 0;
620}
621
622static int ndag_get_framing_length(const libtrace_packet_t *packet) {
623
624        libtrace_t *libtrace = packet->trace;
625
626        if (FORMAT_DATA->consterfframing >= 0) {
627                return FORMAT_DATA->consterfframing;
628        }
629        return erf_get_framing_length(packet);
630}
631
632static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace,
633                recvstream_t *restrict rt,
634                streamsock_t *restrict ssock,
635                libtrace_packet_t *restrict packet,
636                uint32_t flags UNUSED) {
637
638        /* XXX flags is constant, so we can tell the compiler to not
639         * bother copying over the parameter
640         */
641
642        dag_record_t *erfptr;
643        ndag_encap_t *encaphdr;
644        uint16_t ndag_reccount = 0;
645        int nr;
646        uint16_t rlen;
647
648        /*
649        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
650                packet->buf_control = TRACE_CTRL_PACKET;
651        } else {
652                packet->buf_control = TRACE_CTRL_EXTERNAL;
653        }
654        */
655        packet->buf_control = TRACE_CTRL_EXTERNAL;
656
657        packet->trace = libtrace;
658        packet->buffer = ssock->nextread;
659        packet->header = ssock->nextread;
660        packet->type = TRACE_RT_DATA_ERF;
661
662        erfptr = (dag_record_t *)packet->header;
663
664        if (erfptr->flags.rxerror == 1) {
665                packet->payload = NULL;
666                if (FORMAT_DATA->consterfframing >= 0) {
667                        erfptr->rlen = htons(FORMAT_DATA->consterfframing & 0xffff);
668                } else {
669                        erfptr->rlen = htons(erf_get_framing_length(packet));
670                }
671        } else {
672                if (FORMAT_DATA->consterfframing >= 0) {
673                        packet->payload = (char *)packet->buffer +
674                                FORMAT_DATA->consterfframing;
675                } else {
676                        packet->payload = (char *)packet->buffer +
677                                erf_get_framing_length(packet);
678                }
679        }
680
681        /* Update upstream drops using lctr */
682
683        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
684                /* TODO */
685        } else {
686                if (rt->received_packets > 0) {
687                        rt->dropped_upstream += ntohs(erfptr->lctr);
688                }
689        }
690
691        rt->received_packets ++;
692        ssock->recordcount += 1;
693
694        nr = ssock->nextreadind;
695        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
696                        sizeof(ndag_common_t));
697
698        ndag_reccount = ntohs(encaphdr->recordcount);
699        if ((ndag_reccount & 0x8000) != 0) {
700                /* Record was truncated -- update rlen appropriately */
701                rlen = ssock->savedsize[nr] -
702                                (ssock->nextread - ssock->saved[nr]);
703                erfptr->rlen = htons(rlen);
704        } else {
705                rlen = ntohs(erfptr->rlen);
706        }
707        ssock->nextread += rlen;
708        ssock->nextts = 0;
709
710        if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) {
711                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the "
712                        "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()");
713                return -1;
714        }
715
716        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
717                /* Read everything from this buffer, mark as empty and
718                 * move on. */
719                ssock->savedsize[nr] = 0;
720                ssock->bufwaiting ++;
721
722                nr ++;
723                if (nr == ENCAP_BUFFERS) {
724                        nr = 0;
725                }
726                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
727                                sizeof(ndag_encap_t);
728                ssock->nextreadind = nr;
729        }
730
731        packet->order = erf_get_erf_timestamp(packet);
732        packet->error = rlen;
733        return rlen;
734}
735
736static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
737                libtrace_packet_t *packet UNUSED,
738                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
739                uint32_t flags UNUSED) {
740
741        fprintf(stderr, "Sending nDAG records over RT doesn't make sense! Please stop\n");
742        return 0;
743
744}
745
746static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
747
748        ndag_monitor_t *mon;
749
750        if (rt->monitorcount == 0) {
751                rt->knownmonitors = (ndag_monitor_t *)
752                                malloc(sizeof(ndag_monitor_t) * 5);
753        } else {
754                rt->knownmonitors = (ndag_monitor_t *)
755                            realloc(rt->knownmonitors,
756                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
757        }
758
759        mon = &(rt->knownmonitors[rt->monitorcount]);
760        mon->monitorid = monid;
761        mon->laststart = 0;
762
763        rt->monitorcount ++;
764        return mon;
765}
766
767static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
768
769        streamsock_t *ssock = NULL;
770        ndag_monitor_t *mon = NULL;
771        int i;
772
773        /* TODO consider replacing this with a list or vector so we can
774         * easily remove sources that are no longer in use, rather than
775         * just setting the sock to -1 and having to check them every
776         * time we want to read a packet.
777         */
778        if (rt->sourcecount == 0) {
779                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
780        } else if ((rt->sourcecount % 10) == 0) {
781                rt->sources = (streamsock_t *)realloc(rt->sources,
782                        sizeof(streamsock_t) * (rt->sourcecount + 10));
783        }
784
785        ssock = &(rt->sources[rt->sourcecount]);
786
787        for (i = 0; i < rt->monitorcount; i++) {
788                if (rt->knownmonitors[i].monitorid == src.monitor) {
789                        mon = &(rt->knownmonitors[i]);
790                        break;
791                }
792        }
793
794        if (mon == NULL) {
795                mon = add_new_knownmonitor(rt, src.monitor);
796        }
797
798        ssock->port = src.port;
799        ssock->groupaddr = src.groupaddr;
800        ssock->expectedseq = 0;
801        ssock->monitorptr = mon;
802        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
803        ssock->bufavail = ENCAP_BUFFERS;
804        ssock->bufwaiting = 0;
805        ssock->startidle = 0;
806        ssock->nextts = 0;
807
808        for (i = 0; i < ENCAP_BUFFERS; i++) {
809                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
810                ssock->savedsize[i] = 0;
811        }
812
813        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
814                        NULL, src.port, &(ssock->srcaddr));
815
816        if (ssock->sock < 0) {
817                return -1;
818        }
819
820        if (ssock->sock > rt->maxfd) {
821                rt->maxfd = ssock->sock;
822        }
823
824#if HAVE_DECL_RECVMMSG
825        for (i = 0; i < RECV_BATCH_SIZE; i++) {
826                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
827                                malloc(sizeof(struct iovec));
828                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
829                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
830                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
831                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
832                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
833                ssock->mmsgbufs[i].msg_len = 0;
834        }
835#else
836        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
837#endif
838
839        ssock->nextread = NULL;;
840        ssock->nextreadind = 0;
841        ssock->nextwriteind = 0;
842        ssock->recordcount = 0;
843        rt->sourcecount += 1;
844
845        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
846                        ssock->groupaddr, ssock->port, rt->threadindex);
847
848        return ssock->port;
849}
850
851static int receiver_read_messages(recvstream_t *rt) {
852
853        ndag_internal_message_t msg;
854
855        while (libtrace_message_queue_try_get(&(rt->mqueue),
856                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
857                switch(msg.type) {
858                        case NDAG_CLIENT_NEWGROUP:
859                                if (add_new_streamsock(rt, msg.contents) < 0) {
860                                        return -1;
861                                }
862                                break;
863                        case NDAG_CLIENT_HALT:
864                                return 0;
865                }
866        }
867        return 1;
868
869}
870
871static inline int readable_data(streamsock_t *ssock) {
872
873        if (ssock->sock == -1) {
874                return 0;
875        }
876        if (ssock->savedsize[ssock->nextreadind] == 0) {
877                return 0;
878        }
879        /*
880        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
881                        ssock->savedsize[ssock->nextreadind]) {
882                return 0;
883        }
884        */
885        return 1;
886
887
888}
889
890static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
891
892        int i;
893        for (i = 0; i < rt->sourcecount; i++) {
894                if (rt->sources[i].monitorptr == mon) {
895                        rt->sources[i].expectedseq = 0;
896                }
897        }
898
899}
900
901static int init_receivers(streamsock_t *ssock, int required) {
902
903        int wind = ssock->nextwriteind;
904        int i = 1;
905
906#if HAVE_DECL_RECVMMSG
907        for (i = 0; i < required; i++) {
908                if (i >= RECV_BATCH_SIZE) {
909                        break;
910                }
911
912                if (wind >= ENCAP_BUFFERS) {
913                        wind = 0;
914                }
915
916                ssock->mmsgbufs[i].msg_len = 0;
917                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
918                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
919                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
920
921                wind ++;
922        }
923#else
924        if (required <= 0) {
925                fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n");
926                return TRACE_ERR_INIT_FAILED;
927        }
928        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
929        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
930        ssock->singlemsg.msg_iovlen = 1;
931#endif
932        return i;
933}
934
935static int check_ndag_received(streamsock_t *ssock, int index,
936                unsigned int msglen, recvstream_t *rt) {
937
938        ndag_encap_t *encaphdr;
939        ndag_monitor_t *mon;
940        uint8_t rectype;
941
942        /* Check that we have a valid nDAG encap record */
943        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
944
945        if (rectype == NDAG_PKT_KEEPALIVE) {
946                /* Keep-alive, reset startidle and carry on. Don't
947                 * change nextwrite -- we want to overwrite the
948                 * keep-alive with usable content. */
949                return 0;
950        } else if (rectype != NDAG_PKT_ENCAPERF) {
951                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
952                                ssock->groupaddr, ssock->port);
953                close(ssock->sock);
954                ssock->sock = -1;
955                return -1;
956        }
957
958        ssock->savedsize[index] = msglen;
959        ssock->nextwriteind ++;
960        ssock->bufavail --;
961
962        if (ssock->bufavail < 0) {
963                fprintf(stderr, "No space in buffer in check_ndag_received()\n");
964                return -1;
965        }
966        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
967                ssock->nextwriteind = 0;
968        }
969
970        /* Get the useful info from the encap header */
971        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
972
973        mon = ssock->monitorptr;
974
975        if (mon->laststart == 0) {
976                mon->laststart = bswap_be_to_host64(encaphdr->started);
977        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
978                mon->laststart = bswap_be_to_host64(encaphdr->started);
979                reset_expected_seqs(rt, mon);
980
981                /* TODO what is a good way to indicate this to clients?
982                 * set the loss counter in the ERF header? a bit rude?
983                 * use another bit in the ERF header?
984                 * add a queryable flag to libtrace_packet_t?
985                 */
986
987        }
988
989        if (ssock->expectedseq != 0) {
990                rt->missing_records += seq_cmp(
991                                ntohl(encaphdr->seqno), ssock->expectedseq);
992
993        }
994        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
995        if (ssock->expectedseq == 0) {
996                ssock->expectedseq ++;
997        }
998
999        if (ssock->nextread == NULL) {
1000                /* If this is our first read, set up 'nextread'
1001                 * by skipping past the nDAG headers */
1002                ssock->nextread = ssock->saved[0] +
1003                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
1004        }
1005        return 1;
1006
1007}
1008
1009static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
1010                int *gottime, recvstream_t *rt) {
1011
1012        int ret, ndagstat, avail;
1013        int toret = 0;
1014
1015#if HAVE_DECL_RECVMMSG
1016        int i;
1017#endif
1018
1019        avail = init_receivers(ssock, ssock->bufavail);
1020
1021#if HAVE_DECL_RECVMMSG
1022        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
1023                        MSG_DONTWAIT, NULL);
1024#else
1025        if (avail != 1) {
1026                return 0;
1027        }
1028
1029        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
1030#endif
1031        if (ret < 0) {
1032                /* Nothing to receive right now, but we should still
1033                 * count as 'ready' if at least one buffer is full */
1034                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1035                        if (readable_data(ssock)) {
1036                                toret = 1;
1037                        }
1038                        if (!(*gottime)) {
1039                                gettimeofday(tv, NULL);
1040                                *gottime = 1;
1041                        }
1042                        if (ssock->startidle == 0) {
1043                                ssock->startidle = tv->tv_sec;
1044                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
1045                                fprintf(stderr,
1046                                        "Closing channel %s:%u due to inactivity.\n",
1047                                        ssock->groupaddr,
1048                                        ssock->port);
1049
1050                                close(ssock->sock);
1051                                ssock->sock = -1;
1052                        }
1053                } else {
1054
1055                        fprintf(stderr,
1056                                "Error receiving encapsulated records from %s:%u -- %s \n",
1057                                ssock->groupaddr, ssock->port,
1058                                strerror(errno));
1059                        close(ssock->sock);
1060                        ssock->sock = -1;
1061                }
1062                return toret;
1063        }
1064
1065        ssock->startidle = 0;
1066
1067#if HAVE_DECL_RECVMMSG
1068        for (i = 0; i < ret; i++) {
1069                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1070                                ssock->mmsgbufs[i].msg_len, rt);
1071                if (ndagstat == -1) {
1072                        break;
1073                }
1074
1075                if (ndagstat == 1) {
1076                        toret = 1;
1077                }
1078        }
1079#else
1080        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1081        if (ndagstat <= 0) {
1082                toret = 0;
1083        } else {
1084                toret = 1;
1085        }
1086#endif
1087
1088        return toret;
1089}
1090
1091static int receive_from_sockets(recvstream_t *rt) {
1092
1093        int i, readybufs, gottime;
1094        struct timeval tv;
1095        fd_set fds;
1096        int maxfd = 0;
1097        struct timeval zerotv;
1098
1099        readybufs = 0;
1100        gottime = 0;
1101
1102        if (rt->maxfd == -1) {
1103                return 0;
1104        }
1105
1106        for (i = 0; i < rt->sourcecount; i++) {
1107                if (rt->sources[i].sock == -1) {
1108                        continue;
1109                }
1110
1111#if HAVE_DECL_RECVMMSG
1112                /* Plenty of full buffers, just use the packets in those */
1113                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1114                        readybufs ++;
1115                        continue;
1116                }
1117#else
1118                if (rt->sources[i].bufavail == 0) {
1119                        readybufs ++;
1120                        continue;
1121                }
1122#endif
1123                if (maxfd == 0) {
1124                        FD_ZERO(&fds);
1125                }
1126                FD_SET(rt->sources[i].sock, &fds);
1127                if (maxfd < rt->sources[i].sock) {
1128                        maxfd = rt->sources[i].sock;
1129                }
1130        }
1131
1132
1133        if (maxfd <= 0) {
1134                return readybufs;
1135        }
1136
1137        zerotv.tv_sec = 0;
1138        zerotv.tv_usec = 0;
1139        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1140                /* log the error? XXX */
1141                return -1;
1142        }
1143
1144        for (i = 0; i < rt->sourcecount; i++) {
1145                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1146                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1147                                readybufs ++;
1148                        }
1149                        continue;
1150                }
1151                readybufs += receive_from_single_socket(&(rt->sources[i]),
1152                                &tv, &gottime, rt);
1153        }
1154
1155        return readybufs;
1156
1157}
1158
1159
1160static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1161                libtrace_packet_t *packet) {
1162
1163        int iserr = 0;
1164
1165        if (packet->buf_control == TRACE_CTRL_PACKET) {
1166                free(packet->buffer);
1167                packet->buffer = NULL;
1168        }
1169
1170        do {
1171                /* Make sure we shouldn't be halting */
1172                if ((iserr = is_halted(libtrace)) != -1) {
1173                        return iserr;
1174                }
1175
1176                /* Check for any messages from the control thread */
1177                iserr = receiver_read_messages(rt);
1178
1179                if (iserr <= 0) {
1180                        return iserr;
1181                }
1182
1183                /* If blocking and no sources, sleep for a bit and then try
1184                 * checking for messages again.
1185                 */
1186                if (rt->sourcecount == 0) {
1187                        usleep(10000);
1188                        continue;
1189                }
1190
1191                if ((iserr = receive_from_sockets(rt)) < 0) {
1192                        return iserr;
1193                } else if (iserr > 0) {
1194                        /* At least one of our input sockets has available
1195                         * data, let's go ahead and use what we have. */
1196                        break;
1197                }
1198
1199                /* None of our sources have anything available, we can take
1200                 * a short break rather than immediately trying again.
1201                 */
1202                if (iserr == 0) {
1203                        usleep(100);
1204                }
1205
1206        } while (1);
1207
1208        return iserr;
1209}
1210
1211static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1212                libtrace_packet_t *packet) {
1213
1214        int iserr = 0;
1215
1216        if (packet->buf_control == TRACE_CTRL_PACKET) {
1217                free(packet->buffer);
1218                packet->buffer = NULL;
1219        }
1220
1221        /* Make sure we shouldn't be halting */
1222        if ((iserr = is_halted(libtrace)) != -1) {
1223                return iserr;
1224        }
1225
1226        /* If non-blocking and there are no sources, just break */
1227        if (rt->sourcecount == 0) {
1228                return 0;
1229        }
1230
1231        return receive_from_sockets(rt);
1232}
1233
1234static streamsock_t *select_next_packet(recvstream_t *rt) {
1235        int i;
1236        streamsock_t *ssock = NULL;
1237        uint64_t earliest = 0;
1238        uint64_t currentts = 0;
1239        dag_record_t *daghdr;
1240
1241        /* If we only have one source, then no need to do any
1242         * timestamp parsing or byteswapping.
1243         */
1244        if (rt->sourcecount == 1) {
1245                if (readable_data(&(rt->sources[0]))) {
1246                        return &(rt->sources[0]);
1247                }
1248                return NULL;
1249        }
1250
1251
1252        for (i = 0; i < rt->sourcecount; i ++) {
1253                if (!readable_data(&(rt->sources[i]))) {
1254                        continue;
1255                }
1256
1257                if (rt->sources[i].nextts == 0) {
1258                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1259                        currentts = bswap_le_to_host64(daghdr->ts);
1260                        rt->sources[i].nextts = currentts;
1261                } else {
1262                        currentts = rt->sources[i].nextts;
1263                }
1264
1265                if (earliest == 0 || earliest > currentts) {
1266                        earliest = currentts;
1267                        ssock = &(rt->sources[i]);
1268                }
1269        }
1270        return ssock;
1271}
1272
1273static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1274
1275        int rem, ret;
1276        streamsock_t *nextavail = NULL;
1277        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1278                        packet);
1279
1280        if (rem <= 0) {
1281                return rem;
1282        }
1283
1284        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1285        if (nextavail == NULL) {
1286                return 0;
1287        }
1288
1289        /* nextread should point at an ERF header, so prepare 'packet' to be
1290         * a libtrace ERF packet. */
1291
1292        ret = ndag_prepare_packet_stream(libtrace,
1293                        &(FORMAT_DATA->receivers[0]), nextavail,
1294                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1295        nextavail->bufavail += nextavail->bufwaiting;
1296        nextavail->bufwaiting = 0;
1297        return ret;
1298}
1299
1300static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1301                libtrace_packet_t **packets, size_t nb_packets) {
1302
1303        recvstream_t *rt;
1304        int rem, i;
1305        size_t read_packets = 0;
1306        streamsock_t *nextavail = NULL;
1307
1308        rt = (recvstream_t *)t->format_data;
1309
1310        do {
1311                /* Only check for messages once per batch */
1312                if (read_packets == 0) {
1313                        rem = receive_encap_records_block(libtrace, rt,
1314                                packets[read_packets]);
1315                        if (rem < 0) {
1316                                return rem;
1317                        }
1318
1319                        if (rem == 0) {
1320                                break;
1321                        }
1322                }
1323
1324                nextavail = select_next_packet(rt);
1325                if (nextavail == NULL) {
1326                        break;
1327                }
1328
1329                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1330                                packets[read_packets],
1331                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1332
1333                read_packets  ++;
1334                if (read_packets >= nb_packets) {
1335                        break;
1336                }
1337        } while (1);
1338
1339        for (i = 0; i < rt->sourcecount; i++) {
1340                streamsock_t *src = &(rt->sources[i]);
1341                src->bufavail += src->bufwaiting;
1342                src->bufwaiting = 0;
1343                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1344                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in ndag_pread_packets()");
1345                        return -1;
1346                }
1347        }
1348
1349        return read_packets;
1350
1351}
1352
1353static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1354                libtrace_packet_t *packet) {
1355
1356
1357        libtrace_eventobj_t event = {0,0,0.0,0};
1358        int rem, i;
1359        streamsock_t *nextavail = NULL;
1360
1361        /* Only check for messages once per call */
1362        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1363        if (rem <= 0) {
1364                event.type = TRACE_EVENT_TERMINATE;
1365                return event;
1366        }
1367
1368        do {
1369                rem = receive_encap_records_nonblock(libtrace,
1370                                &(FORMAT_DATA->receivers[0]), packet);
1371
1372                if (rem < 0) {
1373                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1374                                "Received invalid nDAG records.");
1375                        event.type = TRACE_EVENT_TERMINATE;
1376                        break;
1377                }
1378
1379                if (rem == 0) {
1380                        /* Either we've been halted or we've got no packets
1381                         * right now. */
1382                        if (is_halted(libtrace) == 0) {
1383                                event.type = TRACE_EVENT_TERMINATE;
1384                                break;
1385                        }
1386                        event.type = TRACE_EVENT_SLEEP;
1387                        event.seconds = 0.0001;
1388                        break;
1389                }
1390
1391                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1392                if (nextavail == NULL) {
1393                        event.type = TRACE_EVENT_SLEEP;
1394                        event.seconds = 0.0001;
1395                        break;
1396                }
1397
1398                event.type = TRACE_EVENT_PACKET;
1399                ndag_prepare_packet_stream(libtrace,
1400                                &(FORMAT_DATA->receivers[0]), nextavail,
1401                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1402                event.size = trace_get_capture_length(packet) +
1403                                trace_get_framing_length(packet);
1404
1405                if (libtrace->filter) {
1406                        int filtret = trace_apply_filter(libtrace->filter,
1407                                        packet);
1408                        if (filtret == -1) {
1409                                trace_set_err(libtrace,
1410                                                TRACE_ERR_BAD_FILTER,
1411                                                "Bad BPF Filter");
1412                                event.type = TRACE_EVENT_TERMINATE;
1413                                break;
1414                        }
1415
1416                        if (filtret == 0) {
1417                                /* Didn't match filter, try next one */
1418                                libtrace->filtered_packets ++;
1419                                trace_clear_cache(packet);
1420                                continue;
1421                        }
1422                }
1423
1424                if (libtrace->snaplen > 0) {
1425                        trace_set_capture_length(packet, libtrace->snaplen);
1426                }
1427                libtrace->accepted_packets ++;
1428                break;
1429        } while (1);
1430
1431        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1432                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1433                src->bufavail += src->bufwaiting;
1434                src->bufwaiting = 0;
1435                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1436                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in trace_event_ndag()");
1437                        break;
1438                }
1439        }
1440
1441        return event;
1442}
1443
1444static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1445
1446        int i;
1447
1448        stat->dropped_valid = 1;
1449        stat->dropped = 0;
1450        stat->received_valid = 1;
1451        stat->received = 0;
1452        stat->missing_valid = 1;
1453        stat->missing = 0;
1454
1455        /* TODO Is this thread safe? */
1456        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1457                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1458                stat->received += FORMAT_DATA->receivers[i].received_packets;
1459                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1460        }
1461
1462}
1463
1464static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1465                libtrace_stat_t *stat) {
1466
1467        recvstream_t *recvr = (recvstream_t *)t->format_data;
1468
1469        if (libtrace == NULL)
1470                return;
1471        /* TODO Is this thread safe */
1472        stat->dropped_valid = 1;
1473        stat->dropped = recvr->dropped_upstream;
1474
1475        stat->received_valid = 1;
1476        stat->received = recvr->received_packets;
1477
1478        stat->missing_valid = 1;
1479        stat->missing = recvr->missing_records;
1480
1481}
1482
1483static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1484                bool reader) {
1485        recvstream_t *recvr;
1486
1487        if (!reader || t->type != THREAD_PERPKT) {
1488                return 0;
1489        }
1490
1491        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1492        t->format_data = recvr;
1493
1494        return 0;
1495}
1496
1497static struct libtrace_format_t ndag = {
1498
1499        "ndag",
1500        "",
1501        TRACE_FORMAT_NDAG,
1502        NULL,                   /* probe filename */
1503        NULL,                   /* probe magic */
1504        ndag_init_input,        /* init_input */
1505        ndag_config_input,      /* config_input */
1506        ndag_start_input,       /* start_input */
1507        ndag_pause_input,       /* pause_input */
1508        NULL,                   /* init_output */
1509        NULL,                   /* config_output */
1510        NULL,                   /* start_output */
1511        ndag_fin_input,         /* fin_input */
1512        NULL,                   /* fin_output */
1513        ndag_read_packet,       /* read_packet */
1514        ndag_prepare_packet,    /* prepare_packet */
1515        NULL,                   /* fin_packet */
1516        NULL,                   /* write_packet */
1517        NULL,                   /* flush_output */
1518        erf_get_link_type,      /* get_link_type */
1519        erf_get_direction,      /* get_direction */
1520        erf_set_direction,      /* set_direction */
1521        erf_get_erf_timestamp,  /* get_erf_timestamp */
1522        NULL,                   /* get_timeval */
1523        NULL,                   /* get_seconds */
1524        NULL,                   /* get_timespec */
1525        NULL,                   /* seek_erf */
1526        NULL,                   /* seek_timeval */
1527        NULL,                   /* seek_seconds */
1528        erf_get_capture_length, /* get_capture_length */
1529        erf_get_wire_length,    /* get_wire_length */
1530        ndag_get_framing_length, /* get_framing_length */
1531        erf_set_capture_length, /* set_capture_length */
1532        NULL,                   /* get_received_packets */
1533        NULL,                   /* get_filtered_packets */
1534        NULL,                   /* get_dropped_packets */
1535        ndag_get_statistics,    /* get_statistics */
1536        NULL,                   /* get_fd */
1537        trace_event_ndag,       /* trace_event */
1538        NULL,                   /* help */
1539        NULL,                   /* next pointer */
1540        {true, 0},              /* live packet capture */
1541        ndag_pstart_input,      /* parallel start */
1542        ndag_pread_packets,     /* parallel read */
1543        ndag_pause_input,       /* parallel pause */
1544        NULL,
1545        ndag_pregister_thread,  /* register thread */
1546        NULL,
1547        ndag_get_thread_stats   /* per-thread stats */
1548};
1549
1550void ndag_constructor(void) {
1551        register_format(&ndag);
1552}
Note: See TracBrowser for help on using the repository browser.