source: lib/format_ndag.c @ 8fa0167

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 8fa0167 was 8fa0167, checked in by Shane Alcock <salcock@…>, 3 years ago

Fix bad placement of FD_ZERO.

Also increment 'missing' by the total sequence number gap, not
1 each time the gap is non-zero.

  • Property mode set to 100644
File size: 35.9 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define ENCAP_BUFSIZE (10000)
26#define CTRL_BUF_SIZE (10000)
27
28#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
29
30static struct libtrace_format_t ndag;
31
32volatile int ndag_paused = 0;
33
34typedef struct streamsource {
35        uint16_t monitor;
36        char *groupaddr;
37        char *localiface;
38        uint16_t port;
39} streamsource_t;
40
41typedef struct streamsock {
42        char *groupaddr;
43        int sock;
44        struct addrinfo *srcaddr;
45        uint16_t port;
46        uint32_t expectedseq;
47        uint16_t monitorid;
48
49        char *saved;
50        char *nextread;
51        int savedsize;
52        ndag_encap_t *encaphdr;
53        uint64_t recordcount;
54
55} streamsock_t;
56
57typedef struct recvstream {
58        streamsock_t *sources;
59        uint16_t sourcecount;
60        libtrace_message_queue_t mqueue;
61        int threadindex;
62
63        uint64_t dropped_upstream;
64        uint64_t missing_records;
65        uint64_t received_packets;
66} recvstream_t;
67
68typedef struct ndag_format_data {
69        char *multicastgroup;
70        char *portstr;
71        char *localiface;
72        uint16_t nextthreadid;
73        recvstream_t *receivers;
74
75        pthread_t controlthread;
76        libtrace_message_queue_t controlqueue;
77} ndag_format_data_t;
78
79enum {
80        NDAG_CLIENT_HALT = 0x01,
81        NDAG_CLIENT_RESTARTED = 0x02,
82        NDAG_CLIENT_NEWGROUP = 0x03
83};
84
85typedef struct ndagreadermessage {
86        uint8_t type;
87        streamsource_t contents;
88} ndag_internal_message_t;
89
90
91static inline uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
92        ndag_common_t *header = (ndag_common_t *)msgbuf;
93
94        if (msgsize < sizeof(ndag_common_t)) {
95                fprintf(stderr,
96                        "nDAG message does not have a complete nDAG header.\n");
97                return 0;
98        }
99
100        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
101                fprintf(stderr,
102                        "nDAG message does not have a valid magic number.\n");
103                return 0;
104        }
105
106        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
107                fprintf(stderr,
108                        "nDAG message has an invalid header version: %u\n",
109                                header->version);
110                return 0;
111        }
112
113        return header->type;
114}
115
116static int join_multicast_group(char *groupaddr, char *localiface,
117        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
118
119        struct addrinfo hints;
120        struct addrinfo *gotten;
121        struct addrinfo *group;
122        unsigned int interface;
123        char pstr[16];
124        struct group_req greq;
125
126        int sock;
127
128        if (portstr == NULL) {
129                snprintf(pstr, 15, "%u", portnum);
130                portstr = pstr;
131        }
132
133        interface = if_nametoindex(localiface);
134        if (interface == 0) {
135                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
136                                localiface, strerror(errno));
137                return -1;
138        }
139
140        hints.ai_family = PF_UNSPEC;
141        hints.ai_socktype = SOCK_DGRAM;
142        hints.ai_flags = AI_PASSIVE;
143        hints.ai_protocol = 0;
144
145        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
146                fprintf(stderr,
147                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
148                                portstr, strerror(errno));
149                return -1;
150        }
151
152        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
153                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
154                                groupaddr, strerror(errno));
155                return -1;
156        }
157
158        *srcinfo = gotten;
159        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
160        if (sock < 0) {
161                fprintf(stderr,
162                        "Failed to create multicast socket for %s:%s -- %s\n",
163                                groupaddr, portstr, strerror(errno));
164                goto sockcreateover;
165        }
166
167        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
168        {
169                fprintf(stderr,
170                        "Failed to bind to multicast socket %s:%s -- %s\n",
171                                groupaddr, portstr, strerror(errno));
172                close(sock);
173                sock = -1;
174                goto sockcreateover;
175        }
176
177        greq.gr_interface = interface;
178        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
179
180        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
181                        sizeof(greq)) < 0) {
182                fprintf(stderr,
183                        "Failed to join multicast group %s:%s -- %s\n",
184                                groupaddr, portstr, strerror(errno));
185                close(sock);
186                sock = -1;
187                goto sockcreateover;
188        }
189
190sockcreateover:
191        freeaddrinfo(group);
192        return sock;
193}
194
195
196static int ndag_init_input(libtrace_t *libtrace) {
197
198        char *scan = NULL;
199        char *next = NULL;
200
201        libtrace->format_data = (ndag_format_data_t *)malloc(
202                        sizeof(ndag_format_data_t));
203
204        FORMAT_DATA->multicastgroup = NULL;
205        FORMAT_DATA->portstr = NULL;
206        FORMAT_DATA->localiface = NULL;
207        FORMAT_DATA->nextthreadid = 0;
208        FORMAT_DATA->receivers = NULL;
209
210        scan = strchr(libtrace->uridata, ',');
211        if (scan == NULL) {
212                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
213                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
214                return -1;
215        }
216        FORMAT_DATA->localiface = strndup(libtrace->uridata,
217                        (size_t)(scan - libtrace->uridata));
218        next = scan + 1;
219
220        scan = strchr(next, ',');
221        if (scan == NULL) {
222                FORMAT_DATA->portstr = strdup("9001");
223                FORMAT_DATA->multicastgroup = strdup(next);
224        } else {
225                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
226
227                FORMAT_DATA->portstr = strdup(scan + 1);
228        }
229        return 0;
230}
231
232static inline void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
233                uint16_t portnum, uint16_t monid) {
234
235        ndag_internal_message_t alert;
236
237        alert.type = NDAG_CLIENT_NEWGROUP;
238        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
239        alert.contents.localiface = FORMAT_DATA->localiface;
240        alert.contents.port = portnum;
241        alert.contents.monitor = monid;
242
243        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
244                        (void *)&alert);
245
246}
247       
248static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
249                int msgsize, uint16_t *ptmap) {
250
251        int i;
252        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
253        uint8_t msgtype;
254
255        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
256        if (msgtype == 0) {
257                return -1;
258        }
259
260        msgsize -= sizeof(ndag_common_t);
261        if (msgtype == NDAG_PKT_BEACON) {
262                /* If message is a beacon, make sure every port included in the
263                 * beacon is assigned to a receive thread.
264                 */
265                uint16_t *ptr, numstreams;
266
267                if ((uint32_t)msgsize < sizeof(uint16_t)) {
268                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
269                        return -1;
270                }
271
272                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
273                numstreams = ntohs(*ptr);
274                ptr ++;
275
276                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
277                {
278                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
279                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
280                        return -1;
281                }
282
283                for (i = 0; i < numstreams; i++) {
284                        uint16_t streamport = ntohs(*ptr);
285
286                        if (ptmap[streamport] == 0xffff) {
287                                new_group_alert(libtrace,
288                                        FORMAT_DATA->nextthreadid, streamport,
289                                        ntohs(ndaghdr->monitorid));
290
291                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
292
293                                if (libtrace->perpkt_thread_count == 0) {
294                                        FORMAT_DATA->nextthreadid = 0;
295                                } else {
296                                        FORMAT_DATA->nextthreadid =
297                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
298                                }
299                        }
300
301                        ptr ++;
302                }
303        } else if (msgtype == NDAG_PKT_RESTARTED) {
304                /* If message is a restart, push that to all active message
305                 * queues. */
306                ndag_internal_message_t alert;
307                alert.type = NDAG_CLIENT_RESTARTED;
308                alert.contents.monitor = ntohs(ndaghdr->monitorid);
309                alert.contents.groupaddr = NULL;
310                alert.contents.localiface = NULL;
311                alert.contents.port = 0;
312                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
313                        libtrace_message_queue_put(
314                                        &(FORMAT_DATA->receivers[i].mqueue),
315                                        (void *)&alert);
316                }
317        } else {
318                fprintf(stderr,
319                        "Unexpected message type on control channel: %u\n",
320                         msgtype);
321                return -1;
322        }
323
324        return 0;
325
326}
327
328static void *ndag_controller_run(void *tdata) {
329
330        libtrace_t *libtrace = (libtrace_t *)tdata;
331        uint16_t ptmap[65536];
332        int sock = -1;
333        struct addrinfo *receiveaddr = NULL;
334        fd_set listening;
335        struct timeval timeout;
336
337        /* ptmap is a dirty hack to allow us to quickly check if we've already
338         * assigned a stream to a thread.
339         */
340        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
341
342        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
343                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
344                        &receiveaddr);
345        if (sock == -1) {
346                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
347                        "Unable to join multicast group for nDAG control channel");
348                trace_interrupt();
349        }
350
351        ndag_paused = 0;
352        while ((is_halted(libtrace) == -1) && !ndag_paused) {
353                int ret;
354                char buf[CTRL_BUF_SIZE];
355
356                FD_ZERO(&listening);
357                FD_SET(sock, &listening);
358
359                timeout.tv_sec = 0;
360                timeout.tv_usec = 500000;
361
362                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
363                if (ret < 0) {
364                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
365                        break;
366                }
367
368                if (!FD_ISSET(sock, &listening)) {
369                        continue;
370                }
371
372                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
373                                receiveaddr->ai_addr,
374                                &(receiveaddr->ai_addrlen));
375                if (ret < 0) {
376                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
377                        break;
378                }
379
380                if (ret == 0) {
381                        break;
382                }
383
384                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
385                        fprintf(stderr, "Error while parsing nDAG control message.\n");
386                        continue;
387                }
388        }
389
390        if (sock >= 0) {
391                close(sock);
392        }
393
394        /* Control channel has fallen over, should probably encourage libtrace
395         * to halt the receiver threads as well.
396         */
397        if (!is_halted(libtrace)) {
398                trace_interrupt();
399        }
400
401        pthread_exit(NULL);
402}
403
404static inline int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
405{
406        int ret;
407        uint32_t i;
408        /* Configure the set of receiver threads */
409
410        if (FORMAT_DATA->receivers == NULL) {
411                /* What if the number of threads changes between a pause and
412                 * a restart? Can this happen? */
413                FORMAT_DATA->receivers = (recvstream_t *)
414                                malloc(sizeof(recvstream_t) * maxthreads);
415        }
416
417        for (i = 0; i < maxthreads; i++) {
418                FORMAT_DATA->receivers[i].sources = NULL;
419                FORMAT_DATA->receivers[i].sourcecount = 0;
420                FORMAT_DATA->receivers[i].threadindex = i;
421                FORMAT_DATA->receivers[i].dropped_upstream = 0;
422                FORMAT_DATA->receivers[i].received_packets = 0;
423                FORMAT_DATA->receivers[i].missing_records = 0;
424
425                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
426                                sizeof(ndag_internal_message_t));
427        }
428
429        /* Start the controller thread */
430        /* TODO consider affinity of this thread? */
431
432        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
433                        ndag_controller_run, libtrace);
434        if (ret != 0) {
435                return -1;
436        }
437        return maxthreads;
438}
439
440static int ndag_start_input(libtrace_t *libtrace) {
441        return ndag_start_threads(libtrace, 1);
442}
443
444static int ndag_pstart_input(libtrace_t *libtrace) {
445        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
446                        libtrace->perpkt_thread_count)
447                return 0;
448        return -1;
449}
450
451static void halt_ndag_receiver(recvstream_t *receiver) {
452        int i;
453        libtrace_message_queue_destroy(&(receiver->mqueue));
454
455        if (receiver->sources == NULL)
456                return;
457        for (i = 0; i < receiver->sourcecount; i++) {
458                streamsock_t src = receiver->sources[i];
459                if (src.saved) {
460                        free(src.saved);
461                }
462                close(src.sock);
463        }
464        if (receiver->sources) {
465                free(receiver->sources);
466        }
467}
468
469static int ndag_pause_input(libtrace_t *libtrace) {
470        int i;
471
472        /* Close the existing receiver sockets */
473        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
474               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
475        }
476        return 0;
477}
478
479static int ndag_fin_input(libtrace_t *libtrace) {
480
481        if (FORMAT_DATA->receivers) {
482                free(FORMAT_DATA->receivers);
483        }
484        if (FORMAT_DATA->multicastgroup) {
485                free(FORMAT_DATA->multicastgroup);
486        }
487        if (FORMAT_DATA->portstr) {
488                free(FORMAT_DATA->portstr);
489        }
490        if (FORMAT_DATA->localiface) {
491                free(FORMAT_DATA->localiface);
492        }
493
494        free(libtrace->format_data);
495        return 0;
496}
497
498static int ndag_prepare_packet_stream(libtrace_t *libtrace,
499                recvstream_t *rt,
500                streamsock_t *ssock, libtrace_packet_t *packet,
501                uint32_t flags) {
502
503        dag_record_t *erfptr;
504        uint16_t ndag_reccount = 0;
505
506        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
507                packet->buf_control = TRACE_CTRL_PACKET;
508        } else {
509                packet->buf_control = TRACE_CTRL_EXTERNAL;
510        }
511
512        packet->trace = libtrace;
513        packet->buffer = ssock->nextread;
514        packet->header = ssock->nextread;
515        packet->type = TRACE_RT_DATA_ERF;
516
517        erfptr = (dag_record_t *)packet->header;
518
519        if (erfptr->flags.rxerror == 1) {
520                packet->payload = NULL;
521                erfptr->rlen = htons(erf_get_framing_length(packet));
522        } else {
523                packet->payload = (char *)packet->buffer +
524                                erf_get_framing_length(packet);
525        }
526
527        /* Update upstream drops using lctr */
528
529        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
530                /* TODO */
531        } else {
532                if (rt->received_packets > 0) {
533                        rt->dropped_upstream += ntohs(erfptr->lctr);
534                }
535        }
536
537        rt->received_packets ++;
538        ssock->recordcount += 1;
539
540        ndag_reccount = ntohs(ssock->encaphdr->recordcount);
541        if ((ndag_reccount & 0x8000) != 0) {
542                /* Record was truncated -- update rlen appropriately */
543                erfptr->rlen = htons(ssock->savedsize -
544                                (ssock->nextread - ssock->saved));
545        }
546        ssock->nextread += ntohs(erfptr->rlen);
547
548        packet->order = erf_get_erf_timestamp(packet);
549        packet->error = packet->payload ? ntohs(erfptr->rlen) :
550                        erf_get_framing_length(packet);
551
552        return ntohs(erfptr->rlen);
553}
554
555static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
556                libtrace_packet_t *packet UNUSED,
557                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
558                uint32_t flags UNUSED) {
559
560        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
561        return 0;
562
563}
564
565static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
566
567        streamsock_t *ssock = NULL;
568
569        if (rt->sourcecount == 0) {
570                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
571        } else if ((rt->sourcecount % 10) == 0) {
572                rt->sources = (streamsock_t *)realloc(rt->sources,
573                        sizeof(streamsock_t) * (rt->sourcecount + 10));
574        }
575
576        ssock = &(rt->sources[rt->sourcecount]);
577
578        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
579                        NULL, src.port, &(ssock->srcaddr));
580
581        if (ssock->sock < 0) {
582                return -1;
583        }
584
585        ssock->port = src.port;
586        ssock->groupaddr = src.groupaddr;
587        ssock->expectedseq = 0;
588        ssock->monitorid = src.monitor;
589        ssock->saved = (char *)malloc(ENCAP_BUFSIZE);
590        ssock->nextread = ssock->saved;
591        ssock->savedsize = 0;
592        ssock->encaphdr = NULL;
593        ssock->recordcount = 0;
594        rt->sourcecount += 1;
595
596        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
597                        ssock->groupaddr, ssock->port, rt->threadindex);
598
599        return ssock->port;
600}
601
602static int receiver_read_messages(recvstream_t *rt) {
603
604        ndag_internal_message_t msg;
605
606        while (libtrace_message_queue_try_get(&(rt->mqueue),
607                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
608                switch(msg.type) {
609                        case NDAG_CLIENT_NEWGROUP:
610                                if (add_new_streamsock(rt, msg.contents) < 0) {
611                                        return -1;
612                                }
613                                break;
614                        case NDAG_CLIENT_RESTARTED:
615                                /* TODO */
616
617                                break;
618                        case NDAG_CLIENT_HALT:
619                                return 0;
620                }
621        }
622        return 1;
623
624}
625
626static inline int read_required(streamsock_t ssock) {
627        if (ssock.sock == -1)
628                return 0;
629        if (ssock.savedsize == 0)
630                return 1;
631        if (ssock.nextread - ssock.saved >= ssock.savedsize)
632                return 1;
633        return 0;
634}
635
636
637static int receive_from_sockets(recvstream_t *rt) {
638
639        fd_set allgroups;
640        int maxfd, i, ret, readybufs, availsocks;
641        struct timeval timeout;
642
643        maxfd = 0;
644        timeout.tv_sec = 0;
645        timeout.tv_usec = 10000;
646
647        /* TODO maybe need a way to tidy up "dead" sockets and signal back
648         * to the control thread that we've killed the socket for a particular
649         * port.
650         */
651
652        readybufs = 0;
653        availsocks = 0;
654
655        FD_ZERO(&allgroups);
656        for (i = 0; i < rt->sourcecount; i ++) {
657                if (read_required(rt->sources[i])) {
658                        FD_SET(rt->sources[i].sock, &allgroups);
659                        if (rt->sources[i].sock > maxfd) {
660                                maxfd = rt->sources[i].sock;
661                        }
662                        availsocks += 1;
663                } else if (rt->sources[i].sock != -1) {
664                        readybufs += 1;
665                        availsocks += 1;
666                }
667        }
668
669        /* If all of our sockets already have data sitting in their
670         * buffers then we can save ourselves some 'select'ing.
671         */
672        if (availsocks == readybufs) {
673                return readybufs;
674        }
675
676        /* Otherwise, at least one active socket has an empty buffer so
677         * we better try to read some data from those sockets (just in
678         * case the correct 'next' packet is waiting on one of those
679         * sockets.
680         */
681        if (select(maxfd + 1, &allgroups, NULL, NULL, &timeout) < 0) {
682                fprintf(stderr, "Error waiting to receive records: %s\n",
683                                strerror(errno));
684                return -1;
685        }
686
687
688        for (i = 0; i < rt->sourcecount; i ++) {
689                if (!read_required(rt->sources[i])) {
690                        if (rt->sources[i].sock != -1) {
691                                readybufs ++;
692                        }
693                        continue;
694                }
695
696                rt->sources[i].savedsize = 0;
697                rt->sources[i].nextread = rt->sources[i].saved;
698
699                if (!FD_ISSET(rt->sources[i].sock, &allgroups)) {
700                        continue;
701                }
702
703                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved,
704                                ENCAP_BUFSIZE, 0,
705                                rt->sources[i].srcaddr->ai_addr,
706                                &(rt->sources[i].srcaddr->ai_addrlen));
707                if (ret < 0) {
708                        fprintf(stderr,
709                                        "Error receiving encapsulated records from %s:%u -- %s\n",
710                                        rt->sources[i].groupaddr,
711                                        rt->sources[i].port,
712                                        strerror(errno));
713                        close(rt->sources[i].sock);
714                        rt->sources[i].sock = -1;
715                        continue;
716                }
717
718                if (ret == 0) {
719                        fprintf(stderr, "Received zero bytes on the channel for %s:%u.\n",
720                                        rt->sources[i].groupaddr,
721                                        rt->sources[i].port);
722                        close(rt->sources[i].sock);
723                        rt->sources[i].sock = -1;
724                        continue;
725                }
726
727                rt->sources[i].savedsize = ret;
728
729                /* Check that we have a valid nDAG encap record */
730                if (check_ndag_header(rt->sources[i].saved, ret) !=
731                                        NDAG_PKT_ENCAPERF) {
732                        fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
733                                        rt->sources[i].groupaddr,
734                                        rt->sources[i].port);
735                        close(rt->sources[i].sock);
736                        rt->sources[i].sock = -1;
737                        continue;
738                }
739
740                /* Save the useful info from the encap header */
741                rt->sources[i].encaphdr = (ndag_encap_t *)
742                                (rt->sources[i].saved + sizeof(ndag_common_t));
743
744                if (rt->sources[i].expectedseq != 0) {
745                        if (ntohl(rt->sources[i].encaphdr->seqno) !=
746                                        rt->sources[i].expectedseq) {
747                                /* TODO deal with wrapping */
748                                rt->missing_records += (ntohl(rt->sources[i].encaphdr->seqno) - rt->sources[i].expectedseq);
749                        }
750                }
751                rt->sources[i].expectedseq =
752                                ntohl(rt->sources[i].encaphdr->seqno) + 1;
753
754                /* If all good, skip past the nDAG headers */
755                rt->sources[i].nextread = rt->sources[i].saved +
756                                sizeof(ndag_common_t) + sizeof(ndag_encap_t);
757
758                readybufs += 1;
759        }
760
761        return readybufs;
762
763}
764
765
766static int receive_encap_records(libtrace_t *libtrace, recvstream_t *rt,
767                libtrace_packet_t *packet, int block) {
768
769        int iserr = 0;
770
771        if (packet->buf_control == TRACE_CTRL_PACKET) {
772                free(packet->buffer);
773                packet->buffer = NULL;
774        }
775
776        do {
777                /* Make sure we shouldn't be halting */
778                if ((iserr = is_halted(libtrace)) != -1) {
779                        return iserr;
780                }
781
782                /* First, check for any messages from the control thread */
783                iserr = receiver_read_messages(rt);
784
785                if (iserr <= 0) {
786                        return iserr;
787                }
788
789                /* If non-blocking and there are no sources, just break */
790                if (!block && rt->sourcecount == 0) {
791                        iserr = 0;
792                        break;
793                }
794
795                /* If blocking and no sources, sleep for a bit and then try
796                 * checking for messages again.
797                 */
798                if (block && rt->sourcecount == 0) {
799                        usleep(10000);
800                        continue;
801                }
802
803                if ((iserr = receive_from_sockets(rt)) < 0) {
804                        return iserr;
805                } else if (iserr > 0) {
806                        /* At least one of our input sockets has available
807                         * data, let's go ahead and use what we have. */
808                        break;
809                }
810
811        } while (block);
812
813        return iserr;
814}
815
816static streamsock_t *select_next_packet(recvstream_t *rt) {
817        int i;
818        streamsock_t *ssock = NULL;
819        uint64_t earliest = 0;
820        uint64_t currentts = 0;
821        dag_record_t *daghdr;
822
823        for (i = 0; i < rt->sourcecount; i ++) {
824                if (read_required(rt->sources[i])) {
825                        continue;
826                }
827
828                daghdr = (dag_record_t *)rt->sources[i].nextread;
829                currentts = bswap_le_to_host64(daghdr->ts);
830
831                if (earliest == 0 || earliest > currentts) {
832                        earliest = currentts;
833                        ssock = &(rt->sources[i]);
834                }
835                /*
836                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
837                                i, currentts,
838                                rt->sources[i].recordcount,
839                                rt->missing_records);
840                */
841        }
842        return ssock;
843}
844
845static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
846
847        int rem;
848        streamsock_t *nextavail = NULL;
849        rem = receive_encap_records(libtrace, &(FORMAT_DATA->receivers[0]),
850                        packet, 1);
851
852        if (rem <= 0) {
853                return rem;
854        }
855
856        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
857        if (nextavail == NULL) {
858                return 0;
859        }
860
861        /* nextread should point at an ERF header, so prepare 'packet' to be
862         * a libtrace ERF packet. */
863
864        return ndag_prepare_packet_stream(libtrace,
865                        &(FORMAT_DATA->receivers[0]), nextavail,
866                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
867}
868
869static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
870                libtrace_packet_t **packets, size_t nb_packets) {
871
872        recvstream_t *rt;
873        int rem;
874        size_t read_packets = 0;
875        streamsock_t *nextavail = NULL;
876
877        rt = (recvstream_t *)t->format_data;
878
879        do {
880                rem = receive_encap_records(libtrace, rt,
881                                packets[read_packets],
882                                read_packets == 0 ? 1 : 0);
883                if (rem < 0) {
884                        return rem;
885                }
886
887                if (rem == 0) {
888                        break;
889                }
890
891                nextavail = select_next_packet(rt);
892                if (nextavail == NULL) {
893                        break;
894                }
895
896                ndag_prepare_packet_stream(libtrace, rt, nextavail,
897                                packets[read_packets],
898                                TRACE_PREP_DO_NOT_OWN_BUFFER);
899
900                read_packets  ++;
901                if (read_packets >= nb_packets) {
902                        break;
903                }
904        } while (1);
905
906        return read_packets;
907
908}
909
910static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
911                libtrace_packet_t *packet) {
912
913
914        libtrace_eventobj_t event = {0,0,0.0,0};
915        int rem;
916        streamsock_t *nextavail = NULL;
917
918        do {
919                rem = receive_encap_records(libtrace,
920                                &(FORMAT_DATA->receivers[0]), packet, 0);
921
922                if (rem < 0) {
923                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
924                                "Received invalid nDAG records.");
925                        event.type = TRACE_EVENT_TERMINATE;
926                        break;
927                }
928
929                if (rem == 0) {
930                        /* Either we've been halted or we've got no packets
931                         * right now. */
932                        if (is_halted(libtrace) == 0) {
933                                event.type = TRACE_EVENT_TERMINATE;
934                                break;
935                        }
936                        event.type = TRACE_EVENT_SLEEP;
937                        event.seconds = 0.0001;
938                        break;
939                }
940
941                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
942                if (nextavail == NULL) {
943                        event.type = TRACE_EVENT_SLEEP;
944                        event.seconds = 0.0001;
945                        break;
946                }
947
948                event.type = TRACE_EVENT_PACKET;
949                ndag_prepare_packet_stream(libtrace,
950                                &(FORMAT_DATA->receivers[0]), nextavail,
951                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
952                event.size = trace_get_capture_length(packet) +
953                                trace_get_framing_length(packet);
954
955                if (libtrace->filter) {
956                        int filtret = trace_apply_filter(libtrace->filter,
957                                        packet);
958                        if (filtret == -1) {
959                                trace_set_err(libtrace,
960                                                TRACE_ERR_BAD_FILTER,
961                                                "Bad BPF Filter");
962                                event.type = TRACE_EVENT_TERMINATE;
963                                break;
964                        }
965
966                        if (filtret == 0) {
967                                /* Didn't match filter, try next one */
968                                libtrace->filtered_packets ++;
969                                trace_clear_cache(packet);
970                                continue;
971                        }
972                }
973
974                if (libtrace->snaplen > 0) {
975                        trace_set_capture_length(packet, libtrace->snaplen);
976                }
977                libtrace->accepted_packets ++;
978                break;
979        } while (1);
980
981        return event;
982}
983
984static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
985
986        int i;
987
988        stat->dropped_valid = 1;
989        stat->dropped = 0;
990        stat->received_valid = 1;
991        stat->received = 0;
992        stat->missing_valid = 1;
993        stat->missing = 0;
994
995        /* TODO Is this thread safe? */
996        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
997                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
998                stat->received += FORMAT_DATA->receivers[i].received_packets;
999                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1000        }
1001
1002}
1003
1004static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1005                libtrace_stat_t *stat) {
1006
1007        recvstream_t *recvr = (recvstream_t *)t->format_data;
1008
1009        if (libtrace == NULL)
1010                return;
1011        /* TODO Is this thread safe */
1012        stat->dropped_valid = 1;
1013        stat->dropped = recvr->dropped_upstream;
1014
1015        stat->received_valid = 1;
1016        stat->received = recvr->received_packets;
1017
1018        stat->missing_valid = 1;
1019        stat->missing = recvr->missing_records;
1020
1021}
1022
1023static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1024                bool reader) {
1025        recvstream_t *recvr;
1026
1027        if (!reader || t->type != THREAD_PERPKT) {
1028                return 0;
1029        }
1030
1031        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1032        t->format_data = recvr;
1033
1034        return 0;
1035}
1036
1037static struct libtrace_format_t ndag = {
1038
1039        "ndag",
1040        "",
1041        TRACE_FORMAT_NDAG,
1042        NULL,                   /* probe filename */
1043        NULL,                   /* probe magic */
1044        ndag_init_input,        /* init_input */
1045        NULL,                   /* config_input */
1046        ndag_start_input,       /* start_input */
1047        ndag_pause_input,       /* pause_input */
1048        NULL,                   /* init_output */
1049        NULL,                   /* config_output */
1050        NULL,                   /* start_output */
1051        ndag_fin_input,         /* fin_input */
1052        NULL,                   /* fin_output */
1053        ndag_read_packet,       /* read_packet */
1054        ndag_prepare_packet,    /* prepare_packet */
1055        NULL,                   /* fin_packet */
1056        NULL,                   /* write_packet */
1057        erf_get_link_type,      /* get_link_type */
1058        erf_get_direction,      /* get_direction */
1059        erf_set_direction,      /* set_direction */
1060        erf_get_erf_timestamp,  /* get_erf_timestamp */
1061        NULL,                   /* get_timeval */
1062        NULL,                   /* get_seconds */
1063        NULL,                   /* get_timespec */
1064        NULL,                   /* seek_erf */
1065        NULL,                   /* seek_timeval */
1066        NULL,                   /* seek_seconds */
1067        erf_get_capture_length, /* get_capture_length */
1068        erf_get_wire_length,    /* get_wire_length */
1069        erf_get_framing_length, /* get_framing_length */
1070        erf_set_capture_length, /* set_capture_length */
1071        NULL,                   /* get_received_packets */
1072        NULL,                   /* get_filtered_packets */
1073        NULL,                   /* get_dropped_packets */
1074        ndag_get_statistics,    /* get_statistics */
1075        NULL,                   /* get_fd */
1076        trace_event_ndag,       /* trace_event */
1077        NULL,                   /* help */
1078        NULL,                   /* next pointer */
1079        {true, 0},              /* live packet capture */
1080        ndag_pstart_input,      /* parallel start */
1081        ndag_pread_packets,     /* parallel read */
1082        ndag_pause_input,       /* parallel pause */
1083        NULL,
1084        ndag_pregister_thread,  /* register thread */
1085        NULL,
1086        ndag_get_thread_stats   /* per-thread stats */
1087};
1088
1089void ndag_constructor(void) {
1090        register_format(&ndag);
1091}
Note: See TracBrowser for help on using the repository browser.