source: lib/format_ndag.c @ ef5ba20

develop
Last change on this file since ef5ba20 was ef5ba20, checked in by Jacob Van Walraven <jcv9@…>, 22 months ago

add abilty to get custom option from meta packets, add abilty to get entire section from meta packet, meta api now returns libtrace_meta_t structure

  • Property mode set to 100644
File size: 48.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <errno.h>
38#include <fcntl.h>
39#include <stdio.h>
40#include <string.h>
41#include <unistd.h>
42#include <stdlib.h>
43#include <net/if.h>
44#include <sys/types.h>
45#include <sys/socket.h>
46#include <netdb.h>
47
48#include "format_ndag.h"
49
50#define NDAG_IDLE_TIMEOUT (600)
51#define ENCAP_BUFSIZE (10000)
52#define CTRL_BUF_SIZE (10000)
53#define ENCAP_BUFFERS (1000)
54
55#define RECV_BATCH_SIZE (50)
56
57#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
58
59static struct libtrace_format_t ndag;
60
61volatile int ndag_paused = 0;
62
63typedef struct monitor {
64        uint16_t monitorid;
65        uint64_t laststart;
66} ndag_monitor_t;
67
68
69typedef struct streamsource {
70        uint16_t monitor;
71        char *groupaddr;
72        char *localiface;
73        uint16_t port;
74} streamsource_t;
75
76typedef struct streamsock {
77        char *groupaddr;
78        int sock;
79        struct addrinfo *srcaddr;
80        uint16_t port;
81        uint32_t expectedseq;
82        ndag_monitor_t *monitorptr;
83        char **saved;
84        char *nextread;
85        int nextreadind;
86        int nextwriteind;
87        int savedsize[ENCAP_BUFFERS];
88        uint64_t nextts;
89        uint32_t startidle;
90        uint64_t recordcount;
91
92        int bufavail;
93        int bufwaiting;
94
95#if HAVE_DECL_RECVMMSG
96        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
97#else
98        struct msghdr singlemsg;
99#endif
100
101} streamsock_t;
102
103typedef struct recvstream {
104        streamsock_t *sources;
105        uint16_t sourcecount;
106        libtrace_message_queue_t mqueue;
107        int threadindex;
108        ndag_monitor_t *knownmonitors;
109        uint16_t monitorcount;
110
111        uint64_t dropped_upstream;
112        uint64_t missing_records;
113        uint64_t received_packets;
114
115        int maxfd;
116} recvstream_t;
117
118typedef struct ndag_format_data {
119        char *multicastgroup;
120        char *portstr;
121        char *localiface;
122        uint16_t nextthreadid;
123        recvstream_t *receivers;
124
125        pthread_t controlthread;
126        libtrace_message_queue_t controlqueue;
127        int consterfframing;
128} ndag_format_data_t;
129
130enum {
131        NDAG_CLIENT_HALT = 0x01,
132        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
133        NDAG_CLIENT_NEWGROUP = 0x03
134};
135
136typedef struct ndagreadermessage {
137        uint8_t type;
138        streamsource_t contents;
139} ndag_internal_message_t;
140
141
142static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
143
144        /* Calculate seq_a - seq_b, taking wraparound into account */
145        if (seq_a == seq_b) return 0;
146
147        if (seq_a > seq_b) {
148                return (int) (seq_a - seq_b);
149        }
150
151        /* -1 for the wrap and another -1 because we don't use zero */
152        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
153}
154
155static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
156        ndag_common_t *header = (ndag_common_t *)msgbuf;
157
158        if (msgsize < sizeof(ndag_common_t)) {
159                fprintf(stderr,
160                        "nDAG message does not have a complete nDAG header.\n");
161                return 0;
162        }
163
164        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
165                fprintf(stderr,
166                        "nDAG message does not have a valid magic number.\n");
167                return 0;
168        }
169
170        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
171                fprintf(stderr,
172                        "nDAG message has an invalid header version: %u\n",
173                                header->version);
174                return 0;
175        }
176
177        return header->type;
178}
179
180static int join_multicast_group(char *groupaddr, char *localiface,
181        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
182
183        struct addrinfo hints;
184        struct addrinfo *gotten;
185        struct addrinfo *group;
186        unsigned int interface;
187        char pstr[16];
188        struct group_req greq;
189        int bufsize, val;
190
191        int sock;
192
193        if (portstr == NULL) {
194                snprintf(pstr, 15, "%u", portnum);
195                portstr = pstr;
196        }
197
198        interface = if_nametoindex(localiface);
199        if (interface == 0) {
200                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
201                                localiface, strerror(errno));
202                return -1;
203        }
204
205        hints.ai_family = PF_UNSPEC;
206        hints.ai_socktype = SOCK_DGRAM;
207        hints.ai_flags = AI_PASSIVE;
208        hints.ai_protocol = 0;
209
210        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
211                fprintf(stderr,
212                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
213                                portstr, strerror(errno));
214                return -1;
215        }
216
217        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
218                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
219                                groupaddr, strerror(errno));
220                return -1;
221        }
222
223        *srcinfo = gotten;
224        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
225        if (sock < 0) {
226                fprintf(stderr,
227                        "Failed to create multicast socket for %s:%s -- %s\n",
228                                groupaddr, portstr, strerror(errno));
229                goto sockcreateover;
230        }
231
232        val = 1;
233        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
234                fprintf(stderr,
235                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
236                                groupaddr, portstr, strerror(errno));
237                goto sockcreateover;
238        }
239
240        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
241        {
242                fprintf(stderr,
243                        "Failed to bind to multicast socket %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                sock = -1;
246                goto sockcreateover;
247        }
248
249        greq.gr_interface = interface;
250        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
251
252        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
253                        sizeof(greq)) < 0) {
254                fprintf(stderr,
255                        "Failed to join multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262        bufsize = 16 * 1024 * 1024;
263        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
264                                (socklen_t)sizeof(int)) < 0) {
265
266                fprintf(stderr,
267                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
268                                groupaddr, portstr, strerror(errno));
269                close(sock);
270                sock = -1;
271                goto sockcreateover;
272        }
273
274sockcreateover:
275        freeaddrinfo(group);
276        return sock;
277}
278
279
280static int ndag_init_input(libtrace_t *libtrace) {
281
282        char *scan = NULL;
283        char *next = NULL;
284
285        libtrace->format_data = (ndag_format_data_t *)malloc(
286                        sizeof(ndag_format_data_t));
287
288        if (!libtrace->format_data) {
289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
290                        "format data inside ndag_init_input()");
291                return -1;
292        }
293
294        FORMAT_DATA->multicastgroup = NULL;
295        FORMAT_DATA->portstr = NULL;
296        FORMAT_DATA->localiface = NULL;
297        FORMAT_DATA->nextthreadid = 0;
298        FORMAT_DATA->receivers = NULL;
299        FORMAT_DATA->consterfframing = -1;
300
301        scan = strchr(libtrace->uridata, ',');
302        if (scan == NULL) {
303                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
304                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
305                return -1;
306        }
307        FORMAT_DATA->localiface = strndup(libtrace->uridata,
308                        (size_t)(scan - libtrace->uridata));
309        next = scan + 1;
310
311        scan = strchr(next, ',');
312        if (scan == NULL) {
313                FORMAT_DATA->portstr = strdup("9001");
314                FORMAT_DATA->multicastgroup = strdup(next);
315        } else {
316                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
317
318                FORMAT_DATA->portstr = strdup(scan + 1);
319        }
320        return 0;
321}
322
323static int ndag_config_input(libtrace_t *libtrace, trace_option_t option,
324                void *value) {
325
326        switch(option) {
327                case TRACE_OPTION_CONSTANT_ERF_FRAMING:
328                        FORMAT_DATA->consterfframing = *(int *)value;
329                        break;
330                case TRACE_OPTION_EVENT_REALTIME:
331                case TRACE_OPTION_SNAPLEN:
332                case TRACE_OPTION_PROMISC:
333                case TRACE_OPTION_FILTER:
334                case TRACE_OPTION_META_FREQ:
335                default:
336                        trace_set_err(libtrace, TRACE_ERR_OPTION_UNAVAIL,
337                                        "Unsupported option");
338                        return -1;
339        }
340
341        return 0;
342}
343
344static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
345                uint16_t portnum, uint16_t monid) {
346
347        ndag_internal_message_t alert;
348
349        alert.type = NDAG_CLIENT_NEWGROUP;
350        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
351        alert.contents.localiface = FORMAT_DATA->localiface;
352        alert.contents.port = portnum;
353        alert.contents.monitor = monid;
354
355        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
356                        (void *)&alert);
357
358}
359
360static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
361                int msgsize, uint16_t *ptmap) {
362
363        int i;
364        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
365        uint8_t msgtype;
366
367        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
368        if (msgtype == 0) {
369                return -1;
370        }
371
372        msgsize -= sizeof(ndag_common_t);
373        if (msgtype == NDAG_PKT_BEACON) {
374                /* If message is a beacon, make sure every port included in the
375                 * beacon is assigned to a receive thread.
376                 */
377                uint16_t *ptr, numstreams;
378
379                if ((uint32_t)msgsize < sizeof(uint16_t)) {
380                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
381                        return -1;
382                }
383
384                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
385                numstreams = ntohs(*ptr);
386                ptr ++;
387
388                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
389                {
390                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
391                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
392                        return -1;
393                }
394
395                for (i = 0; i < numstreams; i++) {
396                        uint16_t streamport = ntohs(*ptr);
397
398                        if (ptmap[streamport] == 0xffff) {
399                                new_group_alert(libtrace,
400                                        FORMAT_DATA->nextthreadid, streamport,
401                                        ntohs(ndaghdr->monitorid));
402
403                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
404
405                                if (libtrace->perpkt_thread_count == 0) {
406                                        FORMAT_DATA->nextthreadid = 0;
407                                } else {
408                                        FORMAT_DATA->nextthreadid =
409                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
410                                }
411                        }
412
413                        ptr ++;
414                }
415        } else {
416                fprintf(stderr,
417                        "Unexpected message type on control channel: %u\n",
418                         msgtype);
419                return -1;
420        }
421
422        return 0;
423
424}
425
426static void *ndag_controller_run(void *tdata) {
427
428        libtrace_t *libtrace = (libtrace_t *)tdata;
429        uint16_t ptmap[65536];
430        int sock = -1;
431        struct addrinfo *receiveaddr = NULL;
432        fd_set listening;
433        struct timeval timeout;
434
435        /* ptmap is a dirty hack to allow us to quickly check if we've already
436         * assigned a stream to a thread.
437         */
438        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
439
440        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
441                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
442                        &receiveaddr);
443        if (sock == -1) {
444                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
445                        "Unable to join multicast group for nDAG control channel");
446                trace_interrupt();
447                pthread_exit(NULL);
448        }
449
450        ndag_paused = 0;
451        while ((is_halted(libtrace) == -1) && !ndag_paused) {
452                int ret;
453                char buf[CTRL_BUF_SIZE];
454
455                FD_ZERO(&listening);
456                FD_SET(sock, &listening);
457
458                timeout.tv_sec = 0;
459                timeout.tv_usec = 500000;
460
461                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
462                if (ret < 0) {
463                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
464                        break;
465                }
466
467                if (!FD_ISSET(sock, &listening)) {
468                        continue;
469                }
470
471                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
472                                receiveaddr->ai_addr,
473                                &(receiveaddr->ai_addrlen));
474                if (ret < 0) {
475                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
476                        break;
477                }
478
479                if (ret == 0) {
480                        break;
481                }
482
483                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
484                        fprintf(stderr, "Error while parsing nDAG control message.\n");
485                        continue;
486                }
487        }
488
489        if (sock >= 0) {
490                close(sock);
491        }
492
493        /* Control channel has fallen over, should probably encourage libtrace
494         * to halt the receiver threads as well.
495         */
496        if (!is_halted(libtrace)) {
497                trace_interrupt();
498        }
499
500        pthread_exit(NULL);
501}
502
503static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
504{
505        int ret;
506        uint32_t i;
507        /* Configure the set of receiver threads */
508
509        if (FORMAT_DATA->receivers == NULL) {
510                /* What if the number of threads changes between a pause and
511                 * a restart? Can this happen? */
512                FORMAT_DATA->receivers = (recvstream_t *)
513                                malloc(sizeof(recvstream_t) * maxthreads);
514        }
515
516        for (i = 0; i < maxthreads; i++) {
517                FORMAT_DATA->receivers[i].sources = NULL;
518                FORMAT_DATA->receivers[i].sourcecount = 0;
519                FORMAT_DATA->receivers[i].knownmonitors = NULL;
520                FORMAT_DATA->receivers[i].monitorcount = 0;
521                FORMAT_DATA->receivers[i].threadindex = i;
522                FORMAT_DATA->receivers[i].dropped_upstream = 0;
523                FORMAT_DATA->receivers[i].received_packets = 0;
524                FORMAT_DATA->receivers[i].missing_records = 0;
525                FORMAT_DATA->receivers[i].maxfd = -1;
526
527                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
528                                sizeof(ndag_internal_message_t));
529        }
530
531        /* Start the controller thread */
532        /* TODO consider affinity of this thread? */
533
534        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
535                        ndag_controller_run, libtrace);
536        if (ret != 0) {
537                return -1;
538        }
539        return maxthreads;
540}
541
542static int ndag_start_input(libtrace_t *libtrace) {
543        return ndag_start_threads(libtrace, 1);
544}
545
546static int ndag_pstart_input(libtrace_t *libtrace) {
547        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
548                        libtrace->perpkt_thread_count)
549                return 0;
550        return -1;
551}
552
553static void halt_ndag_receiver(recvstream_t *receiver) {
554        int j, i;
555        libtrace_message_queue_destroy(&(receiver->mqueue));
556
557        if (receiver->sources == NULL)
558                return;
559        for (i = 0; i < receiver->sourcecount; i++) {
560                streamsock_t src = receiver->sources[i];
561                if (src.saved) {
562                        for (j = 0; j < ENCAP_BUFFERS; j++) {
563                                if (src.saved[j]) {
564                                        free(src.saved[j]);
565                                }
566                        }
567                        free(src.saved);
568                }
569
570#if HAVE_DECL_RECVMMSG
571                for (j = 0; j < RECV_BATCH_SIZE; j++) {
572                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
573                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
574                        }
575                }
576#else
577                free(src.singlemsg.msg_iov);
578#endif
579
580                if (src.sock != -1) {
581                        close(src.sock);
582                }
583        }
584        if (receiver->knownmonitors) {
585                free(receiver->knownmonitors);
586        }
587
588        if (receiver->sources) {
589                free(receiver->sources);
590        }
591}
592
593static int ndag_pause_input(libtrace_t *libtrace) {
594        int i;
595
596        /* Close the existing receiver sockets */
597        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
598               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
599        }
600        return 0;
601}
602
603static int ndag_fin_input(libtrace_t *libtrace) {
604
605        if (FORMAT_DATA->receivers) {
606                free(FORMAT_DATA->receivers);
607        }
608        if (FORMAT_DATA->multicastgroup) {
609                free(FORMAT_DATA->multicastgroup);
610        }
611        if (FORMAT_DATA->portstr) {
612                free(FORMAT_DATA->portstr);
613        }
614        if (FORMAT_DATA->localiface) {
615                free(FORMAT_DATA->localiface);
616        }
617
618        free(libtrace->format_data);
619        return 0;
620}
621
622static int ndag_get_framing_length(const libtrace_packet_t *packet) {
623
624        libtrace_t *libtrace = packet->trace;
625
626        if (FORMAT_DATA->consterfframing >= 0) {
627                return FORMAT_DATA->consterfframing;
628        }
629        return erf_get_framing_length(packet);
630}
631
632static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace,
633                recvstream_t *restrict rt,
634                streamsock_t *restrict ssock,
635                libtrace_packet_t *restrict packet,
636                uint32_t flags UNUSED) {
637
638        /* XXX flags is constant, so we can tell the compiler to not
639         * bother copying over the parameter
640         */
641
642        dag_record_t *erfptr;
643        ndag_encap_t *encaphdr;
644        uint16_t ndag_reccount = 0;
645        int nr;
646        uint16_t rlen;
647
648        /*
649        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
650                packet->buf_control = TRACE_CTRL_PACKET;
651        } else {
652                packet->buf_control = TRACE_CTRL_EXTERNAL;
653        }
654        */
655        packet->buf_control = TRACE_CTRL_EXTERNAL;
656
657        packet->trace = libtrace;
658        packet->buffer = ssock->nextread;
659        packet->header = ssock->nextread;
660        packet->type = TRACE_RT_DATA_ERF;
661
662        erfptr = (dag_record_t *)packet->header;
663
664        if (erfptr->flags.rxerror == 1) {
665                packet->payload = NULL;
666                if (FORMAT_DATA->consterfframing >= 0) {
667                        erfptr->rlen = htons(FORMAT_DATA->consterfframing & 0xffff);
668                } else {
669                        erfptr->rlen = htons(erf_get_framing_length(packet));
670                }
671        } else {
672                if (FORMAT_DATA->consterfframing >= 0) {
673                        packet->payload = (char *)packet->buffer +
674                                FORMAT_DATA->consterfframing;
675                } else {
676                        packet->payload = (char *)packet->buffer +
677                                erf_get_framing_length(packet);
678                }
679        }
680
681        /* Update upstream drops using lctr */
682
683        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
684                /* TODO */
685        } else {
686                if (rt->received_packets > 0) {
687                        rt->dropped_upstream += ntohs(erfptr->lctr);
688                }
689        }
690
691        rt->received_packets ++;
692        ssock->recordcount += 1;
693
694        nr = ssock->nextreadind;
695        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
696                        sizeof(ndag_common_t));
697
698        ndag_reccount = ntohs(encaphdr->recordcount);
699        if ((ndag_reccount & 0x8000) != 0) {
700                /* Record was truncated -- update rlen appropriately */
701                rlen = ssock->savedsize[nr] -
702                                (ssock->nextread - ssock->saved[nr]);
703                erfptr->rlen = htons(rlen);
704        } else {
705                rlen = ntohs(erfptr->rlen);
706        }
707        ssock->nextread += rlen;
708        ssock->nextts = 0;
709
710        if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) {
711                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the "
712                        "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()");
713                return -1;
714        }
715
716        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
717                /* Read everything from this buffer, mark as empty and
718                 * move on. */
719                ssock->savedsize[nr] = 0;
720                ssock->bufwaiting ++;
721
722                nr ++;
723                if (nr == ENCAP_BUFFERS) {
724                        nr = 0;
725                }
726                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
727                                sizeof(ndag_encap_t);
728                ssock->nextreadind = nr;
729        }
730
731        packet->order = erf_get_erf_timestamp(packet);
732        packet->error = rlen;
733        return rlen;
734}
735
736static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
737                libtrace_packet_t *packet UNUSED,
738                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
739                uint32_t flags UNUSED) {
740
741        fprintf(stderr, "Sending nDAG records over RT doesn't make sense! Please stop\n");
742        return 0;
743
744}
745
746static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
747
748        ndag_monitor_t *mon;
749
750        if (rt->monitorcount == 0) {
751                rt->knownmonitors = (ndag_monitor_t *)
752                                malloc(sizeof(ndag_monitor_t) * 5);
753        } else {
754                rt->knownmonitors = (ndag_monitor_t *)
755                            realloc(rt->knownmonitors,
756                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
757        }
758
759        mon = &(rt->knownmonitors[rt->monitorcount]);
760        mon->monitorid = monid;
761        mon->laststart = 0;
762
763        rt->monitorcount ++;
764        return mon;
765}
766
767static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
768
769        streamsock_t *ssock = NULL;
770        ndag_monitor_t *mon = NULL;
771        int i;
772
773        /* TODO consider replacing this with a list or vector so we can
774         * easily remove sources that are no longer in use, rather than
775         * just setting the sock to -1 and having to check them every
776         * time we want to read a packet.
777         */
778        if (rt->sourcecount == 0) {
779                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
780        } else if ((rt->sourcecount % 10) == 0) {
781                rt->sources = (streamsock_t *)realloc(rt->sources,
782                        sizeof(streamsock_t) * (rt->sourcecount + 10));
783        }
784
785        ssock = &(rt->sources[rt->sourcecount]);
786
787        for (i = 0; i < rt->monitorcount; i++) {
788                if (rt->knownmonitors[i].monitorid == src.monitor) {
789                        mon = &(rt->knownmonitors[i]);
790                        break;
791                }
792        }
793
794        if (mon == NULL) {
795                mon = add_new_knownmonitor(rt, src.monitor);
796        }
797
798        ssock->port = src.port;
799        ssock->groupaddr = src.groupaddr;
800        ssock->expectedseq = 0;
801        ssock->monitorptr = mon;
802        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
803        ssock->bufavail = ENCAP_BUFFERS;
804        ssock->bufwaiting = 0;
805        ssock->startidle = 0;
806        ssock->nextts = 0;
807
808        for (i = 0; i < ENCAP_BUFFERS; i++) {
809                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
810                ssock->savedsize[i] = 0;
811        }
812
813        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
814                        NULL, src.port, &(ssock->srcaddr));
815
816        if (ssock->sock < 0) {
817                return -1;
818        }
819
820        if (ssock->sock > rt->maxfd) {
821                rt->maxfd = ssock->sock;
822        }
823
824#if HAVE_DECL_RECVMMSG
825        for (i = 0; i < RECV_BATCH_SIZE; i++) {
826                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
827                                malloc(sizeof(struct iovec));
828                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
829                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
830                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
831                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
832                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
833                ssock->mmsgbufs[i].msg_len = 0;
834        }
835#else
836        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
837#endif
838
839        ssock->nextread = NULL;;
840        ssock->nextreadind = 0;
841        ssock->nextwriteind = 0;
842        ssock->recordcount = 0;
843        rt->sourcecount += 1;
844
845        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
846                        ssock->groupaddr, ssock->port, rt->threadindex);
847
848        return ssock->port;
849}
850
851static int receiver_read_messages(recvstream_t *rt) {
852
853        ndag_internal_message_t msg;
854
855        while (libtrace_message_queue_try_get(&(rt->mqueue),
856                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
857                switch(msg.type) {
858                        case NDAG_CLIENT_NEWGROUP:
859                                if (add_new_streamsock(rt, msg.contents) < 0) {
860                                        return -1;
861                                }
862                                break;
863                        case NDAG_CLIENT_HALT:
864                                return 0;
865                }
866        }
867        return 1;
868
869}
870
871static inline int readable_data(streamsock_t *ssock) {
872
873        if (ssock->sock == -1) {
874                return 0;
875        }
876        if (ssock->savedsize[ssock->nextreadind] == 0) {
877                return 0;
878        }
879        /*
880        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
881                        ssock->savedsize[ssock->nextreadind]) {
882                return 0;
883        }
884        */
885        return 1;
886
887
888}
889
890static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
891
892        int i;
893        for (i = 0; i < rt->sourcecount; i++) {
894                if (rt->sources[i].monitorptr == mon) {
895                        rt->sources[i].expectedseq = 0;
896                }
897        }
898
899}
900
901static int init_receivers(streamsock_t *ssock, int required) {
902
903        int wind = ssock->nextwriteind;
904        int i = 1;
905
906#if HAVE_DECL_RECVMMSG
907        for (i = 0; i < required; i++) {
908                if (i >= RECV_BATCH_SIZE) {
909                        break;
910                }
911
912                if (wind >= ENCAP_BUFFERS) {
913                        wind = 0;
914                }
915
916                ssock->mmsgbufs[i].msg_len = 0;
917                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
918                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
919                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
920
921                wind ++;
922        }
923#else
924        if (required <= 0) {
925                fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n");
926                return TRACE_ERR_INIT_FAILED;
927        }
928        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
929        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
930        ssock->singlemsg.msg_iovlen = 1;
931#endif
932        return i;
933}
934
935static int check_ndag_received(streamsock_t *ssock, int index,
936                unsigned int msglen, recvstream_t *rt) {
937
938        ndag_encap_t *encaphdr;
939        ndag_monitor_t *mon;
940        uint8_t rectype;
941
942        /* Check that we have a valid nDAG encap record */
943        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
944
945        if (rectype == NDAG_PKT_KEEPALIVE) {
946                /* Keep-alive, reset startidle and carry on. Don't
947                 * change nextwrite -- we want to overwrite the
948                 * keep-alive with usable content. */
949                return 0;
950        } else if (rectype != NDAG_PKT_ENCAPERF) {
951                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
952                                ssock->groupaddr, ssock->port);
953                close(ssock->sock);
954                ssock->sock = -1;
955                return -1;
956        }
957
958        ssock->savedsize[index] = msglen;
959        ssock->nextwriteind ++;
960        ssock->bufavail --;
961
962        if (ssock->bufavail < 0) {
963                fprintf(stderr, "No space in buffer in check_ndag_received()\n");
964                return -1;
965        }
966        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
967                ssock->nextwriteind = 0;
968        }
969
970        /* Get the useful info from the encap header */
971        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
972
973        mon = ssock->monitorptr;
974
975        if (mon->laststart == 0) {
976                mon->laststart = bswap_be_to_host64(encaphdr->started);
977        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
978                mon->laststart = bswap_be_to_host64(encaphdr->started);
979                reset_expected_seqs(rt, mon);
980
981                /* TODO what is a good way to indicate this to clients?
982                 * set the loss counter in the ERF header? a bit rude?
983                 * use another bit in the ERF header?
984                 * add a queryable flag to libtrace_packet_t?
985                 */
986
987        }
988
989        if (ssock->expectedseq != 0) {
990                rt->missing_records += seq_cmp(
991                                ntohl(encaphdr->seqno), ssock->expectedseq);
992
993        }
994        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
995        if (ssock->expectedseq == 0) {
996                ssock->expectedseq ++;
997        }
998
999        if (ssock->nextread == NULL) {
1000                /* If this is our first read, set up 'nextread'
1001                 * by skipping past the nDAG headers */
1002                ssock->nextread = ssock->saved[0] +
1003                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
1004        }
1005        return 1;
1006
1007}
1008
1009static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
1010                int *gottime, recvstream_t *rt) {
1011
1012        int ret, ndagstat, avail;
1013        int toret = 0;
1014
1015#if HAVE_DECL_RECVMMSG
1016        int i;
1017#endif
1018
1019        avail = init_receivers(ssock, ssock->bufavail);
1020
1021#if HAVE_DECL_RECVMMSG
1022        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
1023                        MSG_DONTWAIT, NULL);
1024#else
1025        if (avail != 1) {
1026                return 0;
1027        }
1028
1029        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
1030#endif
1031        if (ret < 0) {
1032                /* Nothing to receive right now, but we should still
1033                 * count as 'ready' if at least one buffer is full */
1034                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1035                        if (readable_data(ssock)) {
1036                                toret = 1;
1037                        }
1038                        if (!(*gottime)) {
1039                                gettimeofday(tv, NULL);
1040                                *gottime = 1;
1041                        }
1042                        if (ssock->startidle == 0) {
1043                                ssock->startidle = tv->tv_sec;
1044                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
1045                                fprintf(stderr,
1046                                        "Closing channel %s:%u due to inactivity.\n",
1047                                        ssock->groupaddr,
1048                                        ssock->port);
1049
1050                                close(ssock->sock);
1051                                ssock->sock = -1;
1052                        }
1053                } else {
1054
1055                        fprintf(stderr,
1056                                "Error receiving encapsulated records from %s:%u -- %s \n",
1057                                ssock->groupaddr, ssock->port,
1058                                strerror(errno));
1059                        close(ssock->sock);
1060                        ssock->sock = -1;
1061                }
1062                return toret;
1063        }
1064
1065        ssock->startidle = 0;
1066
1067#if HAVE_DECL_RECVMMSG
1068        for (i = 0; i < ret; i++) {
1069                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
1070                                ssock->mmsgbufs[i].msg_len, rt);
1071                if (ndagstat == -1) {
1072                        break;
1073                }
1074
1075                if (ndagstat == 1) {
1076                        toret = 1;
1077                }
1078        }
1079#else
1080        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1081        if (ndagstat <= 0) {
1082                toret = 0;
1083        } else {
1084                toret = 1;
1085        }
1086#endif
1087
1088        return toret;
1089}
1090
1091static int receive_from_sockets(recvstream_t *rt) {
1092
1093        int i, readybufs, gottime;
1094        struct timeval tv;
1095        fd_set fds;
1096        int maxfd = 0;
1097        struct timeval zerotv;
1098
1099        readybufs = 0;
1100        gottime = 0;
1101
1102        if (rt->maxfd == -1) {
1103                return 0;
1104        }
1105
1106        for (i = 0; i < rt->sourcecount; i++) {
1107                if (rt->sources[i].sock == -1) {
1108                        continue;
1109                }
1110
1111#if HAVE_DECL_RECVMMSG
1112                /* Plenty of full buffers, just use the packets in those */
1113                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
1114                        readybufs ++;
1115                        continue;
1116                }
1117#else
1118                if (rt->sources[i].bufavail == 0) {
1119                        readybufs ++;
1120                        continue;
1121                }
1122#endif
1123                if (maxfd == 0) {
1124                        FD_ZERO(&fds);
1125                }
1126                FD_SET(rt->sources[i].sock, &fds);
1127                if (maxfd < rt->sources[i].sock) {
1128                        maxfd = rt->sources[i].sock;
1129                }
1130        }
1131
1132
1133        if (maxfd <= 0) {
1134                return readybufs;
1135        }
1136
1137        zerotv.tv_sec = 0;
1138        zerotv.tv_usec = 0;
1139        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
1140                /* log the error? XXX */
1141                return -1;
1142        }
1143
1144        for (i = 0; i < rt->sourcecount; i++) {
1145                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
1146                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
1147                                readybufs ++;
1148                        }
1149                        continue;
1150                }
1151                readybufs += receive_from_single_socket(&(rt->sources[i]),
1152                                &tv, &gottime, rt);
1153        }
1154
1155        return readybufs;
1156
1157}
1158
1159
1160static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1161                libtrace_packet_t *packet) {
1162
1163        int iserr = 0;
1164
1165        if (packet->buf_control == TRACE_CTRL_PACKET) {
1166                free(packet->buffer);
1167                packet->buffer = NULL;
1168        }
1169
1170        do {
1171                /* Make sure we shouldn't be halting */
1172                if ((iserr = is_halted(libtrace)) != -1) {
1173                        return iserr;
1174                }
1175
1176                /* Check for any messages from the control thread */
1177                iserr = receiver_read_messages(rt);
1178
1179                if (iserr <= 0) {
1180                        return iserr;
1181                }
1182
1183                /* If blocking and no sources, sleep for a bit and then try
1184                 * checking for messages again.
1185                 */
1186                if (rt->sourcecount == 0) {
1187                        usleep(10000);
1188                        continue;
1189                }
1190
1191                if ((iserr = receive_from_sockets(rt)) < 0) {
1192                        return iserr;
1193                } else if (iserr > 0) {
1194                        /* At least one of our input sockets has available
1195                         * data, let's go ahead and use what we have. */
1196                        break;
1197                }
1198
1199                /* None of our sources have anything available, we can take
1200                 * a short break rather than immediately trying again.
1201                 */
1202                if (iserr == 0) {
1203                        usleep(100);
1204                }
1205
1206        } while (1);
1207
1208        return iserr;
1209}
1210
1211static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1212                libtrace_packet_t *packet) {
1213
1214        int iserr = 0;
1215
1216        if (packet->buf_control == TRACE_CTRL_PACKET) {
1217                free(packet->buffer);
1218                packet->buffer = NULL;
1219        }
1220
1221        /* Make sure we shouldn't be halting */
1222        if ((iserr = is_halted(libtrace)) != -1) {
1223                return iserr;
1224        }
1225
1226        /* If non-blocking and there are no sources, just break */
1227        if (rt->sourcecount == 0) {
1228                return 0;
1229        }
1230
1231        return receive_from_sockets(rt);
1232}
1233
1234static streamsock_t *select_next_packet(recvstream_t *rt) {
1235        int i;
1236        streamsock_t *ssock = NULL;
1237        uint64_t earliest = 0;
1238        uint64_t currentts = 0;
1239        dag_record_t *daghdr;
1240
1241        /* If we only have one source, then no need to do any
1242         * timestamp parsing or byteswapping.
1243         */
1244        if (rt->sourcecount == 1) {
1245                if (readable_data(&(rt->sources[0]))) {
1246                        return &(rt->sources[0]);
1247                }
1248                return NULL;
1249        }
1250
1251
1252        for (i = 0; i < rt->sourcecount; i ++) {
1253                if (!readable_data(&(rt->sources[i]))) {
1254                        continue;
1255                }
1256
1257                if (rt->sources[i].nextts == 0) {
1258                        daghdr = (dag_record_t *)(rt->sources[i].nextread);
1259                        currentts = bswap_le_to_host64(daghdr->ts);
1260                        rt->sources[i].nextts = currentts;
1261                } else {
1262                        currentts = rt->sources[i].nextts;
1263                }
1264
1265                if (earliest == 0 || earliest > currentts) {
1266                        earliest = currentts;
1267                        ssock = &(rt->sources[i]);
1268                }
1269        }
1270        return ssock;
1271}
1272
1273static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1274
1275        int rem, ret;
1276        streamsock_t *nextavail = NULL;
1277        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1278                        packet);
1279
1280        if (rem <= 0) {
1281                return rem;
1282        }
1283
1284        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1285        if (nextavail == NULL) {
1286                return 0;
1287        }
1288
1289        /* nextread should point at an ERF header, so prepare 'packet' to be
1290         * a libtrace ERF packet. */
1291
1292        ret = ndag_prepare_packet_stream(libtrace,
1293                        &(FORMAT_DATA->receivers[0]), nextavail,
1294                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1295        nextavail->bufavail += nextavail->bufwaiting;
1296        nextavail->bufwaiting = 0;
1297        return ret;
1298}
1299
1300static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1301                libtrace_packet_t **packets, size_t nb_packets) {
1302
1303        recvstream_t *rt;
1304        int rem, i;
1305        size_t read_packets = 0;
1306        streamsock_t *nextavail = NULL;
1307
1308        rt = (recvstream_t *)t->format_data;
1309
1310        do {
1311                /* Only check for messages once per batch */
1312                if (read_packets == 0) {
1313                        rem = receive_encap_records_block(libtrace, rt,
1314                                packets[read_packets]);
1315                        if (rem < 0) {
1316                                return rem;
1317                        }
1318
1319                        if (rem == 0) {
1320                                break;
1321                        }
1322                }
1323
1324                nextavail = select_next_packet(rt);
1325                if (nextavail == NULL) {
1326                        break;
1327                }
1328
1329                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1330                                packets[read_packets],
1331                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1332
1333                read_packets  ++;
1334                if (read_packets >= nb_packets) {
1335                        break;
1336                }
1337        } while (1);
1338
1339        for (i = 0; i < rt->sourcecount; i++) {
1340                streamsock_t *src = &(rt->sources[i]);
1341                src->bufavail += src->bufwaiting;
1342                src->bufwaiting = 0;
1343                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1344                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in ndag_pread_packets()");
1345                        return -1;
1346                }
1347        }
1348
1349        return read_packets;
1350
1351}
1352
1353static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1354                libtrace_packet_t *packet) {
1355
1356
1357        libtrace_eventobj_t event = {0,0,0.0,0};
1358        int rem, i;
1359        streamsock_t *nextavail = NULL;
1360
1361        /* Only check for messages once per call */
1362        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1363        if (rem <= 0) {
1364                event.type = TRACE_EVENT_TERMINATE;
1365                return event;
1366        }
1367
1368        do {
1369                rem = receive_encap_records_nonblock(libtrace,
1370                                &(FORMAT_DATA->receivers[0]), packet);
1371
1372                if (rem < 0) {
1373                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1374                                "Received invalid nDAG records.");
1375                        event.type = TRACE_EVENT_TERMINATE;
1376                        break;
1377                }
1378
1379                if (rem == 0) {
1380                        /* Either we've been halted or we've got no packets
1381                         * right now. */
1382                        if (is_halted(libtrace) == 0) {
1383                                event.type = TRACE_EVENT_TERMINATE;
1384                                break;
1385                        }
1386                        event.type = TRACE_EVENT_SLEEP;
1387                        event.seconds = 0.0001;
1388                        break;
1389                }
1390
1391                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1392                if (nextavail == NULL) {
1393                        event.type = TRACE_EVENT_SLEEP;
1394                        event.seconds = 0.0001;
1395                        break;
1396                }
1397
1398                event.type = TRACE_EVENT_PACKET;
1399                ndag_prepare_packet_stream(libtrace,
1400                                &(FORMAT_DATA->receivers[0]), nextavail,
1401                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1402                event.size = trace_get_capture_length(packet) +
1403                                trace_get_framing_length(packet);
1404
1405                if (libtrace->filter) {
1406                        int filtret = trace_apply_filter(libtrace->filter,
1407                                        packet);
1408                        if (filtret == -1) {
1409                                trace_set_err(libtrace,
1410                                                TRACE_ERR_BAD_FILTER,
1411                                                "Bad BPF Filter");
1412                                event.type = TRACE_EVENT_TERMINATE;
1413                                break;
1414                        }
1415
1416                        if (filtret == 0) {
1417                                /* Didn't match filter, try next one */
1418                                libtrace->filtered_packets ++;
1419                                trace_clear_cache(packet);
1420                                continue;
1421                        }
1422                }
1423
1424                if (libtrace->snaplen > 0) {
1425                        trace_set_capture_length(packet, libtrace->snaplen);
1426                }
1427                libtrace->accepted_packets ++;
1428                break;
1429        } while (1);
1430
1431        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
1432                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
1433                src->bufavail += src->bufwaiting;
1434                src->bufwaiting = 0;
1435                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
1436                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in trace_event_ndag()");
1437                        break;
1438                }
1439        }
1440
1441        return event;
1442}
1443
1444static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1445
1446        int i;
1447
1448        stat->dropped_valid = 1;
1449        stat->dropped = 0;
1450        stat->received_valid = 1;
1451        stat->received = 0;
1452        stat->missing_valid = 1;
1453        stat->missing = 0;
1454
1455        /* TODO Is this thread safe? */
1456        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1457                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1458                stat->received += FORMAT_DATA->receivers[i].received_packets;
1459                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1460        }
1461
1462}
1463
1464static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1465                libtrace_stat_t *stat) {
1466
1467        recvstream_t *recvr = (recvstream_t *)t->format_data;
1468
1469        if (libtrace == NULL)
1470                return;
1471        /* TODO Is this thread safe */
1472        stat->dropped_valid = 1;
1473        stat->dropped = recvr->dropped_upstream;
1474
1475        stat->received_valid = 1;
1476        stat->received = recvr->received_packets;
1477
1478        stat->missing_valid = 1;
1479        stat->missing = recvr->missing_records;
1480
1481}
1482
1483static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1484                bool reader) {
1485        recvstream_t *recvr;
1486
1487        if (!reader || t->type != THREAD_PERPKT) {
1488                return 0;
1489        }
1490
1491        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1492        t->format_data = recvr;
1493
1494        return 0;
1495}
1496
1497static struct libtrace_format_t ndag = {
1498
1499        "ndag",
1500        "",
1501        TRACE_FORMAT_NDAG,
1502        NULL,                   /* probe filename */
1503        NULL,                   /* probe magic */
1504        ndag_init_input,        /* init_input */
1505        ndag_config_input,      /* config_input */
1506        ndag_start_input,       /* start_input */
1507        ndag_pause_input,       /* pause_input */
1508        NULL,                   /* init_output */
1509        NULL,                   /* config_output */
1510        NULL,                   /* start_output */
1511        ndag_fin_input,         /* fin_input */
1512        NULL,                   /* fin_output */
1513        ndag_read_packet,       /* read_packet */
1514        ndag_prepare_packet,    /* prepare_packet */
1515        NULL,                   /* fin_packet */
1516        NULL,                   /* write_packet */
1517        NULL,                   /* flush_output */
1518        erf_get_link_type,      /* get_link_type */
1519        erf_get_direction,      /* get_direction */
1520        erf_set_direction,      /* set_direction */
1521        erf_get_erf_timestamp,  /* get_erf_timestamp */
1522        NULL,                   /* get_timeval */
1523        NULL,                   /* get_seconds */
1524        NULL,                   /* get_timespec */
1525        NULL,                   /* get_meta_section */
1526        NULL,                   /* get_meta_section_item */
1527        NULL,                   /* seek_erf */
1528        NULL,                   /* seek_timeval */
1529        NULL,                   /* seek_seconds */
1530        erf_get_capture_length, /* get_capture_length */
1531        erf_get_wire_length,    /* get_wire_length */
1532        ndag_get_framing_length, /* get_framing_length */
1533        erf_set_capture_length, /* set_capture_length */
1534        NULL,                   /* get_received_packets */
1535        NULL,                   /* get_filtered_packets */
1536        NULL,                   /* get_dropped_packets */
1537        ndag_get_statistics,    /* get_statistics */
1538        NULL,                   /* get_fd */
1539        trace_event_ndag,       /* trace_event */
1540        NULL,                   /* help */
1541        NULL,                   /* next pointer */
1542        {true, 0},              /* live packet capture */
1543        ndag_pstart_input,      /* parallel start */
1544        ndag_pread_packets,     /* parallel read */
1545        ndag_pause_input,       /* parallel pause */
1546        NULL,
1547        ndag_pregister_thread,  /* register thread */
1548        NULL,
1549        ndag_get_thread_stats   /* per-thread stats */
1550};
1551
1552void ndag_constructor(void) {
1553        register_format(&ndag);
1554}
Note: See TracBrowser for help on using the repository browser.