source: lib/format_ndag.c @ 07de3c6

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 07de3c6 was 07de3c6, checked in by Shane Alcock <salcock@…>, 4 years ago

Why count the number of buffers available every time?

Surely it is much more efficient to just have a counter that
gets modified each time a buffer is filled or empty...

  • Property mode set to 100644
File size: 42.4 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (1000)
29
30#define RECV_BATCH_SIZE (50)
31
32#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
33
34static struct libtrace_format_t ndag;
35
36volatile int ndag_paused = 0;
37
38typedef struct monitor {
39        uint16_t monitorid;
40        uint64_t laststart;
41} ndag_monitor_t;
42
43
44typedef struct streamsource {
45        uint16_t monitor;
46        char *groupaddr;
47        char *localiface;
48        uint16_t port;
49} streamsource_t;
50
51typedef struct streamsock {
52        char *groupaddr;
53        int sock;
54        struct addrinfo *srcaddr;
55        uint16_t port;
56        uint32_t expectedseq;
57        ndag_monitor_t *monitorptr;
58        char **saved;
59        char *nextread;
60        int nextreadind;
61        int nextwriteind;
62        int savedsize[ENCAP_BUFFERS];
63        uint32_t startidle;
64        uint64_t recordcount;
65
66        int bufavail;
67
68        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
69} streamsock_t;
70
71typedef struct recvstream {
72        streamsock_t *sources;
73        uint16_t sourcecount;
74        libtrace_message_queue_t mqueue;
75        int threadindex;
76        ndag_monitor_t *knownmonitors;
77        uint16_t monitorcount;
78
79        uint64_t dropped_upstream;
80        uint64_t missing_records;
81        uint64_t received_packets;
82} recvstream_t;
83
84typedef struct ndag_format_data {
85        char *multicastgroup;
86        char *portstr;
87        char *localiface;
88        uint16_t nextthreadid;
89        recvstream_t *receivers;
90
91        pthread_t controlthread;
92        libtrace_message_queue_t controlqueue;
93} ndag_format_data_t;
94
95enum {
96        NDAG_CLIENT_HALT = 0x01,
97        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
98        NDAG_CLIENT_NEWGROUP = 0x03
99};
100
101typedef struct ndagreadermessage {
102        uint8_t type;
103        streamsource_t contents;
104} ndag_internal_message_t;
105
106
107static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
108
109        /* Calculate seq_a - seq_b, taking wraparound into account */
110        if (seq_a == seq_b) return 0;
111
112        if (seq_a > seq_b) {
113                return (int) (seq_a - seq_b);
114        }
115
116        /* -1 for the wrap and another -1 because we don't use zero */
117        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
118}
119
120static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
121        ndag_common_t *header = (ndag_common_t *)msgbuf;
122
123        if (msgsize < sizeof(ndag_common_t)) {
124                fprintf(stderr,
125                        "nDAG message does not have a complete nDAG header.\n");
126                return 0;
127        }
128
129        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
130                fprintf(stderr,
131                        "nDAG message does not have a valid magic number.\n");
132                return 0;
133        }
134
135        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
136                fprintf(stderr,
137                        "nDAG message has an invalid header version: %u\n",
138                                header->version);
139                return 0;
140        }
141
142        return header->type;
143}
144
145static int join_multicast_group(char *groupaddr, char *localiface,
146        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
147
148        struct addrinfo hints;
149        struct addrinfo *gotten;
150        struct addrinfo *group;
151        unsigned int interface;
152        char pstr[16];
153        struct group_req greq;
154        int bufsize;
155
156        int sock;
157
158        if (portstr == NULL) {
159                snprintf(pstr, 15, "%u", portnum);
160                portstr = pstr;
161        }
162
163        interface = if_nametoindex(localiface);
164        if (interface == 0) {
165                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
166                                localiface, strerror(errno));
167                return -1;
168        }
169
170        hints.ai_family = PF_UNSPEC;
171        hints.ai_socktype = SOCK_DGRAM;
172        hints.ai_flags = AI_PASSIVE;
173        hints.ai_protocol = 0;
174
175        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
176                fprintf(stderr,
177                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
178                                portstr, strerror(errno));
179                return -1;
180        }
181
182        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
183                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
184                                groupaddr, strerror(errno));
185                return -1;
186        }
187
188        *srcinfo = gotten;
189        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
190        if (sock < 0) {
191                fprintf(stderr,
192                        "Failed to create multicast socket for %s:%s -- %s\n",
193                                groupaddr, portstr, strerror(errno));
194                goto sockcreateover;
195        }
196
197        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
198        {
199                fprintf(stderr,
200                        "Failed to bind to multicast socket %s:%s -- %s\n",
201                                groupaddr, portstr, strerror(errno));
202                close(sock);
203                sock = -1;
204                goto sockcreateover;
205        }
206
207        greq.gr_interface = interface;
208        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
209
210        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
211                        sizeof(greq)) < 0) {
212                fprintf(stderr,
213                        "Failed to join multicast group %s:%s -- %s\n",
214                                groupaddr, portstr, strerror(errno));
215                close(sock);
216                sock = -1;
217                goto sockcreateover;
218        }
219
220        bufsize = 16 * 1024 * 1024;
221        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
222                                (socklen_t)sizeof(int)) < 0) {
223
224                fprintf(stderr,
225                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
226                                groupaddr, portstr, strerror(errno));
227                close(sock);
228                sock = -1;
229                goto sockcreateover;
230        }
231
232sockcreateover:
233        freeaddrinfo(group);
234        return sock;
235}
236
237
238static int ndag_init_input(libtrace_t *libtrace) {
239
240        char *scan = NULL;
241        char *next = NULL;
242
243        libtrace->format_data = (ndag_format_data_t *)malloc(
244                        sizeof(ndag_format_data_t));
245
246        FORMAT_DATA->multicastgroup = NULL;
247        FORMAT_DATA->portstr = NULL;
248        FORMAT_DATA->localiface = NULL;
249        FORMAT_DATA->nextthreadid = 0;
250        FORMAT_DATA->receivers = NULL;
251
252        scan = strchr(libtrace->uridata, ',');
253        if (scan == NULL) {
254                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
255                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
256                return -1;
257        }
258        FORMAT_DATA->localiface = strndup(libtrace->uridata,
259                        (size_t)(scan - libtrace->uridata));
260        next = scan + 1;
261
262        scan = strchr(next, ',');
263        if (scan == NULL) {
264                FORMAT_DATA->portstr = strdup("9001");
265                FORMAT_DATA->multicastgroup = strdup(next);
266        } else {
267                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
268
269                FORMAT_DATA->portstr = strdup(scan + 1);
270        }
271        return 0;
272}
273
274static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
275                uint16_t portnum, uint16_t monid) {
276
277        ndag_internal_message_t alert;
278
279        alert.type = NDAG_CLIENT_NEWGROUP;
280        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
281        alert.contents.localiface = FORMAT_DATA->localiface;
282        alert.contents.port = portnum;
283        alert.contents.monitor = monid;
284
285        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
286                        (void *)&alert);
287
288}
289       
290static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
291                int msgsize, uint16_t *ptmap) {
292
293        int i;
294        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
295        uint8_t msgtype;
296
297        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
298        if (msgtype == 0) {
299                return -1;
300        }
301
302        msgsize -= sizeof(ndag_common_t);
303        if (msgtype == NDAG_PKT_BEACON) {
304                /* If message is a beacon, make sure every port included in the
305                 * beacon is assigned to a receive thread.
306                 */
307                uint16_t *ptr, numstreams;
308
309                if ((uint32_t)msgsize < sizeof(uint16_t)) {
310                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
311                        return -1;
312                }
313
314                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
315                numstreams = ntohs(*ptr);
316                ptr ++;
317
318                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
319                {
320                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
321                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
322                        return -1;
323                }
324
325                for (i = 0; i < numstreams; i++) {
326                        uint16_t streamport = ntohs(*ptr);
327
328                        if (ptmap[streamport] == 0xffff) {
329                                new_group_alert(libtrace,
330                                        FORMAT_DATA->nextthreadid, streamport,
331                                        ntohs(ndaghdr->monitorid));
332
333                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
334
335                                if (libtrace->perpkt_thread_count == 0) {
336                                        FORMAT_DATA->nextthreadid = 0;
337                                } else {
338                                        FORMAT_DATA->nextthreadid =
339                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
340                                }
341                        }
342
343                        ptr ++;
344                }
345        } else {
346                fprintf(stderr,
347                        "Unexpected message type on control channel: %u\n",
348                         msgtype);
349                return -1;
350        }
351
352        return 0;
353
354}
355
356static void *ndag_controller_run(void *tdata) {
357
358        libtrace_t *libtrace = (libtrace_t *)tdata;
359        uint16_t ptmap[65536];
360        int sock = -1;
361        struct addrinfo *receiveaddr = NULL;
362        fd_set listening;
363        struct timeval timeout;
364
365        /* ptmap is a dirty hack to allow us to quickly check if we've already
366         * assigned a stream to a thread.
367         */
368        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
369
370        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
371                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
372                        &receiveaddr);
373        if (sock == -1) {
374                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
375                        "Unable to join multicast group for nDAG control channel");
376                trace_interrupt();
377        }
378
379        ndag_paused = 0;
380        while ((is_halted(libtrace) == -1) && !ndag_paused) {
381                int ret;
382                char buf[CTRL_BUF_SIZE];
383
384                FD_ZERO(&listening);
385                FD_SET(sock, &listening);
386
387                timeout.tv_sec = 0;
388                timeout.tv_usec = 500000;
389
390                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
391                if (ret < 0) {
392                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
393                        break;
394                }
395
396                if (!FD_ISSET(sock, &listening)) {
397                        continue;
398                }
399
400                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
401                                receiveaddr->ai_addr,
402                                &(receiveaddr->ai_addrlen));
403                if (ret < 0) {
404                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
405                        break;
406                }
407
408                if (ret == 0) {
409                        break;
410                }
411
412                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
413                        fprintf(stderr, "Error while parsing nDAG control message.\n");
414                        continue;
415                }
416        }
417
418        if (sock >= 0) {
419                close(sock);
420        }
421
422        /* Control channel has fallen over, should probably encourage libtrace
423         * to halt the receiver threads as well.
424         */
425        if (!is_halted(libtrace)) {
426                trace_interrupt();
427        }
428
429        pthread_exit(NULL);
430}
431
432static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
433{
434        int ret;
435        uint32_t i;
436        /* Configure the set of receiver threads */
437
438        if (FORMAT_DATA->receivers == NULL) {
439                /* What if the number of threads changes between a pause and
440                 * a restart? Can this happen? */
441                FORMAT_DATA->receivers = (recvstream_t *)
442                                malloc(sizeof(recvstream_t) * maxthreads);
443        }
444
445        for (i = 0; i < maxthreads; i++) {
446                FORMAT_DATA->receivers[i].sources = NULL;
447                FORMAT_DATA->receivers[i].sourcecount = 0;
448                FORMAT_DATA->receivers[i].knownmonitors = NULL;
449                FORMAT_DATA->receivers[i].monitorcount = 0;
450                FORMAT_DATA->receivers[i].threadindex = i;
451                FORMAT_DATA->receivers[i].dropped_upstream = 0;
452                FORMAT_DATA->receivers[i].received_packets = 0;
453                FORMAT_DATA->receivers[i].missing_records = 0;
454
455                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
456                                sizeof(ndag_internal_message_t));
457        }
458
459        /* Start the controller thread */
460        /* TODO consider affinity of this thread? */
461
462        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
463                        ndag_controller_run, libtrace);
464        if (ret != 0) {
465                return -1;
466        }
467        return maxthreads;
468}
469
470static int ndag_start_input(libtrace_t *libtrace) {
471        return ndag_start_threads(libtrace, 1);
472}
473
474static int ndag_pstart_input(libtrace_t *libtrace) {
475        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
476                        libtrace->perpkt_thread_count)
477                return 0;
478        return -1;
479}
480
481static void halt_ndag_receiver(recvstream_t *receiver) {
482        int j, i;
483        libtrace_message_queue_destroy(&(receiver->mqueue));
484
485        if (receiver->sources == NULL)
486                return;
487        for (i = 0; i < receiver->sourcecount; i++) {
488                streamsock_t src = receiver->sources[i];
489                if (src.saved) {
490                        for (j = 0; i < ENCAP_BUFFERS; j++) {
491                                if (src.saved[j]) {
492                                        free(src.saved[j]);
493                                }
494                        }
495                        free(src.saved);
496                }
497                for (j = 0; j < RECV_BATCH_SIZE; j++) {
498                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
499                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
500                        }
501                }
502                close(src.sock);
503        }
504        if (receiver->knownmonitors) {
505                free(receiver->knownmonitors);
506        }
507
508        if (receiver->sources) {
509                free(receiver->sources);
510        }
511}
512
513static int ndag_pause_input(libtrace_t *libtrace) {
514        int i;
515
516        /* Close the existing receiver sockets */
517        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
518               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
519        }
520        return 0;
521}
522
523static int ndag_fin_input(libtrace_t *libtrace) {
524
525        if (FORMAT_DATA->receivers) {
526                free(FORMAT_DATA->receivers);
527        }
528        if (FORMAT_DATA->multicastgroup) {
529                free(FORMAT_DATA->multicastgroup);
530        }
531        if (FORMAT_DATA->portstr) {
532                free(FORMAT_DATA->portstr);
533        }
534        if (FORMAT_DATA->localiface) {
535                free(FORMAT_DATA->localiface);
536        }
537
538        free(libtrace->format_data);
539        return 0;
540}
541
542static int ndag_prepare_packet_stream(libtrace_t *libtrace,
543                recvstream_t *rt,
544                streamsock_t *ssock, libtrace_packet_t *packet,
545                uint32_t flags) {
546
547        dag_record_t *erfptr;
548        ndag_encap_t *encaphdr;
549        uint16_t ndag_reccount = 0;
550        int nr;
551
552        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
553                packet->buf_control = TRACE_CTRL_PACKET;
554        } else {
555                packet->buf_control = TRACE_CTRL_EXTERNAL;
556        }
557
558        packet->trace = libtrace;
559        packet->buffer = ssock->nextread;
560        packet->header = ssock->nextread;
561        packet->type = TRACE_RT_DATA_ERF;
562
563        erfptr = (dag_record_t *)packet->header;
564
565        if (erfptr->flags.rxerror == 1) {
566                packet->payload = NULL;
567                erfptr->rlen = htons(erf_get_framing_length(packet));
568        } else {
569                packet->payload = (char *)packet->buffer +
570                                erf_get_framing_length(packet);
571        }
572
573        /* Update upstream drops using lctr */
574
575        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
576                /* TODO */
577        } else {
578                if (rt->received_packets > 0) {
579                        rt->dropped_upstream += ntohs(erfptr->lctr);
580                }
581        }
582
583        rt->received_packets ++;
584        ssock->recordcount += 1;
585
586        nr = ssock->nextreadind;
587        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
588                        sizeof(ndag_common_t));
589
590        ndag_reccount = ntohs(encaphdr->recordcount);
591        if ((ndag_reccount & 0x8000) != 0) {
592                /* Record was truncated -- update rlen appropriately */
593                erfptr->rlen = htons(ssock->savedsize[nr] -
594                                (ssock->nextread - ssock->saved[nr]));
595        }
596        ssock->nextread += ntohs(erfptr->rlen);
597
598        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
599                /* Read everything from this buffer, mark as empty and
600                 * move on. */
601                ssock->savedsize[nr] = 0;
602                ssock->bufavail ++;
603
604                assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS);
605                nr ++;
606                if (nr == ENCAP_BUFFERS) {
607                        nr = 0;
608                }
609                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
610                                sizeof(ndag_encap_t);
611                ssock->nextreadind = nr;
612        }
613
614        packet->order = erf_get_erf_timestamp(packet);
615        packet->error = packet->payload ? ntohs(erfptr->rlen) :
616                        erf_get_framing_length(packet);
617
618        return ntohs(erfptr->rlen);
619}
620
621static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
622                libtrace_packet_t *packet UNUSED,
623                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
624                uint32_t flags UNUSED) {
625
626        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
627        return 0;
628
629}
630
631static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
632
633        ndag_monitor_t *mon;
634
635        if (rt->monitorcount == 0) {
636                rt->knownmonitors = (ndag_monitor_t *)
637                                malloc(sizeof(ndag_monitor_t) * 5);
638        } else {
639                rt->knownmonitors = (ndag_monitor_t *)
640                            realloc(rt->knownmonitors,
641                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
642        }
643
644        mon = &(rt->knownmonitors[rt->monitorcount]);
645        mon->monitorid = monid;
646        mon->laststart = 0;
647
648        rt->monitorcount ++;
649        return mon;
650}
651
652static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
653
654        streamsock_t *ssock = NULL;
655        ndag_monitor_t *mon = NULL;
656        int i;
657
658        /* TODO consider replacing this with a list or vector so we can
659         * easily remove sources that are no longer in use, rather than
660         * just setting the sock to -1 and having to check them every
661         * time we want to read a packet.
662         */
663        if (rt->sourcecount == 0) {
664                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
665        } else if ((rt->sourcecount % 10) == 0) {
666                rt->sources = (streamsock_t *)realloc(rt->sources,
667                        sizeof(streamsock_t) * (rt->sourcecount + 10));
668        }
669
670        ssock = &(rt->sources[rt->sourcecount]);
671
672        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
673                        NULL, src.port, &(ssock->srcaddr));
674
675        if (ssock->sock < 0) {
676                return -1;
677        }
678
679        for (i = 0; i < rt->monitorcount; i++) {
680                if (rt->knownmonitors[i].monitorid == src.monitor) {
681                        mon = &(rt->knownmonitors[i]);
682                        break;
683                }
684        }
685
686        if (mon == NULL) {
687                mon = add_new_knownmonitor(rt, src.monitor);
688        }
689
690        ssock->port = src.port;
691        ssock->groupaddr = src.groupaddr;
692        ssock->expectedseq = 0;
693        ssock->monitorptr = mon;
694        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
695        ssock->bufavail = ENCAP_BUFFERS;
696        ssock->startidle = 0;
697
698        for (i = 0; i < ENCAP_BUFFERS; i++) {
699                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
700                ssock->savedsize[i] = 0;
701        }
702
703        for (i = 0; i < RECV_BATCH_SIZE; i++) {
704                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
705                                malloc(sizeof(struct iovec));
706                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
707                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
708                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
709                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
710                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
711                ssock->mmsgbufs[i].msg_len = 1;
712        }
713
714        ssock->nextread = NULL;;
715        ssock->nextreadind = 0;
716        ssock->recordcount = 0;
717        rt->sourcecount += 1;
718
719        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
720                        ssock->groupaddr, ssock->port, rt->threadindex);
721
722        return ssock->port;
723}
724
725static int receiver_read_messages(recvstream_t *rt) {
726
727        ndag_internal_message_t msg;
728
729        while (libtrace_message_queue_try_get(&(rt->mqueue),
730                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
731                switch(msg.type) {
732                        case NDAG_CLIENT_NEWGROUP:
733                                if (add_new_streamsock(rt, msg.contents) < 0) {
734                                        return -1;
735                                }
736                                break;
737                        case NDAG_CLIENT_HALT:
738                                return 0;
739                }
740        }
741        return 1;
742
743}
744
745static inline int readable_data(streamsock_t *ssock) {
746
747        if (ssock->sock == -1) {
748                return 0;
749        }
750        if (ssock->savedsize[ssock->nextreadind] == 0) {
751                return 0;
752        }
753        /*
754        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
755                        ssock->savedsize[ssock->nextreadind]) {
756                return 0;
757        }
758        */
759        return 1;
760
761
762}
763
764static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
765
766        int i;
767        for (i = 0; i < rt->sourcecount; i++) {
768                if (rt->sources[i].monitorptr == mon) {
769                        rt->sources[i].expectedseq = 0;
770                }
771        }
772
773}
774
775static int init_receivers(streamsock_t *ssock, int required) {
776
777        int wind = ssock->nextwriteind;
778        int i;
779
780        for (i = 0; i < required; i++) {
781                if (i >= RECV_BATCH_SIZE) {
782                        break;
783                }
784
785                ssock->mmsgbufs[i].msg_len = 0;
786                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
787                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
788                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
789
790                wind ++;
791        }
792
793        return i;
794}
795
796static int check_ndag_received(streamsock_t *ssock, int index,
797                unsigned int msglen, recvstream_t *rt) {
798
799        ndag_encap_t *encaphdr;
800        ndag_monitor_t *mon;
801        uint8_t rectype;
802
803        /* Check that we have a valid nDAG encap record */
804        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
805
806        if (rectype == NDAG_PKT_KEEPALIVE) {
807                /* Keep-alive, reset startidle and carry on. Don't
808                 * change nextwrite -- we want to overwrite the
809                 * keep-alive with usable content. */
810                return 0;
811        } else if (rectype != NDAG_PKT_ENCAPERF) {
812                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
813                                ssock->groupaddr, ssock->port);
814                close(ssock->sock);
815                ssock->sock = -1;
816                return -1;
817        }
818
819        ssock->savedsize[index] = msglen;
820        ssock->nextwriteind ++;
821        ssock->bufavail --;
822
823        assert(ssock->bufavail >= 0);
824
825        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
826                ssock->nextwriteind = 0;
827        }
828
829        /* Get the useful info from the encap header */
830        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
831
832        mon = ssock->monitorptr;
833
834        if (mon->laststart == 0) {
835                mon->laststart = bswap_be_to_host64(encaphdr->started);
836        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
837                mon->laststart = bswap_be_to_host64(encaphdr->started);
838                reset_expected_seqs(rt, mon);
839
840                /* TODO what is a good way to indicate this to clients?
841                 * set the loss counter in the ERF header? a bit rude?
842                 * use another bit in the ERF header?
843                 * add a queryable flag to libtrace_packet_t?
844                 */
845
846        }
847
848        if (ssock->expectedseq != 0) {
849                rt->missing_records += seq_cmp(
850                                ntohl(encaphdr->seqno), ssock->expectedseq);
851        }
852        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
853        if (ssock->expectedseq == 0) {
854                ssock->expectedseq ++;
855        }
856
857        if (ssock->nextread == NULL) {
858                /* If this is our first read, set up 'nextread'
859                 * by skipping past the nDAG headers */
860                ssock->nextread = ssock->saved[0] +
861                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
862        }
863        return 1;
864
865}
866
867static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
868                int *gottime, recvstream_t *rt) {
869
870        int ret, ndagstat, i, avail;
871        int toret = 0;
872
873        if (ssock->sock == -1) {
874                return 0;
875        }
876
877        if (ssock->bufavail == 0) {
878                /* All buffers were full, so something must be
879                 * available. */
880                return 1;
881        }
882
883        if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
884                return 1;
885        }
886
887        avail = init_receivers(ssock, ssock->bufavail);
888
889        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
890                        MSG_DONTWAIT, NULL);
891
892        if (ret < 0) {
893                /* Nothing to receive right now, but we should still
894                 * count as 'ready' if at least one buffer is full */
895                if (errno == EAGAIN || errno == EWOULDBLOCK) {
896                        if (readable_data(ssock)) {
897                                toret = 1;
898                        }
899                        if (!(*gottime)) {
900                                gettimeofday(tv, NULL);
901                                *gottime = 1;
902                        }
903                        if (ssock->startidle == 0) {
904                                ssock->startidle = tv->tv_sec;
905                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
906                                fprintf(stderr,
907                                        "Closing channel %s:%u due to inactivity.\n",
908                                        ssock->groupaddr,
909                                        ssock->port);
910
911                                close(ssock->sock);
912                                ssock->sock = -1;
913                        }
914                } else {
915
916                        fprintf(stderr,
917                                "Error receiving encapsulated records from %s:%u -- %s \n",
918                                ssock->groupaddr, ssock->port,
919                                strerror(errno));
920                        close(ssock->sock);
921                        ssock->sock = -1;
922                }
923                return toret;
924        }
925        ssock->startidle = 0;
926        for (i = 0; i < ret; i++) {
927                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
928                                ssock->mmsgbufs[i].msg_len, rt);
929                if (ndagstat == -1) {
930                        break;
931                }
932
933                if (ndagstat == 1) {
934                        toret = 1;
935                }
936        }
937
938        return toret;
939}
940
941static int receive_from_sockets(recvstream_t *rt) {
942
943        int i, readybufs, gottime;
944        struct timeval tv;
945
946        readybufs = 0;
947        gottime = 0;
948
949        for (i = 0; i < rt->sourcecount; i ++) {
950                readybufs += receive_from_single_socket(&(rt->sources[i]),
951                                &tv, &gottime, rt);
952        }
953
954        return readybufs;
955
956}
957
958
959static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
960                libtrace_packet_t *packet) {
961
962        int iserr = 0;
963
964        if (packet->buf_control == TRACE_CTRL_PACKET) {
965                free(packet->buffer);
966                packet->buffer = NULL;
967        }
968
969        do {
970                /* Make sure we shouldn't be halting */
971                if ((iserr = is_halted(libtrace)) != -1) {
972                        return iserr;
973                }
974
975                /* Check for any messages from the control thread */
976                iserr = receiver_read_messages(rt);
977
978                if (iserr <= 0) {
979                        return iserr;
980                }
981
982                /* If blocking and no sources, sleep for a bit and then try
983                 * checking for messages again.
984                 */
985                if (rt->sourcecount == 0) {
986                        usleep(10000);
987                        continue;
988                }
989
990                if ((iserr = receive_from_sockets(rt)) < 0) {
991                        return iserr;
992                } else if (iserr > 0) {
993                        /* At least one of our input sockets has available
994                         * data, let's go ahead and use what we have. */
995                        break;
996                }
997
998                /* None of our sources have anything available, we can take
999                 * a short break rather than immediately trying again.
1000                 */
1001                if (iserr == 0) {
1002                        usleep(100);
1003                }
1004
1005        } while (1);
1006
1007        return iserr;
1008}
1009
1010static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1011                libtrace_packet_t *packet) {
1012
1013        int iserr = 0;
1014
1015        if (packet->buf_control == TRACE_CTRL_PACKET) {
1016                free(packet->buffer);
1017                packet->buffer = NULL;
1018        }
1019
1020        /* Make sure we shouldn't be halting */
1021        if ((iserr = is_halted(libtrace)) != -1) {
1022                return iserr;
1023        }
1024
1025        /* If non-blocking and there are no sources, just break */
1026        if (rt->sourcecount == 0) {
1027                return 0;
1028        }
1029
1030        return receive_from_sockets(rt);
1031}
1032
1033static streamsock_t *select_next_packet(recvstream_t *rt) {
1034        int i;
1035        streamsock_t *ssock = NULL;
1036        uint64_t earliest = 0;
1037        uint64_t currentts = 0;
1038        dag_record_t *daghdr;
1039
1040        for (i = 0; i < rt->sourcecount; i ++) {
1041                if (!readable_data(&(rt->sources[i]))) {
1042                        continue;
1043                }
1044
1045                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1046                currentts = bswap_le_to_host64(daghdr->ts);
1047
1048                if (earliest == 0 || earliest > currentts) {
1049                        earliest = currentts;
1050                        ssock = &(rt->sources[i]);
1051                }
1052                /*
1053                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1054                                i, currentts,
1055                                rt->sources[i].recordcount,
1056                                rt->missing_records);
1057                */
1058        }
1059        return ssock;
1060}
1061
1062static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1063
1064        int rem;
1065        streamsock_t *nextavail = NULL;
1066        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1067                        packet);
1068
1069        if (rem <= 0) {
1070                return rem;
1071        }
1072
1073        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1074        if (nextavail == NULL) {
1075                return 0;
1076        }
1077
1078        /* nextread should point at an ERF header, so prepare 'packet' to be
1079         * a libtrace ERF packet. */
1080
1081        return ndag_prepare_packet_stream(libtrace,
1082                        &(FORMAT_DATA->receivers[0]), nextavail,
1083                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1084}
1085
1086static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1087                libtrace_packet_t **packets, size_t nb_packets) {
1088
1089        recvstream_t *rt;
1090        int rem;
1091        size_t read_packets = 0;
1092        streamsock_t *nextavail = NULL;
1093
1094        rt = (recvstream_t *)t->format_data;
1095
1096
1097        do {
1098                /* Only check for messages once per batch */
1099                if (read_packets == 0) {
1100                        rem = receive_encap_records_block(libtrace, rt,
1101                                packets[read_packets]);
1102                } else {
1103                        rem = receive_encap_records_nonblock(libtrace, rt,
1104                                packets[read_packets]);
1105                }
1106
1107                if (rem < 0) {
1108                        return rem;
1109                }
1110
1111                if (rem == 0) {
1112                        break;
1113                }
1114
1115                nextavail = select_next_packet(rt);
1116                if (nextavail == NULL) {
1117                        break;
1118                }
1119
1120                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1121                                packets[read_packets],
1122                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1123
1124                read_packets  ++;
1125                if (read_packets >= nb_packets) {
1126                        break;
1127                }
1128        } while (1);
1129
1130        return read_packets;
1131
1132}
1133
1134static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1135                libtrace_packet_t *packet) {
1136
1137
1138        libtrace_eventobj_t event = {0,0,0.0,0};
1139        int rem;
1140        streamsock_t *nextavail = NULL;
1141
1142        /* Only check for messages once per call */
1143        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1144        if (rem <= 0) {
1145                event.type = TRACE_EVENT_TERMINATE;
1146                return event;
1147        }
1148
1149        do {
1150                rem = receive_encap_records_nonblock(libtrace,
1151                                &(FORMAT_DATA->receivers[0]), packet);
1152
1153                if (rem < 0) {
1154                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1155                                "Received invalid nDAG records.");
1156                        event.type = TRACE_EVENT_TERMINATE;
1157                        break;
1158                }
1159
1160                if (rem == 0) {
1161                        /* Either we've been halted or we've got no packets
1162                         * right now. */
1163                        if (is_halted(libtrace) == 0) {
1164                                event.type = TRACE_EVENT_TERMINATE;
1165                                break;
1166                        }
1167                        event.type = TRACE_EVENT_SLEEP;
1168                        event.seconds = 0.0001;
1169                        break;
1170                }
1171
1172                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1173                if (nextavail == NULL) {
1174                        event.type = TRACE_EVENT_SLEEP;
1175                        event.seconds = 0.0001;
1176                        break;
1177                }
1178
1179                event.type = TRACE_EVENT_PACKET;
1180                ndag_prepare_packet_stream(libtrace,
1181                                &(FORMAT_DATA->receivers[0]), nextavail,
1182                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1183                event.size = trace_get_capture_length(packet) +
1184                                trace_get_framing_length(packet);
1185
1186                if (libtrace->filter) {
1187                        int filtret = trace_apply_filter(libtrace->filter,
1188                                        packet);
1189                        if (filtret == -1) {
1190                                trace_set_err(libtrace,
1191                                                TRACE_ERR_BAD_FILTER,
1192                                                "Bad BPF Filter");
1193                                event.type = TRACE_EVENT_TERMINATE;
1194                                break;
1195                        }
1196
1197                        if (filtret == 0) {
1198                                /* Didn't match filter, try next one */
1199                                libtrace->filtered_packets ++;
1200                                trace_clear_cache(packet);
1201                                continue;
1202                        }
1203                }
1204
1205                if (libtrace->snaplen > 0) {
1206                        trace_set_capture_length(packet, libtrace->snaplen);
1207                }
1208                libtrace->accepted_packets ++;
1209                break;
1210        } while (1);
1211
1212        return event;
1213}
1214
1215static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1216
1217        int i;
1218
1219        stat->dropped_valid = 1;
1220        stat->dropped = 0;
1221        stat->received_valid = 1;
1222        stat->received = 0;
1223        stat->missing_valid = 1;
1224        stat->missing = 0;
1225
1226        /* TODO Is this thread safe? */
1227        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1228                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1229                stat->received += FORMAT_DATA->receivers[i].received_packets;
1230                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1231        }
1232
1233}
1234
1235static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1236                libtrace_stat_t *stat) {
1237
1238        recvstream_t *recvr = (recvstream_t *)t->format_data;
1239
1240        if (libtrace == NULL)
1241                return;
1242        /* TODO Is this thread safe */
1243        stat->dropped_valid = 1;
1244        stat->dropped = recvr->dropped_upstream;
1245
1246        stat->received_valid = 1;
1247        stat->received = recvr->received_packets;
1248
1249        stat->missing_valid = 1;
1250        stat->missing = recvr->missing_records;
1251
1252}
1253
1254static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1255                bool reader) {
1256        recvstream_t *recvr;
1257
1258        if (!reader || t->type != THREAD_PERPKT) {
1259                return 0;
1260        }
1261
1262        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1263        t->format_data = recvr;
1264
1265        return 0;
1266}
1267
1268static struct libtrace_format_t ndag = {
1269
1270        "ndag",
1271        "",
1272        TRACE_FORMAT_NDAG,
1273        NULL,                   /* probe filename */
1274        NULL,                   /* probe magic */
1275        ndag_init_input,        /* init_input */
1276        NULL,                   /* config_input */
1277        ndag_start_input,       /* start_input */
1278        ndag_pause_input,       /* pause_input */
1279        NULL,                   /* init_output */
1280        NULL,                   /* config_output */
1281        NULL,                   /* start_output */
1282        ndag_fin_input,         /* fin_input */
1283        NULL,                   /* fin_output */
1284        ndag_read_packet,       /* read_packet */
1285        ndag_prepare_packet,    /* prepare_packet */
1286        NULL,                   /* fin_packet */
1287        NULL,                   /* write_packet */
1288        erf_get_link_type,      /* get_link_type */
1289        erf_get_direction,      /* get_direction */
1290        erf_set_direction,      /* set_direction */
1291        erf_get_erf_timestamp,  /* get_erf_timestamp */
1292        NULL,                   /* get_timeval */
1293        NULL,                   /* get_seconds */
1294        NULL,                   /* get_timespec */
1295        NULL,                   /* seek_erf */
1296        NULL,                   /* seek_timeval */
1297        NULL,                   /* seek_seconds */
1298        erf_get_capture_length, /* get_capture_length */
1299        erf_get_wire_length,    /* get_wire_length */
1300        erf_get_framing_length, /* get_framing_length */
1301        erf_set_capture_length, /* set_capture_length */
1302        NULL,                   /* get_received_packets */
1303        NULL,                   /* get_filtered_packets */
1304        NULL,                   /* get_dropped_packets */
1305        ndag_get_statistics,    /* get_statistics */
1306        NULL,                   /* get_fd */
1307        trace_event_ndag,       /* trace_event */
1308        NULL,                   /* help */
1309        NULL,                   /* next pointer */
1310        {true, 0},              /* live packet capture */
1311        ndag_pstart_input,      /* parallel start */
1312        ndag_pread_packets,     /* parallel read */
1313        ndag_pause_input,       /* parallel pause */
1314        NULL,
1315        ndag_pregister_thread,  /* register thread */
1316        NULL,
1317        ndag_get_thread_stats   /* per-thread stats */
1318};
1319
1320void ndag_constructor(void) {
1321        register_format(&ndag);
1322}
Note: See TracBrowser for help on using the repository browser.