source: lib/format_ndag.c @ eb70703

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since eb70703 was eb70703, checked in by Shane Alcock <salcock@…>, 3 years ago

Only initialise mmsgbufs when we're actually going to use them.

Previous change meant that we aren't going to be actually calling
recvmmsg so stop wasting time repeatedly initialising buffers that
aren't going to be used.

Also moved initialisation of mmsgbuf fields that do not change
out of the per-packet path so we don't end up repeatedly setting
values to the same thing that they were before.

  • Property mode set to 100644
File size: 42.6 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (1000)
29
30#define RECV_BATCH_SIZE (50)
31
32#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
33
34static struct libtrace_format_t ndag;
35
36volatile int ndag_paused = 0;
37
38typedef struct monitor {
39        uint16_t monitorid;
40        uint64_t laststart;
41} ndag_monitor_t;
42
43
44typedef struct streamsource {
45        uint16_t monitor;
46        char *groupaddr;
47        char *localiface;
48        uint16_t port;
49} streamsource_t;
50
51typedef struct streamsock {
52        char *groupaddr;
53        int sock;
54        struct addrinfo *srcaddr;
55        uint16_t port;
56        uint32_t expectedseq;
57        ndag_monitor_t *monitorptr;
58        char **saved;
59        char *nextread;
60        int nextreadind;
61        int nextwriteind;
62        int savedsize[ENCAP_BUFFERS];
63        uint32_t startidle;
64        uint64_t recordcount;
65
66        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
67} streamsock_t;
68
69typedef struct recvstream {
70        streamsock_t *sources;
71        uint16_t sourcecount;
72        libtrace_message_queue_t mqueue;
73        int threadindex;
74        ndag_monitor_t *knownmonitors;
75        uint16_t monitorcount;
76
77        uint64_t dropped_upstream;
78        uint64_t missing_records;
79        uint64_t received_packets;
80} recvstream_t;
81
82typedef struct ndag_format_data {
83        char *multicastgroup;
84        char *portstr;
85        char *localiface;
86        uint16_t nextthreadid;
87        recvstream_t *receivers;
88
89        pthread_t controlthread;
90        libtrace_message_queue_t controlqueue;
91} ndag_format_data_t;
92
93enum {
94        NDAG_CLIENT_HALT = 0x01,
95        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
96        NDAG_CLIENT_NEWGROUP = 0x03
97};
98
99typedef struct ndagreadermessage {
100        uint8_t type;
101        streamsource_t contents;
102} ndag_internal_message_t;
103
104
105static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
106
107        /* Calculate seq_a - seq_b, taking wraparound into account */
108        if (seq_a == seq_b) return 0;
109
110        if (seq_a > seq_b) {
111                return (int) (seq_a - seq_b);
112        }
113
114        /* -1 for the wrap and another -1 because we don't use zero */
115        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
116}
117
118static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
119        ndag_common_t *header = (ndag_common_t *)msgbuf;
120
121        if (msgsize < sizeof(ndag_common_t)) {
122                fprintf(stderr,
123                        "nDAG message does not have a complete nDAG header.\n");
124                return 0;
125        }
126
127        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
128                fprintf(stderr,
129                        "nDAG message does not have a valid magic number.\n");
130                return 0;
131        }
132
133        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
134                fprintf(stderr,
135                        "nDAG message has an invalid header version: %u\n",
136                                header->version);
137                return 0;
138        }
139
140        return header->type;
141}
142
143static int join_multicast_group(char *groupaddr, char *localiface,
144        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
145
146        struct addrinfo hints;
147        struct addrinfo *gotten;
148        struct addrinfo *group;
149        unsigned int interface;
150        char pstr[16];
151        struct group_req greq;
152        int bufsize;
153
154        int sock;
155
156        if (portstr == NULL) {
157                snprintf(pstr, 15, "%u", portnum);
158                portstr = pstr;
159        }
160
161        interface = if_nametoindex(localiface);
162        if (interface == 0) {
163                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
164                                localiface, strerror(errno));
165                return -1;
166        }
167
168        hints.ai_family = PF_UNSPEC;
169        hints.ai_socktype = SOCK_DGRAM;
170        hints.ai_flags = AI_PASSIVE;
171        hints.ai_protocol = 0;
172
173        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
174                fprintf(stderr,
175                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
176                                portstr, strerror(errno));
177                return -1;
178        }
179
180        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
181                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
182                                groupaddr, strerror(errno));
183                return -1;
184        }
185
186        *srcinfo = gotten;
187        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
188        if (sock < 0) {
189                fprintf(stderr,
190                        "Failed to create multicast socket for %s:%s -- %s\n",
191                                groupaddr, portstr, strerror(errno));
192                goto sockcreateover;
193        }
194
195        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
196        {
197                fprintf(stderr,
198                        "Failed to bind to multicast socket %s:%s -- %s\n",
199                                groupaddr, portstr, strerror(errno));
200                close(sock);
201                sock = -1;
202                goto sockcreateover;
203        }
204
205        greq.gr_interface = interface;
206        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
207
208        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
209                        sizeof(greq)) < 0) {
210                fprintf(stderr,
211                        "Failed to join multicast group %s:%s -- %s\n",
212                                groupaddr, portstr, strerror(errno));
213                close(sock);
214                sock = -1;
215                goto sockcreateover;
216        }
217
218        bufsize = 16 * 1024 * 1024;
219        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
220                                (socklen_t)sizeof(int)) < 0) {
221
222                fprintf(stderr,
223                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
224                                groupaddr, portstr, strerror(errno));
225                close(sock);
226                sock = -1;
227                goto sockcreateover;
228        }
229
230sockcreateover:
231        freeaddrinfo(group);
232        return sock;
233}
234
235
236static int ndag_init_input(libtrace_t *libtrace) {
237
238        char *scan = NULL;
239        char *next = NULL;
240
241        libtrace->format_data = (ndag_format_data_t *)malloc(
242                        sizeof(ndag_format_data_t));
243
244        FORMAT_DATA->multicastgroup = NULL;
245        FORMAT_DATA->portstr = NULL;
246        FORMAT_DATA->localiface = NULL;
247        FORMAT_DATA->nextthreadid = 0;
248        FORMAT_DATA->receivers = NULL;
249
250        scan = strchr(libtrace->uridata, ',');
251        if (scan == NULL) {
252                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
253                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
254                return -1;
255        }
256        FORMAT_DATA->localiface = strndup(libtrace->uridata,
257                        (size_t)(scan - libtrace->uridata));
258        next = scan + 1;
259
260        scan = strchr(next, ',');
261        if (scan == NULL) {
262                FORMAT_DATA->portstr = strdup("9001");
263                FORMAT_DATA->multicastgroup = strdup(next);
264        } else {
265                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
266
267                FORMAT_DATA->portstr = strdup(scan + 1);
268        }
269        return 0;
270}
271
272static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
273                uint16_t portnum, uint16_t monid) {
274
275        ndag_internal_message_t alert;
276
277        alert.type = NDAG_CLIENT_NEWGROUP;
278        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
279        alert.contents.localiface = FORMAT_DATA->localiface;
280        alert.contents.port = portnum;
281        alert.contents.monitor = monid;
282
283        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
284                        (void *)&alert);
285
286}
287       
288static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
289                int msgsize, uint16_t *ptmap) {
290
291        int i;
292        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
293        uint8_t msgtype;
294
295        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
296        if (msgtype == 0) {
297                return -1;
298        }
299
300        msgsize -= sizeof(ndag_common_t);
301        if (msgtype == NDAG_PKT_BEACON) {
302                /* If message is a beacon, make sure every port included in the
303                 * beacon is assigned to a receive thread.
304                 */
305                uint16_t *ptr, numstreams;
306
307                if ((uint32_t)msgsize < sizeof(uint16_t)) {
308                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
309                        return -1;
310                }
311
312                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
313                numstreams = ntohs(*ptr);
314                ptr ++;
315
316                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
317                {
318                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
319                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
320                        return -1;
321                }
322
323                for (i = 0; i < numstreams; i++) {
324                        uint16_t streamport = ntohs(*ptr);
325
326                        if (ptmap[streamport] == 0xffff) {
327                                new_group_alert(libtrace,
328                                        FORMAT_DATA->nextthreadid, streamport,
329                                        ntohs(ndaghdr->monitorid));
330
331                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
332
333                                if (libtrace->perpkt_thread_count == 0) {
334                                        FORMAT_DATA->nextthreadid = 0;
335                                } else {
336                                        FORMAT_DATA->nextthreadid =
337                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
338                                }
339                        }
340
341                        ptr ++;
342                }
343        } else {
344                fprintf(stderr,
345                        "Unexpected message type on control channel: %u\n",
346                         msgtype);
347                return -1;
348        }
349
350        return 0;
351
352}
353
354static void *ndag_controller_run(void *tdata) {
355
356        libtrace_t *libtrace = (libtrace_t *)tdata;
357        uint16_t ptmap[65536];
358        int sock = -1;
359        struct addrinfo *receiveaddr = NULL;
360        fd_set listening;
361        struct timeval timeout;
362
363        /* ptmap is a dirty hack to allow us to quickly check if we've already
364         * assigned a stream to a thread.
365         */
366        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
367
368        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
369                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
370                        &receiveaddr);
371        if (sock == -1) {
372                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
373                        "Unable to join multicast group for nDAG control channel");
374                trace_interrupt();
375        }
376
377        ndag_paused = 0;
378        while ((is_halted(libtrace) == -1) && !ndag_paused) {
379                int ret;
380                char buf[CTRL_BUF_SIZE];
381
382                FD_ZERO(&listening);
383                FD_SET(sock, &listening);
384
385                timeout.tv_sec = 0;
386                timeout.tv_usec = 500000;
387
388                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
389                if (ret < 0) {
390                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
391                        break;
392                }
393
394                if (!FD_ISSET(sock, &listening)) {
395                        continue;
396                }
397
398                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
399                                receiveaddr->ai_addr,
400                                &(receiveaddr->ai_addrlen));
401                if (ret < 0) {
402                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
403                        break;
404                }
405
406                if (ret == 0) {
407                        break;
408                }
409
410                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
411                        fprintf(stderr, "Error while parsing nDAG control message.\n");
412                        continue;
413                }
414        }
415
416        if (sock >= 0) {
417                close(sock);
418        }
419
420        /* Control channel has fallen over, should probably encourage libtrace
421         * to halt the receiver threads as well.
422         */
423        if (!is_halted(libtrace)) {
424                trace_interrupt();
425        }
426
427        pthread_exit(NULL);
428}
429
430static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
431{
432        int ret;
433        uint32_t i;
434        /* Configure the set of receiver threads */
435
436        if (FORMAT_DATA->receivers == NULL) {
437                /* What if the number of threads changes between a pause and
438                 * a restart? Can this happen? */
439                FORMAT_DATA->receivers = (recvstream_t *)
440                                malloc(sizeof(recvstream_t) * maxthreads);
441        }
442
443        for (i = 0; i < maxthreads; i++) {
444                FORMAT_DATA->receivers[i].sources = NULL;
445                FORMAT_DATA->receivers[i].sourcecount = 0;
446                FORMAT_DATA->receivers[i].knownmonitors = NULL;
447                FORMAT_DATA->receivers[i].monitorcount = 0;
448                FORMAT_DATA->receivers[i].threadindex = i;
449                FORMAT_DATA->receivers[i].dropped_upstream = 0;
450                FORMAT_DATA->receivers[i].received_packets = 0;
451                FORMAT_DATA->receivers[i].missing_records = 0;
452
453                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
454                                sizeof(ndag_internal_message_t));
455        }
456
457        /* Start the controller thread */
458        /* TODO consider affinity of this thread? */
459
460        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
461                        ndag_controller_run, libtrace);
462        if (ret != 0) {
463                return -1;
464        }
465        return maxthreads;
466}
467
468static int ndag_start_input(libtrace_t *libtrace) {
469        return ndag_start_threads(libtrace, 1);
470}
471
472static int ndag_pstart_input(libtrace_t *libtrace) {
473        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
474                        libtrace->perpkt_thread_count)
475                return 0;
476        return -1;
477}
478
479static void halt_ndag_receiver(recvstream_t *receiver) {
480        int j, i;
481        libtrace_message_queue_destroy(&(receiver->mqueue));
482
483        if (receiver->sources == NULL)
484                return;
485        for (i = 0; i < receiver->sourcecount; i++) {
486                streamsock_t src = receiver->sources[i];
487                if (src.saved) {
488                        for (j = 0; i < ENCAP_BUFFERS; j++) {
489                                if (src.saved[j]) {
490                                        free(src.saved[j]);
491                                }
492                        }
493                        free(src.saved);
494                }
495                for (j = 0; j < RECV_BATCH_SIZE; j++) {
496                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
497                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
498                        }
499                }
500                close(src.sock);
501        }
502        if (receiver->knownmonitors) {
503                free(receiver->knownmonitors);
504        }
505
506        if (receiver->sources) {
507                free(receiver->sources);
508        }
509}
510
511static int ndag_pause_input(libtrace_t *libtrace) {
512        int i;
513
514        /* Close the existing receiver sockets */
515        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
516               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
517        }
518        return 0;
519}
520
521static int ndag_fin_input(libtrace_t *libtrace) {
522
523        if (FORMAT_DATA->receivers) {
524                free(FORMAT_DATA->receivers);
525        }
526        if (FORMAT_DATA->multicastgroup) {
527                free(FORMAT_DATA->multicastgroup);
528        }
529        if (FORMAT_DATA->portstr) {
530                free(FORMAT_DATA->portstr);
531        }
532        if (FORMAT_DATA->localiface) {
533                free(FORMAT_DATA->localiface);
534        }
535
536        free(libtrace->format_data);
537        return 0;
538}
539
540static int ndag_prepare_packet_stream(libtrace_t *libtrace,
541                recvstream_t *rt,
542                streamsock_t *ssock, libtrace_packet_t *packet,
543                uint32_t flags) {
544
545        dag_record_t *erfptr;
546        ndag_encap_t *encaphdr;
547        uint16_t ndag_reccount = 0;
548        int nr;
549
550        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
551                packet->buf_control = TRACE_CTRL_PACKET;
552        } else {
553                packet->buf_control = TRACE_CTRL_EXTERNAL;
554        }
555
556        packet->trace = libtrace;
557        packet->buffer = ssock->nextread;
558        packet->header = ssock->nextread;
559        packet->type = TRACE_RT_DATA_ERF;
560
561        erfptr = (dag_record_t *)packet->header;
562
563        if (erfptr->flags.rxerror == 1) {
564                packet->payload = NULL;
565                erfptr->rlen = htons(erf_get_framing_length(packet));
566        } else {
567                packet->payload = (char *)packet->buffer +
568                                erf_get_framing_length(packet);
569        }
570
571        /* Update upstream drops using lctr */
572
573        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
574                /* TODO */
575        } else {
576                if (rt->received_packets > 0) {
577                        rt->dropped_upstream += ntohs(erfptr->lctr);
578                }
579        }
580
581        rt->received_packets ++;
582        ssock->recordcount += 1;
583
584        nr = ssock->nextreadind;
585        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
586                        sizeof(ndag_common_t));
587
588        ndag_reccount = ntohs(encaphdr->recordcount);
589        if ((ndag_reccount & 0x8000) != 0) {
590                /* Record was truncated -- update rlen appropriately */
591                erfptr->rlen = htons(ssock->savedsize[nr] -
592                                (ssock->nextread - ssock->saved[nr]));
593        }
594        ssock->nextread += ntohs(erfptr->rlen);
595
596        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
597                /* Read everything from this buffer, mark as empty and
598                 * move on. */
599                ssock->savedsize[nr] = 0;
600                nr ++;
601                if (nr == ENCAP_BUFFERS) {
602                        nr = 0;
603                }
604                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
605                                sizeof(ndag_encap_t);
606                ssock->nextreadind = nr;
607        }
608
609        packet->order = erf_get_erf_timestamp(packet);
610        packet->error = packet->payload ? ntohs(erfptr->rlen) :
611                        erf_get_framing_length(packet);
612
613        return ntohs(erfptr->rlen);
614}
615
616static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
617                libtrace_packet_t *packet UNUSED,
618                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
619                uint32_t flags UNUSED) {
620
621        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
622        return 0;
623
624}
625
626static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
627
628        ndag_monitor_t *mon;
629
630        if (rt->monitorcount == 0) {
631                rt->knownmonitors = (ndag_monitor_t *)
632                                malloc(sizeof(ndag_monitor_t) * 5);
633        } else {
634                rt->knownmonitors = (ndag_monitor_t *)
635                            realloc(rt->knownmonitors,
636                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
637        }
638
639        mon = &(rt->knownmonitors[rt->monitorcount]);
640        mon->monitorid = monid;
641        mon->laststart = 0;
642
643        rt->monitorcount ++;
644        return mon;
645}
646
647static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
648
649        streamsock_t *ssock = NULL;
650        ndag_monitor_t *mon = NULL;
651        int i;
652
653        /* TODO consider replacing this with a list or vector so we can
654         * easily remove sources that are no longer in use, rather than
655         * just setting the sock to -1 and having to check them every
656         * time we want to read a packet.
657         */
658        if (rt->sourcecount == 0) {
659                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
660        } else if ((rt->sourcecount % 10) == 0) {
661                rt->sources = (streamsock_t *)realloc(rt->sources,
662                        sizeof(streamsock_t) * (rt->sourcecount + 10));
663        }
664
665        ssock = &(rt->sources[rt->sourcecount]);
666
667        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
668                        NULL, src.port, &(ssock->srcaddr));
669
670        if (ssock->sock < 0) {
671                return -1;
672        }
673
674        for (i = 0; i < rt->monitorcount; i++) {
675                if (rt->knownmonitors[i].monitorid == src.monitor) {
676                        mon = &(rt->knownmonitors[i]);
677                        break;
678                }
679        }
680
681        if (mon == NULL) {
682                mon = add_new_knownmonitor(rt, src.monitor);
683        }
684
685        ssock->port = src.port;
686        ssock->groupaddr = src.groupaddr;
687        ssock->expectedseq = 0;
688        ssock->monitorptr = mon;
689        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
690        ssock->startidle = 0;
691
692        for (i = 0; i < ENCAP_BUFFERS; i++) {
693                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
694                ssock->savedsize[i] = 0;
695        }
696
697        for (i = 0; i < RECV_BATCH_SIZE; i++) {
698                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
699                                malloc(sizeof(struct iovec));
700                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
701                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
702                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
703                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
704                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
705                ssock->mmsgbufs[i].msg_len = 1;
706        }
707
708        ssock->nextread = NULL;;
709        ssock->nextreadind = 0;
710        ssock->recordcount = 0;
711        rt->sourcecount += 1;
712
713        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
714                        ssock->groupaddr, ssock->port, rt->threadindex);
715
716        return ssock->port;
717}
718
719static int receiver_read_messages(recvstream_t *rt) {
720
721        ndag_internal_message_t msg;
722
723        while (libtrace_message_queue_try_get(&(rt->mqueue),
724                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
725                switch(msg.type) {
726                        case NDAG_CLIENT_NEWGROUP:
727                                if (add_new_streamsock(rt, msg.contents) < 0) {
728                                        return -1;
729                                }
730                                break;
731                        case NDAG_CLIENT_HALT:
732                                return 0;
733                }
734        }
735        return 1;
736
737}
738
739static inline int readable_data(streamsock_t *ssock) {
740
741        if (ssock->sock == -1) {
742                return 0;
743        }
744        if (ssock->savedsize[ssock->nextreadind] == 0) {
745                return 0;
746        }
747        /*
748        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
749                        ssock->savedsize[ssock->nextreadind]) {
750                return 0;
751        }
752        */
753        return 1;
754
755
756}
757
758static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
759
760        int i;
761        for (i = 0; i < rt->sourcecount; i++) {
762                if (rt->sources[i].monitorptr == mon) {
763                        rt->sources[i].expectedseq = 0;
764                }
765        }
766
767}
768
769static void init_receivers(streamsock_t *ssock, int required) {
770
771        int wind = ssock->nextwriteind;
772        int i;
773
774        for (i = 0; i < required; i++) {
775                ssock->mmsgbufs[i].msg_len = 0;
776                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
777                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
778                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
779
780                wind ++;
781        }
782}
783
784static int count_receivers(streamsock_t *ssock) {
785
786        int wind = ssock->nextwriteind;
787        int i;
788        int avail = 0;
789
790        for (i = 0; i < RECV_BATCH_SIZE; i++) {
791                if (wind == ENCAP_BUFFERS) {
792                        wind = 0;
793                }
794
795                if (ssock->savedsize[wind] != 0) {
796                        /* No more empty buffers */
797                        break;
798                }
799
800                avail ++;
801                wind ++;
802        }
803
804        return avail;
805}
806
807static int check_ndag_received(streamsock_t *ssock, int index,
808                unsigned int msglen, recvstream_t *rt) {
809
810        ndag_encap_t *encaphdr;
811        ndag_monitor_t *mon;
812        uint8_t rectype;
813
814        /* Check that we have a valid nDAG encap record */
815        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
816
817        if (rectype == NDAG_PKT_KEEPALIVE) {
818                /* Keep-alive, reset startidle and carry on. Don't
819                 * change nextwrite -- we want to overwrite the
820                 * keep-alive with usable content. */
821                return 0;
822        } else if (rectype != NDAG_PKT_ENCAPERF) {
823                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
824                                ssock->groupaddr, ssock->port);
825                close(ssock->sock);
826                ssock->sock = -1;
827                return -1;
828        }
829
830        ssock->savedsize[index] = msglen;
831        ssock->nextwriteind ++;
832
833        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
834                ssock->nextwriteind = 0;
835        }
836
837        /* Get the useful info from the encap header */
838        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
839
840        mon = ssock->monitorptr;
841
842        if (mon->laststart == 0) {
843                mon->laststart = bswap_be_to_host64(encaphdr->started);
844        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
845                mon->laststart = bswap_be_to_host64(encaphdr->started);
846                reset_expected_seqs(rt, mon);
847
848                /* TODO what is a good way to indicate this to clients?
849                 * set the loss counter in the ERF header? a bit rude?
850                 * use another bit in the ERF header?
851                 * add a queryable flag to libtrace_packet_t?
852                 */
853
854        }
855
856        if (ssock->expectedseq != 0) {
857                rt->missing_records += seq_cmp(
858                                ntohl(encaphdr->seqno), ssock->expectedseq);
859        }
860        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
861        if (ssock->expectedseq == 0) {
862                ssock->expectedseq ++;
863        }
864
865        if (ssock->nextread == NULL) {
866                /* If this is our first read, set up 'nextread'
867                 * by skipping past the nDAG headers */
868                ssock->nextread = ssock->saved[0] +
869                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
870        }
871        return 1;
872
873}
874
875static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
876                int *gottime, recvstream_t *rt) {
877
878        int avail, ret, ndagstat, i;
879        int toret = 0;
880
881        if (ssock->sock == -1) {
882                return 0;
883        }
884
885        avail = count_receivers(ssock);
886
887        if (avail == 0) {
888                /* All buffers were full, so something must be
889                 * available. */
890                return 1;
891        }
892
893        if (avail < RECV_BATCH_SIZE / 2) {
894                return 1;
895        }
896
897        init_receivers(ssock, avail);
898
899        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
900                        MSG_DONTWAIT, NULL);
901
902        if (ret < 0) {
903                /* Nothing to receive right now, but we should still
904                 * count as 'ready' if at least one buffer is full */
905                if (errno == EAGAIN || errno == EWOULDBLOCK) {
906                        if (readable_data(ssock)) {
907                                toret = 1;
908                        }
909                        if (!(*gottime)) {
910                                gettimeofday(tv, NULL);
911                                *gottime = 1;
912                        }
913                        if (ssock->startidle == 0) {
914                                ssock->startidle = tv->tv_sec;
915                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
916                                fprintf(stderr,
917                                        "Closing channel %s:%u due to inactivity.\n",
918                                        ssock->groupaddr,
919                                        ssock->port);
920
921                                close(ssock->sock);
922                                ssock->sock = -1;
923                        }
924                } else {
925
926                        fprintf(stderr,
927                                "Error receiving encapsulated records from %s:%u -- %s \n",
928                                ssock->groupaddr, ssock->port,
929                                strerror(errno));
930                        close(ssock->sock);
931                        ssock->sock = -1;
932                }
933                return toret;
934        }
935        ssock->startidle = 0;
936        for (i = 0; i < ret; i++) {
937                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
938                                ssock->mmsgbufs[i].msg_len, rt);
939                if (ndagstat == -1) {
940                        break;
941                }
942
943                if (ndagstat == 1) {
944                        toret = 1;
945                }
946        }
947
948        return toret;
949}
950
951static int receive_from_sockets(recvstream_t *rt) {
952
953        int i, readybufs, gottime;
954        struct timeval tv;
955
956        readybufs = 0;
957        gottime = 0;
958
959        for (i = 0; i < rt->sourcecount; i ++) {
960                readybufs += receive_from_single_socket(&(rt->sources[i]),
961                                &tv, &gottime, rt);
962        }
963
964        return readybufs;
965
966}
967
968
969static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
970                libtrace_packet_t *packet) {
971
972        int iserr = 0;
973
974        if (packet->buf_control == TRACE_CTRL_PACKET) {
975                free(packet->buffer);
976                packet->buffer = NULL;
977        }
978
979        do {
980                /* Make sure we shouldn't be halting */
981                if ((iserr = is_halted(libtrace)) != -1) {
982                        return iserr;
983                }
984
985                /* Check for any messages from the control thread */
986                iserr = receiver_read_messages(rt);
987
988                if (iserr <= 0) {
989                        return iserr;
990                }
991
992                /* If blocking and no sources, sleep for a bit and then try
993                 * checking for messages again.
994                 */
995                if (rt->sourcecount == 0) {
996                        usleep(10000);
997                        continue;
998                }
999
1000                if ((iserr = receive_from_sockets(rt)) < 0) {
1001                        return iserr;
1002                } else if (iserr > 0) {
1003                        /* At least one of our input sockets has available
1004                         * data, let's go ahead and use what we have. */
1005                        break;
1006                }
1007
1008                /* None of our sources have anything available, we can take
1009                 * a short break rather than immediately trying again.
1010                 */
1011                if (iserr == 0) {
1012                        usleep(100);
1013                }
1014
1015        } while (1);
1016
1017        return iserr;
1018}
1019
1020static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1021                libtrace_packet_t *packet) {
1022
1023        int iserr = 0;
1024
1025        if (packet->buf_control == TRACE_CTRL_PACKET) {
1026                free(packet->buffer);
1027                packet->buffer = NULL;
1028        }
1029
1030        /* Make sure we shouldn't be halting */
1031        if ((iserr = is_halted(libtrace)) != -1) {
1032                return iserr;
1033        }
1034
1035        /* If non-blocking and there are no sources, just break */
1036        if (rt->sourcecount == 0) {
1037                return 0;
1038        }
1039
1040        return receive_from_sockets(rt);
1041}
1042
1043static streamsock_t *select_next_packet(recvstream_t *rt) {
1044        int i;
1045        streamsock_t *ssock = NULL;
1046        uint64_t earliest = 0;
1047        uint64_t currentts = 0;
1048        dag_record_t *daghdr;
1049
1050        for (i = 0; i < rt->sourcecount; i ++) {
1051                if (!readable_data(&(rt->sources[i]))) {
1052                        continue;
1053                }
1054
1055                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1056                currentts = bswap_le_to_host64(daghdr->ts);
1057
1058                if (earliest == 0 || earliest > currentts) {
1059                        earliest = currentts;
1060                        ssock = &(rt->sources[i]);
1061                }
1062                /*
1063                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1064                                i, currentts,
1065                                rt->sources[i].recordcount,
1066                                rt->missing_records);
1067                */
1068        }
1069        return ssock;
1070}
1071
1072static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1073
1074        int rem;
1075        streamsock_t *nextavail = NULL;
1076        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1077                        packet);
1078
1079        if (rem <= 0) {
1080                return rem;
1081        }
1082
1083        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1084        if (nextavail == NULL) {
1085                return 0;
1086        }
1087
1088        /* nextread should point at an ERF header, so prepare 'packet' to be
1089         * a libtrace ERF packet. */
1090
1091        return ndag_prepare_packet_stream(libtrace,
1092                        &(FORMAT_DATA->receivers[0]), nextavail,
1093                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1094}
1095
1096static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1097                libtrace_packet_t **packets, size_t nb_packets) {
1098
1099        recvstream_t *rt;
1100        int rem;
1101        size_t read_packets = 0;
1102        streamsock_t *nextavail = NULL;
1103
1104        rt = (recvstream_t *)t->format_data;
1105
1106
1107        do {
1108                /* Only check for messages once per batch */
1109                if (read_packets == 0) {
1110                        rem = receive_encap_records_block(libtrace, rt,
1111                                packets[read_packets]);
1112                } else {
1113                        rem = receive_encap_records_nonblock(libtrace, rt,
1114                                packets[read_packets]);
1115                }
1116
1117                if (rem < 0) {
1118                        return rem;
1119                }
1120
1121                if (rem == 0) {
1122                        break;
1123                }
1124
1125                nextavail = select_next_packet(rt);
1126                if (nextavail == NULL) {
1127                        break;
1128                }
1129
1130                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1131                                packets[read_packets],
1132                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1133
1134                read_packets  ++;
1135                if (read_packets >= nb_packets) {
1136                        break;
1137                }
1138        } while (1);
1139
1140        return read_packets;
1141
1142}
1143
1144static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1145                libtrace_packet_t *packet) {
1146
1147
1148        libtrace_eventobj_t event = {0,0,0.0,0};
1149        int rem;
1150        streamsock_t *nextavail = NULL;
1151
1152        /* Only check for messages once per call */
1153        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1154        if (rem <= 0) {
1155                event.type = TRACE_EVENT_TERMINATE;
1156                return event;
1157        }
1158
1159        do {
1160                rem = receive_encap_records_nonblock(libtrace,
1161                                &(FORMAT_DATA->receivers[0]), packet);
1162
1163                if (rem < 0) {
1164                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1165                                "Received invalid nDAG records.");
1166                        event.type = TRACE_EVENT_TERMINATE;
1167                        break;
1168                }
1169
1170                if (rem == 0) {
1171                        /* Either we've been halted or we've got no packets
1172                         * right now. */
1173                        if (is_halted(libtrace) == 0) {
1174                                event.type = TRACE_EVENT_TERMINATE;
1175                                break;
1176                        }
1177                        event.type = TRACE_EVENT_SLEEP;
1178                        event.seconds = 0.0001;
1179                        break;
1180                }
1181
1182                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1183                if (nextavail == NULL) {
1184                        event.type = TRACE_EVENT_SLEEP;
1185                        event.seconds = 0.0001;
1186                        break;
1187                }
1188
1189                event.type = TRACE_EVENT_PACKET;
1190                ndag_prepare_packet_stream(libtrace,
1191                                &(FORMAT_DATA->receivers[0]), nextavail,
1192                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1193                event.size = trace_get_capture_length(packet) +
1194                                trace_get_framing_length(packet);
1195
1196                if (libtrace->filter) {
1197                        int filtret = trace_apply_filter(libtrace->filter,
1198                                        packet);
1199                        if (filtret == -1) {
1200                                trace_set_err(libtrace,
1201                                                TRACE_ERR_BAD_FILTER,
1202                                                "Bad BPF Filter");
1203                                event.type = TRACE_EVENT_TERMINATE;
1204                                break;
1205                        }
1206
1207                        if (filtret == 0) {
1208                                /* Didn't match filter, try next one */
1209                                libtrace->filtered_packets ++;
1210                                trace_clear_cache(packet);
1211                                continue;
1212                        }
1213                }
1214
1215                if (libtrace->snaplen > 0) {
1216                        trace_set_capture_length(packet, libtrace->snaplen);
1217                }
1218                libtrace->accepted_packets ++;
1219                break;
1220        } while (1);
1221
1222        return event;
1223}
1224
1225static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1226
1227        int i;
1228
1229        stat->dropped_valid = 1;
1230        stat->dropped = 0;
1231        stat->received_valid = 1;
1232        stat->received = 0;
1233        stat->missing_valid = 1;
1234        stat->missing = 0;
1235
1236        /* TODO Is this thread safe? */
1237        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1238                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1239                stat->received += FORMAT_DATA->receivers[i].received_packets;
1240                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1241        }
1242
1243}
1244
1245static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1246                libtrace_stat_t *stat) {
1247
1248        recvstream_t *recvr = (recvstream_t *)t->format_data;
1249
1250        if (libtrace == NULL)
1251                return;
1252        /* TODO Is this thread safe */
1253        stat->dropped_valid = 1;
1254        stat->dropped = recvr->dropped_upstream;
1255
1256        stat->received_valid = 1;
1257        stat->received = recvr->received_packets;
1258
1259        stat->missing_valid = 1;
1260        stat->missing = recvr->missing_records;
1261
1262}
1263
1264static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1265                bool reader) {
1266        recvstream_t *recvr;
1267
1268        if (!reader || t->type != THREAD_PERPKT) {
1269                return 0;
1270        }
1271
1272        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1273        t->format_data = recvr;
1274
1275        return 0;
1276}
1277
1278static struct libtrace_format_t ndag = {
1279
1280        "ndag",
1281        "",
1282        TRACE_FORMAT_NDAG,
1283        NULL,                   /* probe filename */
1284        NULL,                   /* probe magic */
1285        ndag_init_input,        /* init_input */
1286        NULL,                   /* config_input */
1287        ndag_start_input,       /* start_input */
1288        ndag_pause_input,       /* pause_input */
1289        NULL,                   /* init_output */
1290        NULL,                   /* config_output */
1291        NULL,                   /* start_output */
1292        ndag_fin_input,         /* fin_input */
1293        NULL,                   /* fin_output */
1294        ndag_read_packet,       /* read_packet */
1295        ndag_prepare_packet,    /* prepare_packet */
1296        NULL,                   /* fin_packet */
1297        NULL,                   /* write_packet */
1298        erf_get_link_type,      /* get_link_type */
1299        erf_get_direction,      /* get_direction */
1300        erf_set_direction,      /* set_direction */
1301        erf_get_erf_timestamp,  /* get_erf_timestamp */
1302        NULL,                   /* get_timeval */
1303        NULL,                   /* get_seconds */
1304        NULL,                   /* get_timespec */
1305        NULL,                   /* seek_erf */
1306        NULL,                   /* seek_timeval */
1307        NULL,                   /* seek_seconds */
1308        erf_get_capture_length, /* get_capture_length */
1309        erf_get_wire_length,    /* get_wire_length */
1310        erf_get_framing_length, /* get_framing_length */
1311        erf_set_capture_length, /* set_capture_length */
1312        NULL,                   /* get_received_packets */
1313        NULL,                   /* get_filtered_packets */
1314        NULL,                   /* get_dropped_packets */
1315        ndag_get_statistics,    /* get_statistics */
1316        NULL,                   /* get_fd */
1317        trace_event_ndag,       /* trace_event */
1318        NULL,                   /* help */
1319        NULL,                   /* next pointer */
1320        {true, 0},              /* live packet capture */
1321        ndag_pstart_input,      /* parallel start */
1322        ndag_pread_packets,     /* parallel read */
1323        ndag_pause_input,       /* parallel pause */
1324        NULL,
1325        ndag_pregister_thread,  /* register thread */
1326        NULL,
1327        ndag_get_thread_stats   /* per-thread stats */
1328};
1329
1330void ndag_constructor(void) {
1331        register_format(&ndag);
1332}
Note: See TracBrowser for help on using the repository browser.