source: lib/format_ndag.c @ e68325b

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since e68325b was e68325b, checked in by Shane Alcock <salcock@…>, 4 years ago

Add support for receiving keep-alives on the ndag sockets.

These are used by the ndag transmitter to let clients know that
they haven't died; instead, there have been no packets to export
for some reason.

  • Property mode set to 100644
File size: 38.5 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (100)
29
30#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
31
32static struct libtrace_format_t ndag;
33
34volatile int ndag_paused = 0;
35
36typedef struct streamsource {
37        uint16_t monitor;
38        char *groupaddr;
39        char *localiface;
40        uint16_t port;
41} streamsource_t;
42
43typedef struct streamsock {
44        char *groupaddr;
45        int sock;
46        struct addrinfo *srcaddr;
47        uint16_t port;
48        uint32_t expectedseq;
49        uint16_t monitorid;
50
51        char **saved;
52        char *nextread;
53        int nextreadind;
54        int nextwriteind;
55        int savedsize[ENCAP_BUFFERS];
56        uint32_t startidle;
57        uint64_t recordcount;
58
59} streamsock_t;
60
61typedef struct recvstream {
62        streamsock_t *sources;
63        uint16_t sourcecount;
64        libtrace_message_queue_t mqueue;
65        int threadindex;
66
67        uint64_t dropped_upstream;
68        uint64_t missing_records;
69        uint64_t received_packets;
70} recvstream_t;
71
72typedef struct ndag_format_data {
73        char *multicastgroup;
74        char *portstr;
75        char *localiface;
76        uint16_t nextthreadid;
77        recvstream_t *receivers;
78
79        pthread_t controlthread;
80        libtrace_message_queue_t controlqueue;
81} ndag_format_data_t;
82
83enum {
84        NDAG_CLIENT_HALT = 0x01,
85        NDAG_CLIENT_RESTARTED = 0x02,
86        NDAG_CLIENT_NEWGROUP = 0x03
87};
88
89typedef struct ndagreadermessage {
90        uint8_t type;
91        streamsource_t contents;
92} ndag_internal_message_t;
93
94
95static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
96
97        /* Calculate seq_a - seq_b, taking wraparound into account */
98        if (seq_a == seq_b) return 0;
99
100        if (seq_a > seq_b) {
101                return (int) (seq_a - seq_b);
102        }
103        return (int) (0xffffffff - ((seq_b - seq_a) - 1));
104}
105
106static inline uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
107        ndag_common_t *header = (ndag_common_t *)msgbuf;
108
109        if (msgsize < sizeof(ndag_common_t)) {
110                fprintf(stderr,
111                        "nDAG message does not have a complete nDAG header.\n");
112                return 0;
113        }
114
115        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
116                fprintf(stderr,
117                        "nDAG message does not have a valid magic number.\n");
118                return 0;
119        }
120
121        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
122                fprintf(stderr,
123                        "nDAG message has an invalid header version: %u\n",
124                                header->version);
125                return 0;
126        }
127
128        return header->type;
129}
130
131static int join_multicast_group(char *groupaddr, char *localiface,
132        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
133
134        struct addrinfo hints;
135        struct addrinfo *gotten;
136        struct addrinfo *group;
137        unsigned int interface;
138        char pstr[16];
139        struct group_req greq;
140        int bufsize;
141
142        int sock;
143
144        if (portstr == NULL) {
145                snprintf(pstr, 15, "%u", portnum);
146                portstr = pstr;
147        }
148
149        interface = if_nametoindex(localiface);
150        if (interface == 0) {
151                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
152                                localiface, strerror(errno));
153                return -1;
154        }
155
156        hints.ai_family = PF_UNSPEC;
157        hints.ai_socktype = SOCK_DGRAM;
158        hints.ai_flags = AI_PASSIVE;
159        hints.ai_protocol = 0;
160
161        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
162                fprintf(stderr,
163                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
164                                portstr, strerror(errno));
165                return -1;
166        }
167
168        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
169                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
170                                groupaddr, strerror(errno));
171                return -1;
172        }
173
174        *srcinfo = gotten;
175        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
176        if (sock < 0) {
177                fprintf(stderr,
178                        "Failed to create multicast socket for %s:%s -- %s\n",
179                                groupaddr, portstr, strerror(errno));
180                goto sockcreateover;
181        }
182
183        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
184        {
185                fprintf(stderr,
186                        "Failed to bind to multicast socket %s:%s -- %s\n",
187                                groupaddr, portstr, strerror(errno));
188                close(sock);
189                sock = -1;
190                goto sockcreateover;
191        }
192
193        greq.gr_interface = interface;
194        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
195
196        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
197                        sizeof(greq)) < 0) {
198                fprintf(stderr,
199                        "Failed to join multicast group %s:%s -- %s\n",
200                                groupaddr, portstr, strerror(errno));
201                close(sock);
202                sock = -1;
203                goto sockcreateover;
204        }
205
206        bufsize = 16 * 1024 * 1024;
207        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
208                                (socklen_t)sizeof(int)) < 0) {
209
210                fprintf(stderr,
211                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
212                                groupaddr, portstr, strerror(errno));
213                close(sock);
214                sock = -1;
215                goto sockcreateover;
216        }
217
218sockcreateover:
219        freeaddrinfo(group);
220        return sock;
221}
222
223
224static int ndag_init_input(libtrace_t *libtrace) {
225
226        char *scan = NULL;
227        char *next = NULL;
228
229        libtrace->format_data = (ndag_format_data_t *)malloc(
230                        sizeof(ndag_format_data_t));
231
232        FORMAT_DATA->multicastgroup = NULL;
233        FORMAT_DATA->portstr = NULL;
234        FORMAT_DATA->localiface = NULL;
235        FORMAT_DATA->nextthreadid = 0;
236        FORMAT_DATA->receivers = NULL;
237
238        scan = strchr(libtrace->uridata, ',');
239        if (scan == NULL) {
240                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
241                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
242                return -1;
243        }
244        FORMAT_DATA->localiface = strndup(libtrace->uridata,
245                        (size_t)(scan - libtrace->uridata));
246        next = scan + 1;
247
248        scan = strchr(next, ',');
249        if (scan == NULL) {
250                FORMAT_DATA->portstr = strdup("9001");
251                FORMAT_DATA->multicastgroup = strdup(next);
252        } else {
253                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
254
255                FORMAT_DATA->portstr = strdup(scan + 1);
256        }
257        return 0;
258}
259
260static inline void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
261                uint16_t portnum, uint16_t monid) {
262
263        ndag_internal_message_t alert;
264
265        alert.type = NDAG_CLIENT_NEWGROUP;
266        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
267        alert.contents.localiface = FORMAT_DATA->localiface;
268        alert.contents.port = portnum;
269        alert.contents.monitor = monid;
270
271        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
272                        (void *)&alert);
273
274}
275       
276static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
277                int msgsize, uint16_t *ptmap) {
278
279        int i;
280        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
281        uint8_t msgtype;
282
283        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
284        if (msgtype == 0) {
285                return -1;
286        }
287
288        msgsize -= sizeof(ndag_common_t);
289        if (msgtype == NDAG_PKT_BEACON) {
290                /* If message is a beacon, make sure every port included in the
291                 * beacon is assigned to a receive thread.
292                 */
293                uint16_t *ptr, numstreams;
294
295                if ((uint32_t)msgsize < sizeof(uint16_t)) {
296                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
297                        return -1;
298                }
299
300                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
301                numstreams = ntohs(*ptr);
302                ptr ++;
303
304                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
305                {
306                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
307                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
308                        return -1;
309                }
310
311                for (i = 0; i < numstreams; i++) {
312                        uint16_t streamport = ntohs(*ptr);
313
314                        if (ptmap[streamport] == 0xffff) {
315                                new_group_alert(libtrace,
316                                        FORMAT_DATA->nextthreadid, streamport,
317                                        ntohs(ndaghdr->monitorid));
318
319                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
320
321                                if (libtrace->perpkt_thread_count == 0) {
322                                        FORMAT_DATA->nextthreadid = 0;
323                                } else {
324                                        FORMAT_DATA->nextthreadid =
325                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
326                                }
327                        }
328
329                        ptr ++;
330                }
331        } else if (msgtype == NDAG_PKT_RESTARTED) {
332                /* If message is a restart, push that to all active message
333                 * queues. */
334                ndag_internal_message_t alert;
335                alert.type = NDAG_CLIENT_RESTARTED;
336                alert.contents.monitor = ntohs(ndaghdr->monitorid);
337                alert.contents.groupaddr = NULL;
338                alert.contents.localiface = NULL;
339                alert.contents.port = 0;
340                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
341                        libtrace_message_queue_put(
342                                        &(FORMAT_DATA->receivers[i].mqueue),
343                                        (void *)&alert);
344                }
345        } else {
346                fprintf(stderr,
347                        "Unexpected message type on control channel: %u\n",
348                         msgtype);
349                return -1;
350        }
351
352        return 0;
353
354}
355
356static void *ndag_controller_run(void *tdata) {
357
358        libtrace_t *libtrace = (libtrace_t *)tdata;
359        uint16_t ptmap[65536];
360        int sock = -1;
361        struct addrinfo *receiveaddr = NULL;
362        fd_set listening;
363        struct timeval timeout;
364
365        /* ptmap is a dirty hack to allow us to quickly check if we've already
366         * assigned a stream to a thread.
367         */
368        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
369
370        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
371                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
372                        &receiveaddr);
373        if (sock == -1) {
374                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
375                        "Unable to join multicast group for nDAG control channel");
376                trace_interrupt();
377        }
378
379        ndag_paused = 0;
380        while ((is_halted(libtrace) == -1) && !ndag_paused) {
381                int ret;
382                char buf[CTRL_BUF_SIZE];
383
384                FD_ZERO(&listening);
385                FD_SET(sock, &listening);
386
387                timeout.tv_sec = 0;
388                timeout.tv_usec = 500000;
389
390                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
391                if (ret < 0) {
392                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
393                        break;
394                }
395
396                if (!FD_ISSET(sock, &listening)) {
397                        continue;
398                }
399
400                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
401                                receiveaddr->ai_addr,
402                                &(receiveaddr->ai_addrlen));
403                if (ret < 0) {
404                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
405                        break;
406                }
407
408                if (ret == 0) {
409                        break;
410                }
411
412                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
413                        fprintf(stderr, "Error while parsing nDAG control message.\n");
414                        continue;
415                }
416        }
417
418        if (sock >= 0) {
419                close(sock);
420        }
421
422        /* Control channel has fallen over, should probably encourage libtrace
423         * to halt the receiver threads as well.
424         */
425        if (!is_halted(libtrace)) {
426                trace_interrupt();
427        }
428
429        pthread_exit(NULL);
430}
431
432static inline int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
433{
434        int ret;
435        uint32_t i;
436        /* Configure the set of receiver threads */
437
438        if (FORMAT_DATA->receivers == NULL) {
439                /* What if the number of threads changes between a pause and
440                 * a restart? Can this happen? */
441                FORMAT_DATA->receivers = (recvstream_t *)
442                                malloc(sizeof(recvstream_t) * maxthreads);
443        }
444
445        for (i = 0; i < maxthreads; i++) {
446                FORMAT_DATA->receivers[i].sources = NULL;
447                FORMAT_DATA->receivers[i].sourcecount = 0;
448                FORMAT_DATA->receivers[i].threadindex = i;
449                FORMAT_DATA->receivers[i].dropped_upstream = 0;
450                FORMAT_DATA->receivers[i].received_packets = 0;
451                FORMAT_DATA->receivers[i].missing_records = 0;
452
453                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
454                                sizeof(ndag_internal_message_t));
455        }
456
457        /* Start the controller thread */
458        /* TODO consider affinity of this thread? */
459
460        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
461                        ndag_controller_run, libtrace);
462        if (ret != 0) {
463                return -1;
464        }
465        return maxthreads;
466}
467
468static int ndag_start_input(libtrace_t *libtrace) {
469        return ndag_start_threads(libtrace, 1);
470}
471
472static int ndag_pstart_input(libtrace_t *libtrace) {
473        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
474                        libtrace->perpkt_thread_count)
475                return 0;
476        return -1;
477}
478
479static void halt_ndag_receiver(recvstream_t *receiver) {
480        int j, i;
481        libtrace_message_queue_destroy(&(receiver->mqueue));
482
483        if (receiver->sources == NULL)
484                return;
485        for (i = 0; i < receiver->sourcecount; i++) {
486                streamsock_t src = receiver->sources[i];
487                if (src.saved) {
488                        for (j = 0; i < ENCAP_BUFFERS; j++) {
489                                if (src.saved[j]) {
490                                        free(src.saved[j]);
491                                }
492                        }
493                        free(src.saved);
494                }
495                close(src.sock);
496        }
497        if (receiver->sources) {
498                free(receiver->sources);
499        }
500}
501
502static int ndag_pause_input(libtrace_t *libtrace) {
503        int i;
504
505        /* Close the existing receiver sockets */
506        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
507               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
508        }
509        return 0;
510}
511
512static int ndag_fin_input(libtrace_t *libtrace) {
513
514        if (FORMAT_DATA->receivers) {
515                free(FORMAT_DATA->receivers);
516        }
517        if (FORMAT_DATA->multicastgroup) {
518                free(FORMAT_DATA->multicastgroup);
519        }
520        if (FORMAT_DATA->portstr) {
521                free(FORMAT_DATA->portstr);
522        }
523        if (FORMAT_DATA->localiface) {
524                free(FORMAT_DATA->localiface);
525        }
526
527        free(libtrace->format_data);
528        return 0;
529}
530
531static int ndag_prepare_packet_stream(libtrace_t *libtrace,
532                recvstream_t *rt,
533                streamsock_t *ssock, libtrace_packet_t *packet,
534                uint32_t flags) {
535
536        dag_record_t *erfptr;
537        ndag_encap_t *encaphdr;
538        uint16_t ndag_reccount = 0;
539        int nr;
540
541        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
542                packet->buf_control = TRACE_CTRL_PACKET;
543        } else {
544                packet->buf_control = TRACE_CTRL_EXTERNAL;
545        }
546
547        packet->trace = libtrace;
548        packet->buffer = ssock->nextread;
549        packet->header = ssock->nextread;
550        packet->type = TRACE_RT_DATA_ERF;
551
552        erfptr = (dag_record_t *)packet->header;
553
554        if (erfptr->flags.rxerror == 1) {
555                packet->payload = NULL;
556                erfptr->rlen = htons(erf_get_framing_length(packet));
557        } else {
558                packet->payload = (char *)packet->buffer +
559                                erf_get_framing_length(packet);
560        }
561
562        /* Update upstream drops using lctr */
563
564        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
565                /* TODO */
566        } else {
567                if (rt->received_packets > 0) {
568                        rt->dropped_upstream += ntohs(erfptr->lctr);
569                }
570        }
571
572        rt->received_packets ++;
573        ssock->recordcount += 1;
574
575        nr = ssock->nextreadind;
576        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
577                        sizeof(ndag_common_t));
578
579        ndag_reccount = ntohs(encaphdr->recordcount);
580        if ((ndag_reccount & 0x8000) != 0) {
581                /* Record was truncated -- update rlen appropriately */
582                erfptr->rlen = htons(ssock->savedsize[nr] -
583                                (ssock->nextread - ssock->saved[nr]));
584        }
585        ssock->nextread += ntohs(erfptr->rlen);
586
587        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
588                /* Read everything from this buffer, mark as empty and
589                 * move on. */
590                ssock->savedsize[nr] = 0;
591                nr = (nr + 1) % ENCAP_BUFFERS;
592                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
593                                sizeof(ndag_encap_t);
594                ssock->nextreadind = nr;
595        }
596
597        packet->order = erf_get_erf_timestamp(packet);
598        packet->error = packet->payload ? ntohs(erfptr->rlen) :
599                        erf_get_framing_length(packet);
600
601        return ntohs(erfptr->rlen);
602}
603
604static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
605                libtrace_packet_t *packet UNUSED,
606                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
607                uint32_t flags UNUSED) {
608
609        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
610        return 0;
611
612}
613
614static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
615
616        streamsock_t *ssock = NULL;
617        int i;
618
619        /* TODO consider replacing this with a list or vector so we can
620         * easily remove sources that are no longer in use, rather than
621         * just setting the sock to -1 and having to check them every
622         * time we want to read a packet.
623         */
624        if (rt->sourcecount == 0) {
625                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
626        } else if ((rt->sourcecount % 10) == 0) {
627                rt->sources = (streamsock_t *)realloc(rt->sources,
628                        sizeof(streamsock_t) * (rt->sourcecount + 10));
629        }
630
631        ssock = &(rt->sources[rt->sourcecount]);
632
633        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
634                        NULL, src.port, &(ssock->srcaddr));
635
636        if (ssock->sock < 0) {
637                return -1;
638        }
639
640        ssock->port = src.port;
641        ssock->groupaddr = src.groupaddr;
642        ssock->expectedseq = 0;
643        ssock->monitorid = src.monitor;
644        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
645        ssock->startidle = 0;
646
647        for (i = 0; i < ENCAP_BUFFERS; i++) {
648                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
649                ssock->savedsize[i] = 0;
650        }
651
652        ssock->nextread = NULL;;
653        ssock->nextreadind = 0;
654        ssock->recordcount = 0;
655        rt->sourcecount += 1;
656
657        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
658                        ssock->groupaddr, ssock->port, rt->threadindex);
659
660        return ssock->port;
661}
662
663static int receiver_read_messages(recvstream_t *rt) {
664
665        ndag_internal_message_t msg;
666
667        while (libtrace_message_queue_try_get(&(rt->mqueue),
668                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
669                switch(msg.type) {
670                        case NDAG_CLIENT_NEWGROUP:
671                                if (add_new_streamsock(rt, msg.contents) < 0) {
672                                        return -1;
673                                }
674                                break;
675                        case NDAG_CLIENT_RESTARTED:
676                                /* TODO */
677
678                                break;
679                        case NDAG_CLIENT_HALT:
680                                return 0;
681                }
682        }
683        return 1;
684
685}
686
687static inline int readable_data(streamsock_t ssock) {
688
689        if (ssock.sock == -1) {
690                return 0;
691        }
692        if (ssock.savedsize[ssock.nextreadind] == 0) {
693                return 0;
694        }
695        if (ssock.nextread - ssock.saved[ssock.nextreadind] >=
696                        ssock.savedsize[ssock.nextreadind]) {
697                return 0;
698        }
699        return 1;
700
701
702}
703
704static inline int read_required(streamsock_t ssock) {
705        if (ssock.sock == -1)
706                return 0;
707        if (ssock.savedsize[ssock.nextwriteind] == 0)
708                return 1;
709        //if (ssock.nextread - ssock.saved >= ssock.savedsize)
710        //        return 1;
711        return 0;
712}
713
714
715static int receive_from_sockets(recvstream_t *rt) {
716
717        int i, ret, readybufs, gottime;
718        struct timeval tv;
719        uint8_t rectype;
720
721        readybufs = 0;
722        gottime = 0;
723
724        for (i = 0; i < rt->sourcecount; i ++) {
725                int nw;
726                ndag_encap_t *encaphdr;
727
728                if (!read_required(rt->sources[i])) {
729                        if (rt->sources[i].sock != -1) {
730                                readybufs ++;
731                        }
732                        continue;
733                }
734
735                nw = rt->sources[i].nextwriteind;
736
737                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
738                                ENCAP_BUFSIZE, MSG_DONTWAIT,
739                                rt->sources[i].srcaddr->ai_addr,
740                                &(rt->sources[i].srcaddr->ai_addrlen));
741                if (ret < 0) {
742                        /* Nothing to receive right now, but we should still
743                         * count as 'ready' if at least one buffer is full */
744                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
745                                if (readable_data(rt->sources[i])) {
746                                        readybufs ++;
747                                }
748                                if (!gottime) {
749                                        gettimeofday(&tv, NULL);
750                                }
751                                if (rt->sources[i].startidle == 0) {
752                                        rt->sources[i].startidle = tv.tv_sec;
753                                } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) {
754                                        fprintf(stderr, "Closing channel %s:%u due to inactivity.\n",
755                                                        rt->sources[i].groupaddr,
756                                                        rt->sources[i].port);
757
758                                        close(rt->sources[i].sock);
759                                        rt->sources[i].sock = -1;
760                                }
761                                continue;
762                        }
763
764                        fprintf(stderr,
765                                        "Error receiving encapsulated records from %s:%u -- %s \n",
766                                        rt->sources[i].groupaddr,
767                                        rt->sources[i].port,
768                                        strerror(errno));
769                        close(rt->sources[i].sock);
770                        rt->sources[i].sock = -1;
771                        continue;
772                }
773                rt->sources[i].startidle = 0;
774
775                /* Check that we have a valid nDAG encap record */
776                rectype = check_ndag_header(rt->sources[i].saved[nw], ret);
777
778                if (rectype == NDAG_PKT_KEEPALIVE) {
779                        /* Keep-alive, reset startidle and carry on. Don't
780                         * change nextwrite -- we want to overwrite the
781                         * keep-alive with usable content. */
782                        continue;
783                } else if (rectype != NDAG_PKT_ENCAPERF) {
784                        fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
785                                        rt->sources[i].groupaddr,
786                                        rt->sources[i].port);
787                        close(rt->sources[i].sock);
788                        rt->sources[i].sock = -1;
789                        continue;
790                }
791
792                rt->sources[i].savedsize[nw] = ret;
793                rt->sources[i].nextwriteind =
794                                (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
795
796                /* Get the useful info from the encap header */
797                encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] +
798                                sizeof(ndag_common_t));
799
800                if (rt->sources[i].expectedseq != 0) {
801                        rt->missing_records += seq_cmp(
802                                        ntohl(encaphdr->seqno),
803                                        rt->sources[i].expectedseq);
804                }
805                rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1;
806
807                if (rt->sources[i].nextread == NULL) {
808                        /* If this is our first read, set up 'nextread'
809                         * by skipping past the nDAG headers */
810                        rt->sources[i].nextread = rt->sources[i].saved[0] +
811                                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
812                }
813
814                readybufs += 1;
815        }
816
817        return readybufs;
818
819}
820
821
822static int receive_encap_records(libtrace_t *libtrace, recvstream_t *rt,
823                libtrace_packet_t *packet, int block) {
824
825        int iserr = 0;
826
827        if (packet->buf_control == TRACE_CTRL_PACKET) {
828                free(packet->buffer);
829                packet->buffer = NULL;
830        }
831
832        do {
833                /* Make sure we shouldn't be halting */
834                if ((iserr = is_halted(libtrace)) != -1) {
835                        return iserr;
836                }
837
838                /* First, check for any messages from the control thread */
839                iserr = receiver_read_messages(rt);
840
841                if (iserr <= 0) {
842                        return iserr;
843                }
844
845                /* If non-blocking and there are no sources, just break */
846                if (!block && rt->sourcecount == 0) {
847                        iserr = 0;
848                        break;
849                }
850
851                /* If blocking and no sources, sleep for a bit and then try
852                 * checking for messages again.
853                 */
854                if (block && rt->sourcecount == 0) {
855                        usleep(10000);
856                        continue;
857                }
858
859                if ((iserr = receive_from_sockets(rt)) < 0) {
860                        return iserr;
861                } else if (iserr > 0) {
862                        /* At least one of our input sockets has available
863                         * data, let's go ahead and use what we have. */
864                        break;
865                }
866
867                /* None of our sources have anything available, we can take
868                 * a short break rather than immediately trying again.
869                 */
870                if (block && iserr == 0) {
871                        usleep(100);
872                }
873
874        } while (block);
875
876        return iserr;
877}
878
879static streamsock_t *select_next_packet(recvstream_t *rt) {
880        int i;
881        streamsock_t *ssock = NULL;
882        uint64_t earliest = 0;
883        uint64_t currentts = 0;
884        dag_record_t *daghdr;
885
886        for (i = 0; i < rt->sourcecount; i ++) {
887                if (!readable_data(rt->sources[i])) {
888                        continue;
889                }
890
891                daghdr = (dag_record_t *)(rt->sources[i].nextread);
892                currentts = bswap_le_to_host64(daghdr->ts);
893
894                if (earliest == 0 || earliest > currentts) {
895                        earliest = currentts;
896                        ssock = &(rt->sources[i]);
897                }
898                /*
899                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
900                                i, currentts,
901                                rt->sources[i].recordcount,
902                                rt->missing_records);
903                */
904        }
905        return ssock;
906}
907
908static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
909
910        int rem;
911        streamsock_t *nextavail = NULL;
912        rem = receive_encap_records(libtrace, &(FORMAT_DATA->receivers[0]),
913                        packet, 1);
914
915        if (rem <= 0) {
916                return rem;
917        }
918
919        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
920        if (nextavail == NULL) {
921                return 0;
922        }
923
924        /* nextread should point at an ERF header, so prepare 'packet' to be
925         * a libtrace ERF packet. */
926
927        return ndag_prepare_packet_stream(libtrace,
928                        &(FORMAT_DATA->receivers[0]), nextavail,
929                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
930}
931
932static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
933                libtrace_packet_t **packets, size_t nb_packets) {
934
935        recvstream_t *rt;
936        int rem;
937        size_t read_packets = 0;
938        streamsock_t *nextavail = NULL;
939
940        rt = (recvstream_t *)t->format_data;
941
942        do {
943                rem = receive_encap_records(libtrace, rt,
944                                packets[read_packets],
945                                read_packets == 0 ? 1 : 0);
946                if (rem < 0) {
947                        return rem;
948                }
949
950                if (rem == 0) {
951                        break;
952                }
953
954                nextavail = select_next_packet(rt);
955                if (nextavail == NULL) {
956                        break;
957                }
958
959                ndag_prepare_packet_stream(libtrace, rt, nextavail,
960                                packets[read_packets],
961                                TRACE_PREP_DO_NOT_OWN_BUFFER);
962
963                read_packets  ++;
964                if (read_packets >= nb_packets) {
965                        break;
966                }
967        } while (1);
968
969        return read_packets;
970
971}
972
973static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
974                libtrace_packet_t *packet) {
975
976
977        libtrace_eventobj_t event = {0,0,0.0,0};
978        int rem;
979        streamsock_t *nextavail = NULL;
980
981        do {
982                rem = receive_encap_records(libtrace,
983                                &(FORMAT_DATA->receivers[0]), packet, 0);
984
985                if (rem < 0) {
986                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
987                                "Received invalid nDAG records.");
988                        event.type = TRACE_EVENT_TERMINATE;
989                        break;
990                }
991
992                if (rem == 0) {
993                        /* Either we've been halted or we've got no packets
994                         * right now. */
995                        if (is_halted(libtrace) == 0) {
996                                event.type = TRACE_EVENT_TERMINATE;
997                                break;
998                        }
999                        event.type = TRACE_EVENT_SLEEP;
1000                        event.seconds = 0.0001;
1001                        break;
1002                }
1003
1004                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1005                if (nextavail == NULL) {
1006                        event.type = TRACE_EVENT_SLEEP;
1007                        event.seconds = 0.0001;
1008                        break;
1009                }
1010
1011                event.type = TRACE_EVENT_PACKET;
1012                ndag_prepare_packet_stream(libtrace,
1013                                &(FORMAT_DATA->receivers[0]), nextavail,
1014                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1015                event.size = trace_get_capture_length(packet) +
1016                                trace_get_framing_length(packet);
1017
1018                if (libtrace->filter) {
1019                        int filtret = trace_apply_filter(libtrace->filter,
1020                                        packet);
1021                        if (filtret == -1) {
1022                                trace_set_err(libtrace,
1023                                                TRACE_ERR_BAD_FILTER,
1024                                                "Bad BPF Filter");
1025                                event.type = TRACE_EVENT_TERMINATE;
1026                                break;
1027                        }
1028
1029                        if (filtret == 0) {
1030                                /* Didn't match filter, try next one */
1031                                libtrace->filtered_packets ++;
1032                                trace_clear_cache(packet);
1033                                continue;
1034                        }
1035                }
1036
1037                if (libtrace->snaplen > 0) {
1038                        trace_set_capture_length(packet, libtrace->snaplen);
1039                }
1040                libtrace->accepted_packets ++;
1041                break;
1042        } while (1);
1043
1044        return event;
1045}
1046
1047static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1048
1049        int i;
1050
1051        stat->dropped_valid = 1;
1052        stat->dropped = 0;
1053        stat->received_valid = 1;
1054        stat->received = 0;
1055        stat->missing_valid = 1;
1056        stat->missing = 0;
1057
1058        /* TODO Is this thread safe? */
1059        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1060                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1061                stat->received += FORMAT_DATA->receivers[i].received_packets;
1062                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1063        }
1064
1065}
1066
1067static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1068                libtrace_stat_t *stat) {
1069
1070        recvstream_t *recvr = (recvstream_t *)t->format_data;
1071
1072        if (libtrace == NULL)
1073                return;
1074        /* TODO Is this thread safe */
1075        stat->dropped_valid = 1;
1076        stat->dropped = recvr->dropped_upstream;
1077
1078        stat->received_valid = 1;
1079        stat->received = recvr->received_packets;
1080
1081        stat->missing_valid = 1;
1082        stat->missing = recvr->missing_records;
1083
1084}
1085
1086static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1087                bool reader) {
1088        recvstream_t *recvr;
1089
1090        if (!reader || t->type != THREAD_PERPKT) {
1091                return 0;
1092        }
1093
1094        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1095        t->format_data = recvr;
1096
1097        return 0;
1098}
1099
1100static struct libtrace_format_t ndag = {
1101
1102        "ndag",
1103        "",
1104        TRACE_FORMAT_NDAG,
1105        NULL,                   /* probe filename */
1106        NULL,                   /* probe magic */
1107        ndag_init_input,        /* init_input */
1108        NULL,                   /* config_input */
1109        ndag_start_input,       /* start_input */
1110        ndag_pause_input,       /* pause_input */
1111        NULL,                   /* init_output */
1112        NULL,                   /* config_output */
1113        NULL,                   /* start_output */
1114        ndag_fin_input,         /* fin_input */
1115        NULL,                   /* fin_output */
1116        ndag_read_packet,       /* read_packet */
1117        ndag_prepare_packet,    /* prepare_packet */
1118        NULL,                   /* fin_packet */
1119        NULL,                   /* write_packet */
1120        erf_get_link_type,      /* get_link_type */
1121        erf_get_direction,      /* get_direction */
1122        erf_set_direction,      /* set_direction */
1123        erf_get_erf_timestamp,  /* get_erf_timestamp */
1124        NULL,                   /* get_timeval */
1125        NULL,                   /* get_seconds */
1126        NULL,                   /* get_timespec */
1127        NULL,                   /* seek_erf */
1128        NULL,                   /* seek_timeval */
1129        NULL,                   /* seek_seconds */
1130        erf_get_capture_length, /* get_capture_length */
1131        erf_get_wire_length,    /* get_wire_length */
1132        erf_get_framing_length, /* get_framing_length */
1133        erf_set_capture_length, /* set_capture_length */
1134        NULL,                   /* get_received_packets */
1135        NULL,                   /* get_filtered_packets */
1136        NULL,                   /* get_dropped_packets */
1137        ndag_get_statistics,    /* get_statistics */
1138        NULL,                   /* get_fd */
1139        trace_event_ndag,       /* trace_event */
1140        NULL,                   /* help */
1141        NULL,                   /* next pointer */
1142        {true, 0},              /* live packet capture */
1143        ndag_pstart_input,      /* parallel start */
1144        ndag_pread_packets,     /* parallel read */
1145        ndag_pause_input,       /* parallel pause */
1146        NULL,
1147        ndag_pregister_thread,  /* register thread */
1148        NULL,
1149        ndag_get_thread_stats   /* per-thread stats */
1150};
1151
1152void ndag_constructor(void) {
1153        register_format(&ndag);
1154}
Note: See TracBrowser for help on using the repository browser.