source: lib/format_ndag.c @ 5d8280a

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 5d8280a was 5d8280a, checked in by Shane Alcock <salcock@…>, 4 years ago

Increased buffer number and batch size for ndag receiving.

Only call recvmmsg if there are a decent number of buffers
available to fill -- the overhead is not worth it if we've only
got one or two buffers available and, in that case, we must have
plenty of buffered packets ready for reading anyway.

  • Property mode set to 100644
File size: 42.3 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (1000)
29
30#define RECV_BATCH_SIZE (50)
31
32#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
33
34static struct libtrace_format_t ndag;
35
36volatile int ndag_paused = 0;
37
38typedef struct monitor {
39        uint16_t monitorid;
40        uint64_t laststart;
41} ndag_monitor_t;
42
43
44typedef struct streamsource {
45        uint16_t monitor;
46        char *groupaddr;
47        char *localiface;
48        uint16_t port;
49} streamsource_t;
50
51typedef struct streamsock {
52        char *groupaddr;
53        int sock;
54        struct addrinfo *srcaddr;
55        uint16_t port;
56        uint32_t expectedseq;
57        ndag_monitor_t *monitorptr;
58        char **saved;
59        char *nextread;
60        int nextreadind;
61        int nextwriteind;
62        int savedsize[ENCAP_BUFFERS];
63        uint32_t startidle;
64        uint64_t recordcount;
65
66        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
67} streamsock_t;
68
69typedef struct recvstream {
70        streamsock_t *sources;
71        uint16_t sourcecount;
72        libtrace_message_queue_t mqueue;
73        int threadindex;
74        ndag_monitor_t *knownmonitors;
75        uint16_t monitorcount;
76
77        uint64_t dropped_upstream;
78        uint64_t missing_records;
79        uint64_t received_packets;
80} recvstream_t;
81
82typedef struct ndag_format_data {
83        char *multicastgroup;
84        char *portstr;
85        char *localiface;
86        uint16_t nextthreadid;
87        recvstream_t *receivers;
88
89        pthread_t controlthread;
90        libtrace_message_queue_t controlqueue;
91} ndag_format_data_t;
92
93enum {
94        NDAG_CLIENT_HALT = 0x01,
95        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
96        NDAG_CLIENT_NEWGROUP = 0x03
97};
98
99typedef struct ndagreadermessage {
100        uint8_t type;
101        streamsource_t contents;
102} ndag_internal_message_t;
103
104
105static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
106
107        /* Calculate seq_a - seq_b, taking wraparound into account */
108        if (seq_a == seq_b) return 0;
109
110        if (seq_a > seq_b) {
111                return (int) (seq_a - seq_b);
112        }
113
114        /* -1 for the wrap and another -1 because we don't use zero */
115        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
116}
117
118static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
119        ndag_common_t *header = (ndag_common_t *)msgbuf;
120
121        if (msgsize < sizeof(ndag_common_t)) {
122                fprintf(stderr,
123                        "nDAG message does not have a complete nDAG header.\n");
124                return 0;
125        }
126
127        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
128                fprintf(stderr,
129                        "nDAG message does not have a valid magic number.\n");
130                return 0;
131        }
132
133        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
134                fprintf(stderr,
135                        "nDAG message has an invalid header version: %u\n",
136                                header->version);
137                return 0;
138        }
139
140        return header->type;
141}
142
143static int join_multicast_group(char *groupaddr, char *localiface,
144        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
145
146        struct addrinfo hints;
147        struct addrinfo *gotten;
148        struct addrinfo *group;
149        unsigned int interface;
150        char pstr[16];
151        struct group_req greq;
152        int bufsize;
153
154        int sock;
155
156        if (portstr == NULL) {
157                snprintf(pstr, 15, "%u", portnum);
158                portstr = pstr;
159        }
160
161        interface = if_nametoindex(localiface);
162        if (interface == 0) {
163                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
164                                localiface, strerror(errno));
165                return -1;
166        }
167
168        hints.ai_family = PF_UNSPEC;
169        hints.ai_socktype = SOCK_DGRAM;
170        hints.ai_flags = AI_PASSIVE;
171        hints.ai_protocol = 0;
172
173        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
174                fprintf(stderr,
175                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
176                                portstr, strerror(errno));
177                return -1;
178        }
179
180        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
181                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
182                                groupaddr, strerror(errno));
183                return -1;
184        }
185
186        *srcinfo = gotten;
187        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
188        if (sock < 0) {
189                fprintf(stderr,
190                        "Failed to create multicast socket for %s:%s -- %s\n",
191                                groupaddr, portstr, strerror(errno));
192                goto sockcreateover;
193        }
194
195        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
196        {
197                fprintf(stderr,
198                        "Failed to bind to multicast socket %s:%s -- %s\n",
199                                groupaddr, portstr, strerror(errno));
200                close(sock);
201                sock = -1;
202                goto sockcreateover;
203        }
204
205        greq.gr_interface = interface;
206        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
207
208        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
209                        sizeof(greq)) < 0) {
210                fprintf(stderr,
211                        "Failed to join multicast group %s:%s -- %s\n",
212                                groupaddr, portstr, strerror(errno));
213                close(sock);
214                sock = -1;
215                goto sockcreateover;
216        }
217
218        bufsize = 16 * 1024 * 1024;
219        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
220                                (socklen_t)sizeof(int)) < 0) {
221
222                fprintf(stderr,
223                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
224                                groupaddr, portstr, strerror(errno));
225                close(sock);
226                sock = -1;
227                goto sockcreateover;
228        }
229
230sockcreateover:
231        freeaddrinfo(group);
232        return sock;
233}
234
235
236static int ndag_init_input(libtrace_t *libtrace) {
237
238        char *scan = NULL;
239        char *next = NULL;
240
241        libtrace->format_data = (ndag_format_data_t *)malloc(
242                        sizeof(ndag_format_data_t));
243
244        FORMAT_DATA->multicastgroup = NULL;
245        FORMAT_DATA->portstr = NULL;
246        FORMAT_DATA->localiface = NULL;
247        FORMAT_DATA->nextthreadid = 0;
248        FORMAT_DATA->receivers = NULL;
249
250        scan = strchr(libtrace->uridata, ',');
251        if (scan == NULL) {
252                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
253                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
254                return -1;
255        }
256        FORMAT_DATA->localiface = strndup(libtrace->uridata,
257                        (size_t)(scan - libtrace->uridata));
258        next = scan + 1;
259
260        scan = strchr(next, ',');
261        if (scan == NULL) {
262                FORMAT_DATA->portstr = strdup("9001");
263                FORMAT_DATA->multicastgroup = strdup(next);
264        } else {
265                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
266
267                FORMAT_DATA->portstr = strdup(scan + 1);
268        }
269        return 0;
270}
271
272static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
273                uint16_t portnum, uint16_t monid) {
274
275        ndag_internal_message_t alert;
276
277        alert.type = NDAG_CLIENT_NEWGROUP;
278        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
279        alert.contents.localiface = FORMAT_DATA->localiface;
280        alert.contents.port = portnum;
281        alert.contents.monitor = monid;
282
283        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
284                        (void *)&alert);
285
286}
287       
288static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
289                int msgsize, uint16_t *ptmap) {
290
291        int i;
292        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
293        uint8_t msgtype;
294
295        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
296        if (msgtype == 0) {
297                return -1;
298        }
299
300        msgsize -= sizeof(ndag_common_t);
301        if (msgtype == NDAG_PKT_BEACON) {
302                /* If message is a beacon, make sure every port included in the
303                 * beacon is assigned to a receive thread.
304                 */
305                uint16_t *ptr, numstreams;
306
307                if ((uint32_t)msgsize < sizeof(uint16_t)) {
308                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
309                        return -1;
310                }
311
312                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
313                numstreams = ntohs(*ptr);
314                ptr ++;
315
316                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
317                {
318                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
319                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
320                        return -1;
321                }
322
323                for (i = 0; i < numstreams; i++) {
324                        uint16_t streamport = ntohs(*ptr);
325
326                        if (ptmap[streamport] == 0xffff) {
327                                new_group_alert(libtrace,
328                                        FORMAT_DATA->nextthreadid, streamport,
329                                        ntohs(ndaghdr->monitorid));
330
331                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
332
333                                if (libtrace->perpkt_thread_count == 0) {
334                                        FORMAT_DATA->nextthreadid = 0;
335                                } else {
336                                        FORMAT_DATA->nextthreadid =
337                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
338                                }
339                        }
340
341                        ptr ++;
342                }
343        } else {
344                fprintf(stderr,
345                        "Unexpected message type on control channel: %u\n",
346                         msgtype);
347                return -1;
348        }
349
350        return 0;
351
352}
353
354static void *ndag_controller_run(void *tdata) {
355
356        libtrace_t *libtrace = (libtrace_t *)tdata;
357        uint16_t ptmap[65536];
358        int sock = -1;
359        struct addrinfo *receiveaddr = NULL;
360        fd_set listening;
361        struct timeval timeout;
362
363        /* ptmap is a dirty hack to allow us to quickly check if we've already
364         * assigned a stream to a thread.
365         */
366        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
367
368        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
369                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
370                        &receiveaddr);
371        if (sock == -1) {
372                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
373                        "Unable to join multicast group for nDAG control channel");
374                trace_interrupt();
375        }
376
377        ndag_paused = 0;
378        while ((is_halted(libtrace) == -1) && !ndag_paused) {
379                int ret;
380                char buf[CTRL_BUF_SIZE];
381
382                FD_ZERO(&listening);
383                FD_SET(sock, &listening);
384
385                timeout.tv_sec = 0;
386                timeout.tv_usec = 500000;
387
388                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
389                if (ret < 0) {
390                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
391                        break;
392                }
393
394                if (!FD_ISSET(sock, &listening)) {
395                        continue;
396                }
397
398                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
399                                receiveaddr->ai_addr,
400                                &(receiveaddr->ai_addrlen));
401                if (ret < 0) {
402                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
403                        break;
404                }
405
406                if (ret == 0) {
407                        break;
408                }
409
410                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
411                        fprintf(stderr, "Error while parsing nDAG control message.\n");
412                        continue;
413                }
414        }
415
416        if (sock >= 0) {
417                close(sock);
418        }
419
420        /* Control channel has fallen over, should probably encourage libtrace
421         * to halt the receiver threads as well.
422         */
423        if (!is_halted(libtrace)) {
424                trace_interrupt();
425        }
426
427        pthread_exit(NULL);
428}
429
430static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
431{
432        int ret;
433        uint32_t i;
434        /* Configure the set of receiver threads */
435
436        if (FORMAT_DATA->receivers == NULL) {
437                /* What if the number of threads changes between a pause and
438                 * a restart? Can this happen? */
439                FORMAT_DATA->receivers = (recvstream_t *)
440                                malloc(sizeof(recvstream_t) * maxthreads);
441        }
442
443        for (i = 0; i < maxthreads; i++) {
444                FORMAT_DATA->receivers[i].sources = NULL;
445                FORMAT_DATA->receivers[i].sourcecount = 0;
446                FORMAT_DATA->receivers[i].knownmonitors = NULL;
447                FORMAT_DATA->receivers[i].monitorcount = 0;
448                FORMAT_DATA->receivers[i].threadindex = i;
449                FORMAT_DATA->receivers[i].dropped_upstream = 0;
450                FORMAT_DATA->receivers[i].received_packets = 0;
451                FORMAT_DATA->receivers[i].missing_records = 0;
452
453                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
454                                sizeof(ndag_internal_message_t));
455        }
456
457        /* Start the controller thread */
458        /* TODO consider affinity of this thread? */
459
460        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
461                        ndag_controller_run, libtrace);
462        if (ret != 0) {
463                return -1;
464        }
465        return maxthreads;
466}
467
468static int ndag_start_input(libtrace_t *libtrace) {
469        return ndag_start_threads(libtrace, 1);
470}
471
472static int ndag_pstart_input(libtrace_t *libtrace) {
473        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
474                        libtrace->perpkt_thread_count)
475                return 0;
476        return -1;
477}
478
479static void halt_ndag_receiver(recvstream_t *receiver) {
480        int j, i;
481        libtrace_message_queue_destroy(&(receiver->mqueue));
482
483        if (receiver->sources == NULL)
484                return;
485        for (i = 0; i < receiver->sourcecount; i++) {
486                streamsock_t src = receiver->sources[i];
487                if (src.saved) {
488                        for (j = 0; i < ENCAP_BUFFERS; j++) {
489                                if (src.saved[j]) {
490                                        free(src.saved[j]);
491                                }
492                        }
493                        free(src.saved);
494                }
495                for (j = 0; j < RECV_BATCH_SIZE; j++) {
496                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
497                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
498                        }
499                }
500                close(src.sock);
501        }
502        if (receiver->knownmonitors) {
503                free(receiver->knownmonitors);
504        }
505
506        if (receiver->sources) {
507                free(receiver->sources);
508        }
509}
510
511static int ndag_pause_input(libtrace_t *libtrace) {
512        int i;
513
514        /* Close the existing receiver sockets */
515        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
516               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
517        }
518        return 0;
519}
520
521static int ndag_fin_input(libtrace_t *libtrace) {
522
523        if (FORMAT_DATA->receivers) {
524                free(FORMAT_DATA->receivers);
525        }
526        if (FORMAT_DATA->multicastgroup) {
527                free(FORMAT_DATA->multicastgroup);
528        }
529        if (FORMAT_DATA->portstr) {
530                free(FORMAT_DATA->portstr);
531        }
532        if (FORMAT_DATA->localiface) {
533                free(FORMAT_DATA->localiface);
534        }
535
536        free(libtrace->format_data);
537        return 0;
538}
539
540static int ndag_prepare_packet_stream(libtrace_t *libtrace,
541                recvstream_t *rt,
542                streamsock_t *ssock, libtrace_packet_t *packet,
543                uint32_t flags) {
544
545        dag_record_t *erfptr;
546        ndag_encap_t *encaphdr;
547        uint16_t ndag_reccount = 0;
548        int nr;
549
550        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
551                packet->buf_control = TRACE_CTRL_PACKET;
552        } else {
553                packet->buf_control = TRACE_CTRL_EXTERNAL;
554        }
555
556        packet->trace = libtrace;
557        packet->buffer = ssock->nextread;
558        packet->header = ssock->nextread;
559        packet->type = TRACE_RT_DATA_ERF;
560
561        erfptr = (dag_record_t *)packet->header;
562
563        if (erfptr->flags.rxerror == 1) {
564                packet->payload = NULL;
565                erfptr->rlen = htons(erf_get_framing_length(packet));
566        } else {
567                packet->payload = (char *)packet->buffer +
568                                erf_get_framing_length(packet);
569        }
570
571        /* Update upstream drops using lctr */
572
573        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
574                /* TODO */
575        } else {
576                if (rt->received_packets > 0) {
577                        rt->dropped_upstream += ntohs(erfptr->lctr);
578                }
579        }
580
581        rt->received_packets ++;
582        ssock->recordcount += 1;
583
584        nr = ssock->nextreadind;
585        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
586                        sizeof(ndag_common_t));
587
588        ndag_reccount = ntohs(encaphdr->recordcount);
589        if ((ndag_reccount & 0x8000) != 0) {
590                /* Record was truncated -- update rlen appropriately */
591                erfptr->rlen = htons(ssock->savedsize[nr] -
592                                (ssock->nextread - ssock->saved[nr]));
593        }
594        ssock->nextread += ntohs(erfptr->rlen);
595
596        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
597                /* Read everything from this buffer, mark as empty and
598                 * move on. */
599                ssock->savedsize[nr] = 0;
600                nr ++;
601                if (nr == ENCAP_BUFFERS) {
602                        nr = 0;
603                }
604                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
605                                sizeof(ndag_encap_t);
606                ssock->nextreadind = nr;
607        }
608
609        packet->order = erf_get_erf_timestamp(packet);
610        packet->error = packet->payload ? ntohs(erfptr->rlen) :
611                        erf_get_framing_length(packet);
612
613        return ntohs(erfptr->rlen);
614}
615
616static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
617                libtrace_packet_t *packet UNUSED,
618                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
619                uint32_t flags UNUSED) {
620
621        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
622        return 0;
623
624}
625
626static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
627
628        ndag_monitor_t *mon;
629
630        if (rt->monitorcount == 0) {
631                rt->knownmonitors = (ndag_monitor_t *)
632                                malloc(sizeof(ndag_monitor_t) * 5);
633        } else {
634                rt->knownmonitors = (ndag_monitor_t *)
635                            realloc(rt->knownmonitors,
636                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
637        }
638
639        mon = &(rt->knownmonitors[rt->monitorcount]);
640        mon->monitorid = monid;
641        mon->laststart = 0;
642
643        rt->monitorcount ++;
644        return mon;
645}
646
647static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
648
649        streamsock_t *ssock = NULL;
650        ndag_monitor_t *mon = NULL;
651        int i;
652
653        /* TODO consider replacing this with a list or vector so we can
654         * easily remove sources that are no longer in use, rather than
655         * just setting the sock to -1 and having to check them every
656         * time we want to read a packet.
657         */
658        if (rt->sourcecount == 0) {
659                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
660        } else if ((rt->sourcecount % 10) == 0) {
661                rt->sources = (streamsock_t *)realloc(rt->sources,
662                        sizeof(streamsock_t) * (rt->sourcecount + 10));
663        }
664
665        ssock = &(rt->sources[rt->sourcecount]);
666
667        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
668                        NULL, src.port, &(ssock->srcaddr));
669
670        if (ssock->sock < 0) {
671                return -1;
672        }
673
674        for (i = 0; i < rt->monitorcount; i++) {
675                if (rt->knownmonitors[i].monitorid == src.monitor) {
676                        mon = &(rt->knownmonitors[i]);
677                        break;
678                }
679        }
680
681        if (mon == NULL) {
682                mon = add_new_knownmonitor(rt, src.monitor);
683        }
684
685        ssock->port = src.port;
686        ssock->groupaddr = src.groupaddr;
687        ssock->expectedseq = 0;
688        ssock->monitorptr = mon;
689        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
690        ssock->startidle = 0;
691
692        for (i = 0; i < ENCAP_BUFFERS; i++) {
693                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
694                ssock->savedsize[i] = 0;
695        }
696
697        for (i = 0; i < RECV_BATCH_SIZE; i++) {
698                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
699                                malloc(sizeof(struct iovec));
700        }
701
702        ssock->nextread = NULL;;
703        ssock->nextreadind = 0;
704        ssock->recordcount = 0;
705        rt->sourcecount += 1;
706
707        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
708                        ssock->groupaddr, ssock->port, rt->threadindex);
709
710        return ssock->port;
711}
712
713static int receiver_read_messages(recvstream_t *rt) {
714
715        ndag_internal_message_t msg;
716
717        while (libtrace_message_queue_try_get(&(rt->mqueue),
718                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
719                switch(msg.type) {
720                        case NDAG_CLIENT_NEWGROUP:
721                                if (add_new_streamsock(rt, msg.contents) < 0) {
722                                        return -1;
723                                }
724                                break;
725                        case NDAG_CLIENT_HALT:
726                                return 0;
727                }
728        }
729        return 1;
730
731}
732
733static inline int readable_data(streamsock_t *ssock) {
734
735        if (ssock->sock == -1) {
736                return 0;
737        }
738        if (ssock->savedsize[ssock->nextreadind] == 0) {
739                return 0;
740        }
741        /*
742        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
743                        ssock->savedsize[ssock->nextreadind]) {
744                return 0;
745        }
746        */
747        return 1;
748
749
750}
751
752static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
753
754        int i;
755        for (i = 0; i < rt->sourcecount; i++) {
756                if (rt->sources[i].monitorptr == mon) {
757                        rt->sources[i].expectedseq = 0;
758                }
759        }
760
761}
762
763static int init_receivers(streamsock_t *ssock) {
764
765        int wind = ssock->nextwriteind;
766        int i;
767        int avail = 0;
768
769        for (i = 0; i < RECV_BATCH_SIZE; i++) {
770                if (wind == ENCAP_BUFFERS) {
771                        wind = 0;
772                }
773
774                if (ssock->savedsize[wind] != 0) {
775                        /* No more empty buffers */
776                        break;
777                }
778
779                ssock->mmsgbufs[i].msg_len = 0;
780                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
781                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
782                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
783                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
784                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
785                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
786                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
787                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
788
789                avail ++;
790                wind ++;
791        }
792
793        return avail;
794}
795
796static int check_ndag_received(streamsock_t *ssock, int index,
797                unsigned int msglen, recvstream_t *rt) {
798
799        ndag_encap_t *encaphdr;
800        ndag_monitor_t *mon;
801        uint8_t rectype;
802
803        /* Check that we have a valid nDAG encap record */
804        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
805
806        if (rectype == NDAG_PKT_KEEPALIVE) {
807                /* Keep-alive, reset startidle and carry on. Don't
808                 * change nextwrite -- we want to overwrite the
809                 * keep-alive with usable content. */
810                return 0;
811        } else if (rectype != NDAG_PKT_ENCAPERF) {
812                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
813                                ssock->groupaddr, ssock->port);
814                close(ssock->sock);
815                ssock->sock = -1;
816                return -1;
817        }
818
819        ssock->savedsize[index] = msglen;
820        ssock->nextwriteind ++;
821
822        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
823                ssock->nextwriteind = 0;
824        }
825
826        /* Get the useful info from the encap header */
827        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
828
829        mon = ssock->monitorptr;
830
831        if (mon->laststart == 0) {
832                mon->laststart = bswap_be_to_host64(encaphdr->started);
833        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
834                mon->laststart = bswap_be_to_host64(encaphdr->started);
835                reset_expected_seqs(rt, mon);
836
837                /* TODO what is a good way to indicate this to clients?
838                 * set the loss counter in the ERF header? a bit rude?
839                 * use another bit in the ERF header?
840                 * add a queryable flag to libtrace_packet_t?
841                 */
842
843        }
844
845        if (ssock->expectedseq != 0) {
846                rt->missing_records += seq_cmp(
847                                ntohl(encaphdr->seqno), ssock->expectedseq);
848        }
849        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
850        if (ssock->expectedseq == 0) {
851                ssock->expectedseq ++;
852        }
853
854        if (ssock->nextread == NULL) {
855                /* If this is our first read, set up 'nextread'
856                 * by skipping past the nDAG headers */
857                ssock->nextread = ssock->saved[0] +
858                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
859        }
860        return 1;
861
862}
863
864static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
865                int *gottime, recvstream_t *rt) {
866
867        int avail, ret, ndagstat, i;
868        int toret = 0;
869
870        if (ssock->sock == -1) {
871                return 0;
872        }
873
874        avail = init_receivers(ssock);
875
876        if (avail == 0) {
877                /* All buffers were full, so something must be
878                 * available. */
879                return 1;
880        }
881
882        if (avail < RECV_BATCH_SIZE / 2) {
883                return 1;
884        }
885
886        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
887                        MSG_DONTWAIT, NULL);
888
889        if (ret < 0) {
890                /* Nothing to receive right now, but we should still
891                 * count as 'ready' if at least one buffer is full */
892                if (errno == EAGAIN || errno == EWOULDBLOCK) {
893                        if (readable_data(ssock)) {
894                                toret = 1;
895                        }
896                        if (!(*gottime)) {
897                                gettimeofday(tv, NULL);
898                                *gottime = 1;
899                        }
900                        if (ssock->startidle == 0) {
901                                ssock->startidle = tv->tv_sec;
902                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
903                                fprintf(stderr,
904                                        "Closing channel %s:%u due to inactivity.\n",
905                                        ssock->groupaddr,
906                                        ssock->port);
907
908                                close(ssock->sock);
909                                ssock->sock = -1;
910                        }
911                } else {
912
913                        fprintf(stderr,
914                                "Error receiving encapsulated records from %s:%u -- %s \n",
915                                ssock->groupaddr, ssock->port,
916                                strerror(errno));
917                        close(ssock->sock);
918                        ssock->sock = -1;
919                }
920                return toret;
921        }
922        ssock->startidle = 0;
923        for (i = 0; i < ret; i++) {
924                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
925                                ssock->mmsgbufs[i].msg_len, rt);
926                if (ndagstat == -1) {
927                        break;
928                }
929
930                if (ndagstat == 1) {
931                        toret = 1;
932                }
933        }
934
935        return toret;
936}
937
938static int receive_from_sockets(recvstream_t *rt) {
939
940        int i, readybufs, gottime;
941        struct timeval tv;
942
943        readybufs = 0;
944        gottime = 0;
945
946        for (i = 0; i < rt->sourcecount; i ++) {
947                readybufs += receive_from_single_socket(&(rt->sources[i]),
948                                &tv, &gottime, rt);
949        }
950
951        return readybufs;
952
953}
954
955
956static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
957                libtrace_packet_t *packet) {
958
959        int iserr = 0;
960
961        if (packet->buf_control == TRACE_CTRL_PACKET) {
962                free(packet->buffer);
963                packet->buffer = NULL;
964        }
965
966        do {
967                /* Make sure we shouldn't be halting */
968                if ((iserr = is_halted(libtrace)) != -1) {
969                        return iserr;
970                }
971
972                /* Check for any messages from the control thread */
973                iserr = receiver_read_messages(rt);
974
975                if (iserr <= 0) {
976                        return iserr;
977                }
978
979                /* If blocking and no sources, sleep for a bit and then try
980                 * checking for messages again.
981                 */
982                if (rt->sourcecount == 0) {
983                        usleep(10000);
984                        continue;
985                }
986
987                if ((iserr = receive_from_sockets(rt)) < 0) {
988                        return iserr;
989                } else if (iserr > 0) {
990                        /* At least one of our input sockets has available
991                         * data, let's go ahead and use what we have. */
992                        break;
993                }
994
995                /* None of our sources have anything available, we can take
996                 * a short break rather than immediately trying again.
997                 */
998                if (iserr == 0) {
999                        usleep(100);
1000                }
1001
1002        } while (1);
1003
1004        return iserr;
1005}
1006
1007static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1008                libtrace_packet_t *packet) {
1009
1010        int iserr = 0;
1011
1012        if (packet->buf_control == TRACE_CTRL_PACKET) {
1013                free(packet->buffer);
1014                packet->buffer = NULL;
1015        }
1016
1017        /* Make sure we shouldn't be halting */
1018        if ((iserr = is_halted(libtrace)) != -1) {
1019                return iserr;
1020        }
1021
1022        /* If non-blocking and there are no sources, just break */
1023        if (rt->sourcecount == 0) {
1024                return 0;
1025        }
1026
1027        return receive_from_sockets(rt);
1028}
1029
1030static streamsock_t *select_next_packet(recvstream_t *rt) {
1031        int i;
1032        streamsock_t *ssock = NULL;
1033        uint64_t earliest = 0;
1034        uint64_t currentts = 0;
1035        dag_record_t *daghdr;
1036
1037        for (i = 0; i < rt->sourcecount; i ++) {
1038                if (!readable_data(&(rt->sources[i]))) {
1039                        continue;
1040                }
1041
1042                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1043                currentts = bswap_le_to_host64(daghdr->ts);
1044
1045                if (earliest == 0 || earliest > currentts) {
1046                        earliest = currentts;
1047                        ssock = &(rt->sources[i]);
1048                }
1049                /*
1050                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1051                                i, currentts,
1052                                rt->sources[i].recordcount,
1053                                rt->missing_records);
1054                */
1055        }
1056        return ssock;
1057}
1058
1059static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1060
1061        int rem;
1062        streamsock_t *nextavail = NULL;
1063        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1064                        packet);
1065
1066        if (rem <= 0) {
1067                return rem;
1068        }
1069
1070        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1071        if (nextavail == NULL) {
1072                return 0;
1073        }
1074
1075        /* nextread should point at an ERF header, so prepare 'packet' to be
1076         * a libtrace ERF packet. */
1077
1078        return ndag_prepare_packet_stream(libtrace,
1079                        &(FORMAT_DATA->receivers[0]), nextavail,
1080                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1081}
1082
1083static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1084                libtrace_packet_t **packets, size_t nb_packets) {
1085
1086        recvstream_t *rt;
1087        int rem;
1088        size_t read_packets = 0;
1089        streamsock_t *nextavail = NULL;
1090
1091        rt = (recvstream_t *)t->format_data;
1092
1093
1094        do {
1095                /* Only check for messages once per batch */
1096                if (read_packets == 0) {
1097                        rem = receive_encap_records_block(libtrace, rt,
1098                                packets[read_packets]);
1099                } else {
1100                        rem = receive_encap_records_nonblock(libtrace, rt,
1101                                packets[read_packets]);
1102                }
1103
1104                if (rem < 0) {
1105                        return rem;
1106                }
1107
1108                if (rem == 0) {
1109                        break;
1110                }
1111
1112                nextavail = select_next_packet(rt);
1113                if (nextavail == NULL) {
1114                        break;
1115                }
1116
1117                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1118                                packets[read_packets],
1119                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1120
1121                read_packets  ++;
1122                if (read_packets >= nb_packets) {
1123                        break;
1124                }
1125        } while (1);
1126
1127        return read_packets;
1128
1129}
1130
1131static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1132                libtrace_packet_t *packet) {
1133
1134
1135        libtrace_eventobj_t event = {0,0,0.0,0};
1136        int rem;
1137        streamsock_t *nextavail = NULL;
1138
1139        /* Only check for messages once per call */
1140        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1141        if (rem <= 0) {
1142                event.type = TRACE_EVENT_TERMINATE;
1143                return event;
1144        }
1145
1146        do {
1147                rem = receive_encap_records_nonblock(libtrace,
1148                                &(FORMAT_DATA->receivers[0]), packet);
1149
1150                if (rem < 0) {
1151                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1152                                "Received invalid nDAG records.");
1153                        event.type = TRACE_EVENT_TERMINATE;
1154                        break;
1155                }
1156
1157                if (rem == 0) {
1158                        /* Either we've been halted or we've got no packets
1159                         * right now. */
1160                        if (is_halted(libtrace) == 0) {
1161                                event.type = TRACE_EVENT_TERMINATE;
1162                                break;
1163                        }
1164                        event.type = TRACE_EVENT_SLEEP;
1165                        event.seconds = 0.0001;
1166                        break;
1167                }
1168
1169                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1170                if (nextavail == NULL) {
1171                        event.type = TRACE_EVENT_SLEEP;
1172                        event.seconds = 0.0001;
1173                        break;
1174                }
1175
1176                event.type = TRACE_EVENT_PACKET;
1177                ndag_prepare_packet_stream(libtrace,
1178                                &(FORMAT_DATA->receivers[0]), nextavail,
1179                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1180                event.size = trace_get_capture_length(packet) +
1181                                trace_get_framing_length(packet);
1182
1183                if (libtrace->filter) {
1184                        int filtret = trace_apply_filter(libtrace->filter,
1185                                        packet);
1186                        if (filtret == -1) {
1187                                trace_set_err(libtrace,
1188                                                TRACE_ERR_BAD_FILTER,
1189                                                "Bad BPF Filter");
1190                                event.type = TRACE_EVENT_TERMINATE;
1191                                break;
1192                        }
1193
1194                        if (filtret == 0) {
1195                                /* Didn't match filter, try next one */
1196                                libtrace->filtered_packets ++;
1197                                trace_clear_cache(packet);
1198                                continue;
1199                        }
1200                }
1201
1202                if (libtrace->snaplen > 0) {
1203                        trace_set_capture_length(packet, libtrace->snaplen);
1204                }
1205                libtrace->accepted_packets ++;
1206                break;
1207        } while (1);
1208
1209        return event;
1210}
1211
1212static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1213
1214        int i;
1215
1216        stat->dropped_valid = 1;
1217        stat->dropped = 0;
1218        stat->received_valid = 1;
1219        stat->received = 0;
1220        stat->missing_valid = 1;
1221        stat->missing = 0;
1222
1223        /* TODO Is this thread safe? */
1224        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1225                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1226                stat->received += FORMAT_DATA->receivers[i].received_packets;
1227                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1228        }
1229
1230}
1231
1232static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1233                libtrace_stat_t *stat) {
1234
1235        recvstream_t *recvr = (recvstream_t *)t->format_data;
1236
1237        if (libtrace == NULL)
1238                return;
1239        /* TODO Is this thread safe */
1240        stat->dropped_valid = 1;
1241        stat->dropped = recvr->dropped_upstream;
1242
1243        stat->received_valid = 1;
1244        stat->received = recvr->received_packets;
1245
1246        stat->missing_valid = 1;
1247        stat->missing = recvr->missing_records;
1248
1249}
1250
1251static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1252                bool reader) {
1253        recvstream_t *recvr;
1254
1255        if (!reader || t->type != THREAD_PERPKT) {
1256                return 0;
1257        }
1258
1259        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1260        t->format_data = recvr;
1261
1262        return 0;
1263}
1264
1265static struct libtrace_format_t ndag = {
1266
1267        "ndag",
1268        "",
1269        TRACE_FORMAT_NDAG,
1270        NULL,                   /* probe filename */
1271        NULL,                   /* probe magic */
1272        ndag_init_input,        /* init_input */
1273        NULL,                   /* config_input */
1274        ndag_start_input,       /* start_input */
1275        ndag_pause_input,       /* pause_input */
1276        NULL,                   /* init_output */
1277        NULL,                   /* config_output */
1278        NULL,                   /* start_output */
1279        ndag_fin_input,         /* fin_input */
1280        NULL,                   /* fin_output */
1281        ndag_read_packet,       /* read_packet */
1282        ndag_prepare_packet,    /* prepare_packet */
1283        NULL,                   /* fin_packet */
1284        NULL,                   /* write_packet */
1285        erf_get_link_type,      /* get_link_type */
1286        erf_get_direction,      /* get_direction */
1287        erf_set_direction,      /* set_direction */
1288        erf_get_erf_timestamp,  /* get_erf_timestamp */
1289        NULL,                   /* get_timeval */
1290        NULL,                   /* get_seconds */
1291        NULL,                   /* get_timespec */
1292        NULL,                   /* seek_erf */
1293        NULL,                   /* seek_timeval */
1294        NULL,                   /* seek_seconds */
1295        erf_get_capture_length, /* get_capture_length */
1296        erf_get_wire_length,    /* get_wire_length */
1297        erf_get_framing_length, /* get_framing_length */
1298        erf_set_capture_length, /* set_capture_length */
1299        NULL,                   /* get_received_packets */
1300        NULL,                   /* get_filtered_packets */
1301        NULL,                   /* get_dropped_packets */
1302        ndag_get_statistics,    /* get_statistics */
1303        NULL,                   /* get_fd */
1304        trace_event_ndag,       /* trace_event */
1305        NULL,                   /* help */
1306        NULL,                   /* next pointer */
1307        {true, 0},              /* live packet capture */
1308        ndag_pstart_input,      /* parallel start */
1309        ndag_pread_packets,     /* parallel read */
1310        ndag_pause_input,       /* parallel pause */
1311        NULL,
1312        ndag_pregister_thread,  /* register thread */
1313        NULL,
1314        ndag_get_thread_stats   /* per-thread stats */
1315};
1316
1317void ndag_constructor(void) {
1318        register_format(&ndag);
1319}
Note: See TracBrowser for help on using the repository browser.