source: lib/format_ndag.c @ 4bab977

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 4bab977 was 4bab977, checked in by Shane Alcock <salcock@…>, 4 years ago

Modifications in response to changes in the nDAG protocol.

  • Updated protocol to account for new laststart field in encap. header. Use this field to recognise when the monitor has been restarted.
  • When determining sequence number gap, don't count 0 as a valid sequence number -- seqno 0 is reserved in the nDAG protocol and will never be lost.
  • Property mode set to 100644
File size: 40.0 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define NDAG_IDLE_TIMEOUT (600)
26#define ENCAP_BUFSIZE (10000)
27#define CTRL_BUF_SIZE (10000)
28#define ENCAP_BUFFERS (100)
29
30#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
31
32static struct libtrace_format_t ndag;
33
34volatile int ndag_paused = 0;
35
36typedef struct monitor {
37        uint16_t monitorid;
38        uint64_t laststart;
39} ndag_monitor_t;
40
41
42typedef struct streamsource {
43        uint16_t monitor;
44        char *groupaddr;
45        char *localiface;
46        uint16_t port;
47} streamsource_t;
48
49typedef struct streamsock {
50        char *groupaddr;
51        int sock;
52        struct addrinfo *srcaddr;
53        uint16_t port;
54        uint32_t expectedseq;
55        ndag_monitor_t *monitorptr;
56        char **saved;
57        char *nextread;
58        int nextreadind;
59        int nextwriteind;
60        int savedsize[ENCAP_BUFFERS];
61        uint32_t startidle;
62        uint64_t recordcount;
63
64} streamsock_t;
65
66typedef struct recvstream {
67        streamsock_t *sources;
68        uint16_t sourcecount;
69        libtrace_message_queue_t mqueue;
70        int threadindex;
71        ndag_monitor_t *knownmonitors;
72        uint16_t monitorcount;
73
74        uint64_t dropped_upstream;
75        uint64_t missing_records;
76        uint64_t received_packets;
77} recvstream_t;
78
79typedef struct ndag_format_data {
80        char *multicastgroup;
81        char *portstr;
82        char *localiface;
83        uint16_t nextthreadid;
84        recvstream_t *receivers;
85
86        pthread_t controlthread;
87        libtrace_message_queue_t controlqueue;
88} ndag_format_data_t;
89
90enum {
91        NDAG_CLIENT_HALT = 0x01,
92        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
93        NDAG_CLIENT_NEWGROUP = 0x03
94};
95
96typedef struct ndagreadermessage {
97        uint8_t type;
98        streamsource_t contents;
99} ndag_internal_message_t;
100
101
102static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
103
104        /* Calculate seq_a - seq_b, taking wraparound into account */
105        if (seq_a == seq_b) return 0;
106
107        if (seq_a > seq_b) {
108                return (int) (seq_a - seq_b);
109        }
110
111        /* -1 for the wrap and another -1 because we don't use zero */
112        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
113}
114
115static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
116        ndag_common_t *header = (ndag_common_t *)msgbuf;
117
118        if (msgsize < sizeof(ndag_common_t)) {
119                fprintf(stderr,
120                        "nDAG message does not have a complete nDAG header.\n");
121                return 0;
122        }
123
124        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
125                fprintf(stderr,
126                        "nDAG message does not have a valid magic number.\n");
127                return 0;
128        }
129
130        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
131                fprintf(stderr,
132                        "nDAG message has an invalid header version: %u\n",
133                                header->version);
134                return 0;
135        }
136
137        return header->type;
138}
139
140static int join_multicast_group(char *groupaddr, char *localiface,
141        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
142
143        struct addrinfo hints;
144        struct addrinfo *gotten;
145        struct addrinfo *group;
146        unsigned int interface;
147        char pstr[16];
148        struct group_req greq;
149        int bufsize;
150
151        int sock;
152
153        if (portstr == NULL) {
154                snprintf(pstr, 15, "%u", portnum);
155                portstr = pstr;
156        }
157
158        interface = if_nametoindex(localiface);
159        if (interface == 0) {
160                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
161                                localiface, strerror(errno));
162                return -1;
163        }
164
165        hints.ai_family = PF_UNSPEC;
166        hints.ai_socktype = SOCK_DGRAM;
167        hints.ai_flags = AI_PASSIVE;
168        hints.ai_protocol = 0;
169
170        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
171                fprintf(stderr,
172                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
173                                portstr, strerror(errno));
174                return -1;
175        }
176
177        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
178                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
179                                groupaddr, strerror(errno));
180                return -1;
181        }
182
183        *srcinfo = gotten;
184        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
185        if (sock < 0) {
186                fprintf(stderr,
187                        "Failed to create multicast socket for %s:%s -- %s\n",
188                                groupaddr, portstr, strerror(errno));
189                goto sockcreateover;
190        }
191
192        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
193        {
194                fprintf(stderr,
195                        "Failed to bind to multicast socket %s:%s -- %s\n",
196                                groupaddr, portstr, strerror(errno));
197                close(sock);
198                sock = -1;
199                goto sockcreateover;
200        }
201
202        greq.gr_interface = interface;
203        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
204
205        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
206                        sizeof(greq)) < 0) {
207                fprintf(stderr,
208                        "Failed to join multicast group %s:%s -- %s\n",
209                                groupaddr, portstr, strerror(errno));
210                close(sock);
211                sock = -1;
212                goto sockcreateover;
213        }
214
215        bufsize = 16 * 1024 * 1024;
216        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
217                                (socklen_t)sizeof(int)) < 0) {
218
219                fprintf(stderr,
220                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
221                                groupaddr, portstr, strerror(errno));
222                close(sock);
223                sock = -1;
224                goto sockcreateover;
225        }
226
227sockcreateover:
228        freeaddrinfo(group);
229        return sock;
230}
231
232
233static int ndag_init_input(libtrace_t *libtrace) {
234
235        char *scan = NULL;
236        char *next = NULL;
237
238        libtrace->format_data = (ndag_format_data_t *)malloc(
239                        sizeof(ndag_format_data_t));
240
241        FORMAT_DATA->multicastgroup = NULL;
242        FORMAT_DATA->portstr = NULL;
243        FORMAT_DATA->localiface = NULL;
244        FORMAT_DATA->nextthreadid = 0;
245        FORMAT_DATA->receivers = NULL;
246
247        scan = strchr(libtrace->uridata, ',');
248        if (scan == NULL) {
249                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
250                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
251                return -1;
252        }
253        FORMAT_DATA->localiface = strndup(libtrace->uridata,
254                        (size_t)(scan - libtrace->uridata));
255        next = scan + 1;
256
257        scan = strchr(next, ',');
258        if (scan == NULL) {
259                FORMAT_DATA->portstr = strdup("9001");
260                FORMAT_DATA->multicastgroup = strdup(next);
261        } else {
262                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
263
264                FORMAT_DATA->portstr = strdup(scan + 1);
265        }
266        return 0;
267}
268
269static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
270                uint16_t portnum, uint16_t monid) {
271
272        ndag_internal_message_t alert;
273
274        alert.type = NDAG_CLIENT_NEWGROUP;
275        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
276        alert.contents.localiface = FORMAT_DATA->localiface;
277        alert.contents.port = portnum;
278        alert.contents.monitor = monid;
279
280        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
281                        (void *)&alert);
282
283}
284       
285static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
286                int msgsize, uint16_t *ptmap) {
287
288        int i;
289        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
290        uint8_t msgtype;
291
292        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
293        if (msgtype == 0) {
294                return -1;
295        }
296
297        msgsize -= sizeof(ndag_common_t);
298        if (msgtype == NDAG_PKT_BEACON) {
299                /* If message is a beacon, make sure every port included in the
300                 * beacon is assigned to a receive thread.
301                 */
302                uint16_t *ptr, numstreams;
303
304                if ((uint32_t)msgsize < sizeof(uint16_t)) {
305                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
306                        return -1;
307                }
308
309                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
310                numstreams = ntohs(*ptr);
311                ptr ++;
312
313                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
314                {
315                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
316                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
317                        return -1;
318                }
319
320                for (i = 0; i < numstreams; i++) {
321                        uint16_t streamport = ntohs(*ptr);
322
323                        if (ptmap[streamport] == 0xffff) {
324                                new_group_alert(libtrace,
325                                        FORMAT_DATA->nextthreadid, streamport,
326                                        ntohs(ndaghdr->monitorid));
327
328                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
329
330                                if (libtrace->perpkt_thread_count == 0) {
331                                        FORMAT_DATA->nextthreadid = 0;
332                                } else {
333                                        FORMAT_DATA->nextthreadid =
334                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
335                                }
336                        }
337
338                        ptr ++;
339                }
340        } else {
341                fprintf(stderr,
342                        "Unexpected message type on control channel: %u\n",
343                         msgtype);
344                return -1;
345        }
346
347        return 0;
348
349}
350
351static void *ndag_controller_run(void *tdata) {
352
353        libtrace_t *libtrace = (libtrace_t *)tdata;
354        uint16_t ptmap[65536];
355        int sock = -1;
356        struct addrinfo *receiveaddr = NULL;
357        fd_set listening;
358        struct timeval timeout;
359
360        /* ptmap is a dirty hack to allow us to quickly check if we've already
361         * assigned a stream to a thread.
362         */
363        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
364
365        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
366                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
367                        &receiveaddr);
368        if (sock == -1) {
369                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
370                        "Unable to join multicast group for nDAG control channel");
371                trace_interrupt();
372        }
373
374        ndag_paused = 0;
375        while ((is_halted(libtrace) == -1) && !ndag_paused) {
376                int ret;
377                char buf[CTRL_BUF_SIZE];
378
379                FD_ZERO(&listening);
380                FD_SET(sock, &listening);
381
382                timeout.tv_sec = 0;
383                timeout.tv_usec = 500000;
384
385                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
386                if (ret < 0) {
387                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
388                        break;
389                }
390
391                if (!FD_ISSET(sock, &listening)) {
392                        continue;
393                }
394
395                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
396                                receiveaddr->ai_addr,
397                                &(receiveaddr->ai_addrlen));
398                if (ret < 0) {
399                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
400                        break;
401                }
402
403                if (ret == 0) {
404                        break;
405                }
406
407                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
408                        fprintf(stderr, "Error while parsing nDAG control message.\n");
409                        continue;
410                }
411        }
412
413        if (sock >= 0) {
414                close(sock);
415        }
416
417        /* Control channel has fallen over, should probably encourage libtrace
418         * to halt the receiver threads as well.
419         */
420        if (!is_halted(libtrace)) {
421                trace_interrupt();
422        }
423
424        pthread_exit(NULL);
425}
426
427static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
428{
429        int ret;
430        uint32_t i;
431        /* Configure the set of receiver threads */
432
433        if (FORMAT_DATA->receivers == NULL) {
434                /* What if the number of threads changes between a pause and
435                 * a restart? Can this happen? */
436                FORMAT_DATA->receivers = (recvstream_t *)
437                                malloc(sizeof(recvstream_t) * maxthreads);
438        }
439
440        for (i = 0; i < maxthreads; i++) {
441                FORMAT_DATA->receivers[i].sources = NULL;
442                FORMAT_DATA->receivers[i].sourcecount = 0;
443                FORMAT_DATA->receivers[i].knownmonitors = NULL;
444                FORMAT_DATA->receivers[i].monitorcount = 0;
445                FORMAT_DATA->receivers[i].threadindex = i;
446                FORMAT_DATA->receivers[i].dropped_upstream = 0;
447                FORMAT_DATA->receivers[i].received_packets = 0;
448                FORMAT_DATA->receivers[i].missing_records = 0;
449
450                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
451                                sizeof(ndag_internal_message_t));
452        }
453
454        /* Start the controller thread */
455        /* TODO consider affinity of this thread? */
456
457        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
458                        ndag_controller_run, libtrace);
459        if (ret != 0) {
460                return -1;
461        }
462        return maxthreads;
463}
464
465static int ndag_start_input(libtrace_t *libtrace) {
466        return ndag_start_threads(libtrace, 1);
467}
468
469static int ndag_pstart_input(libtrace_t *libtrace) {
470        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
471                        libtrace->perpkt_thread_count)
472                return 0;
473        return -1;
474}
475
476static void halt_ndag_receiver(recvstream_t *receiver) {
477        int j, i;
478        libtrace_message_queue_destroy(&(receiver->mqueue));
479
480        if (receiver->sources == NULL)
481                return;
482        for (i = 0; i < receiver->sourcecount; i++) {
483                streamsock_t src = receiver->sources[i];
484                if (src.saved) {
485                        for (j = 0; i < ENCAP_BUFFERS; j++) {
486                                if (src.saved[j]) {
487                                        free(src.saved[j]);
488                                }
489                        }
490                        free(src.saved);
491                }
492                close(src.sock);
493        }
494        if (receiver->knownmonitors) {
495                free(receiver->knownmonitors);
496        }
497
498        if (receiver->sources) {
499                free(receiver->sources);
500        }
501}
502
503static int ndag_pause_input(libtrace_t *libtrace) {
504        int i;
505
506        /* Close the existing receiver sockets */
507        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
508               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
509        }
510        return 0;
511}
512
513static int ndag_fin_input(libtrace_t *libtrace) {
514
515        if (FORMAT_DATA->receivers) {
516                free(FORMAT_DATA->receivers);
517        }
518        if (FORMAT_DATA->multicastgroup) {
519                free(FORMAT_DATA->multicastgroup);
520        }
521        if (FORMAT_DATA->portstr) {
522                free(FORMAT_DATA->portstr);
523        }
524        if (FORMAT_DATA->localiface) {
525                free(FORMAT_DATA->localiface);
526        }
527
528        free(libtrace->format_data);
529        return 0;
530}
531
532static int ndag_prepare_packet_stream(libtrace_t *libtrace,
533                recvstream_t *rt,
534                streamsock_t *ssock, libtrace_packet_t *packet,
535                uint32_t flags) {
536
537        dag_record_t *erfptr;
538        ndag_encap_t *encaphdr;
539        uint16_t ndag_reccount = 0;
540        int nr;
541
542        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
543                packet->buf_control = TRACE_CTRL_PACKET;
544        } else {
545                packet->buf_control = TRACE_CTRL_EXTERNAL;
546        }
547
548        packet->trace = libtrace;
549        packet->buffer = ssock->nextread;
550        packet->header = ssock->nextread;
551        packet->type = TRACE_RT_DATA_ERF;
552
553        erfptr = (dag_record_t *)packet->header;
554
555        if (erfptr->flags.rxerror == 1) {
556                packet->payload = NULL;
557                erfptr->rlen = htons(erf_get_framing_length(packet));
558        } else {
559                packet->payload = (char *)packet->buffer +
560                                erf_get_framing_length(packet);
561        }
562
563        /* Update upstream drops using lctr */
564
565        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
566                /* TODO */
567        } else {
568                if (rt->received_packets > 0) {
569                        rt->dropped_upstream += ntohs(erfptr->lctr);
570                }
571        }
572
573        rt->received_packets ++;
574        ssock->recordcount += 1;
575
576        nr = ssock->nextreadind;
577        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
578                        sizeof(ndag_common_t));
579
580        ndag_reccount = ntohs(encaphdr->recordcount);
581        if ((ndag_reccount & 0x8000) != 0) {
582                /* Record was truncated -- update rlen appropriately */
583                erfptr->rlen = htons(ssock->savedsize[nr] -
584                                (ssock->nextread - ssock->saved[nr]));
585        }
586        ssock->nextread += ntohs(erfptr->rlen);
587
588        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
589                /* Read everything from this buffer, mark as empty and
590                 * move on. */
591                ssock->savedsize[nr] = 0;
592                nr = (nr + 1) % ENCAP_BUFFERS;
593                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
594                                sizeof(ndag_encap_t);
595                ssock->nextreadind = nr;
596        }
597
598        packet->order = erf_get_erf_timestamp(packet);
599        packet->error = packet->payload ? ntohs(erfptr->rlen) :
600                        erf_get_framing_length(packet);
601
602        return ntohs(erfptr->rlen);
603}
604
605static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
606                libtrace_packet_t *packet UNUSED,
607                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
608                uint32_t flags UNUSED) {
609
610        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
611        return 0;
612
613}
614
615static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
616
617        ndag_monitor_t *mon;
618
619        if (rt->monitorcount == 0) {
620                rt->knownmonitors = (ndag_monitor_t *)
621                                malloc(sizeof(ndag_monitor_t) * 5);
622        } else {
623                rt->knownmonitors = (ndag_monitor_t *)
624                            realloc(rt->knownmonitors,
625                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
626        }
627
628        mon = &(rt->knownmonitors[rt->monitorcount]);
629        mon->monitorid = monid;
630        mon->laststart = 0;
631
632        rt->monitorcount ++;
633        return mon;
634}
635
636static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
637
638        streamsock_t *ssock = NULL;
639        ndag_monitor_t *mon = NULL;
640        int i;
641
642        /* TODO consider replacing this with a list or vector so we can
643         * easily remove sources that are no longer in use, rather than
644         * just setting the sock to -1 and having to check them every
645         * time we want to read a packet.
646         */
647        if (rt->sourcecount == 0) {
648                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
649        } else if ((rt->sourcecount % 10) == 0) {
650                rt->sources = (streamsock_t *)realloc(rt->sources,
651                        sizeof(streamsock_t) * (rt->sourcecount + 10));
652        }
653
654        ssock = &(rt->sources[rt->sourcecount]);
655
656        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
657                        NULL, src.port, &(ssock->srcaddr));
658
659        if (ssock->sock < 0) {
660                return -1;
661        }
662
663        for (i = 0; i < rt->monitorcount; i++) {
664                if (rt->knownmonitors[i].monitorid == src.monitor) {
665                        mon = &(rt->knownmonitors[i]);
666                        break;
667                }
668        }
669
670        if (mon == NULL) {
671                mon = add_new_knownmonitor(rt, src.monitor);
672        }
673
674        ssock->port = src.port;
675        ssock->groupaddr = src.groupaddr;
676        ssock->expectedseq = 0;
677        ssock->monitorptr = mon;
678        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
679        ssock->startidle = 0;
680
681        for (i = 0; i < ENCAP_BUFFERS; i++) {
682                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
683                ssock->savedsize[i] = 0;
684        }
685
686        ssock->nextread = NULL;;
687        ssock->nextreadind = 0;
688        ssock->recordcount = 0;
689        rt->sourcecount += 1;
690
691        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
692                        ssock->groupaddr, ssock->port, rt->threadindex);
693
694        return ssock->port;
695}
696
697static int receiver_read_messages(recvstream_t *rt) {
698
699        ndag_internal_message_t msg;
700
701        while (libtrace_message_queue_try_get(&(rt->mqueue),
702                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
703                switch(msg.type) {
704                        case NDAG_CLIENT_NEWGROUP:
705                                if (add_new_streamsock(rt, msg.contents) < 0) {
706                                        return -1;
707                                }
708                                break;
709                        case NDAG_CLIENT_HALT:
710                                return 0;
711                }
712        }
713        return 1;
714
715}
716
717static inline int readable_data(streamsock_t *ssock) {
718
719        if (ssock->sock == -1) {
720                return 0;
721        }
722        if (ssock->savedsize[ssock->nextreadind] == 0) {
723                return 0;
724        }
725        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
726                        ssock->savedsize[ssock->nextreadind]) {
727                return 0;
728        }
729        return 1;
730
731
732}
733
734static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
735
736        int i;
737        for (i = 0; i < rt->sourcecount; i++) {
738                if (rt->sources[i].monitorptr == mon) {
739                        rt->sources[i].expectedseq = 0;
740                }
741        }
742
743}
744
745static int receive_from_sockets(recvstream_t *rt) {
746
747        int i, ret, readybufs, gottime;
748        struct timeval tv;
749        uint8_t rectype;
750
751        readybufs = 0;
752        gottime = 0;
753
754        for (i = 0; i < rt->sourcecount; i ++) {
755                int nw;
756                ndag_encap_t *encaphdr;
757                ndag_monitor_t *mon;
758
759                if (rt->sources[i].sock == -1) {
760                        continue;
761                } else if (rt->sources[i].savedsize[rt->sources[i].nextwriteind] != 0) {
762                        readybufs ++;
763                        continue;
764                }
765
766                nw = rt->sources[i].nextwriteind;
767
768                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
769                                ENCAP_BUFSIZE, MSG_DONTWAIT,
770                                rt->sources[i].srcaddr->ai_addr,
771                                &(rt->sources[i].srcaddr->ai_addrlen));
772                if (ret < 0) {
773                        /* Nothing to receive right now, but we should still
774                         * count as 'ready' if at least one buffer is full */
775                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
776                                if (readable_data(&(rt->sources[i]))) {
777                                        readybufs ++;
778                                }
779                                if (!gottime) {
780                                        gettimeofday(&tv, NULL);
781                                }
782                                if (rt->sources[i].startidle == 0) {
783                                        rt->sources[i].startidle = tv.tv_sec;
784                                } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) {
785                                        fprintf(stderr, "Closing channel %s:%u due to inactivity.\n",
786                                                        rt->sources[i].groupaddr,
787                                                        rt->sources[i].port);
788
789                                        close(rt->sources[i].sock);
790                                        rt->sources[i].sock = -1;
791                                }
792                                continue;
793                        }
794
795                        fprintf(stderr,
796                                        "Error receiving encapsulated records from %s:%u -- %s \n",
797                                        rt->sources[i].groupaddr,
798                                        rt->sources[i].port,
799                                        strerror(errno));
800                        close(rt->sources[i].sock);
801                        rt->sources[i].sock = -1;
802                        continue;
803                }
804                rt->sources[i].startidle = 0;
805
806                /* Check that we have a valid nDAG encap record */
807                rectype = check_ndag_header(rt->sources[i].saved[nw], ret);
808
809                if (rectype == NDAG_PKT_KEEPALIVE) {
810                        /* Keep-alive, reset startidle and carry on. Don't
811                         * change nextwrite -- we want to overwrite the
812                         * keep-alive with usable content. */
813                        continue;
814                } else if (rectype != NDAG_PKT_ENCAPERF) {
815                        fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
816                                        rt->sources[i].groupaddr,
817                                        rt->sources[i].port);
818                        close(rt->sources[i].sock);
819                        rt->sources[i].sock = -1;
820                        continue;
821                }
822
823                rt->sources[i].savedsize[nw] = ret;
824                rt->sources[i].nextwriteind =
825                                (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
826
827                /* Get the useful info from the encap header */
828                encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] +
829                                sizeof(ndag_common_t));
830
831                mon = rt->sources[i].monitorptr;
832                assert(mon);
833
834                if (mon->laststart == 0) {
835                        mon->laststart = bswap_be_to_host64(encaphdr->started);
836                } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
837                        mon->laststart = bswap_be_to_host64(encaphdr->started);
838                        reset_expected_seqs(rt, mon);
839
840                        /* TODO what is a good way to indicate this to clients?
841                         * set the loss counter in the ERF header? a bit rude?
842                         * use another bit in the ERF header?
843                         * add a queryable flag to libtrace_packet_t?
844                         */
845
846                }
847
848                if (rt->sources[i].expectedseq != 0) {
849                        rt->missing_records += seq_cmp(
850                                        ntohl(encaphdr->seqno),
851                                        rt->sources[i].expectedseq);
852                }
853                rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1;
854                if (rt->sources[i].expectedseq == 0) {
855                        rt->sources[i].expectedseq ++;
856                }
857
858                if (rt->sources[i].nextread == NULL) {
859                        /* If this is our first read, set up 'nextread'
860                         * by skipping past the nDAG headers */
861                        rt->sources[i].nextread = rt->sources[i].saved[0] +
862                                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
863                }
864
865                readybufs ++;
866        }
867
868        return readybufs;
869
870}
871
872
873static int receive_encap_records(libtrace_t *libtrace, recvstream_t *rt,
874                libtrace_packet_t *packet, int block) {
875
876        int iserr = 0;
877
878        if (packet->buf_control == TRACE_CTRL_PACKET) {
879                free(packet->buffer);
880                packet->buffer = NULL;
881        }
882
883        do {
884                /* Make sure we shouldn't be halting */
885                if ((iserr = is_halted(libtrace)) != -1) {
886                        return iserr;
887                }
888
889                /* First, check for any messages from the control thread */
890                iserr = receiver_read_messages(rt);
891
892                if (iserr <= 0) {
893                        return iserr;
894                }
895
896                /* If non-blocking and there are no sources, just break */
897                if (!block && rt->sourcecount == 0) {
898                        iserr = 0;
899                        break;
900                }
901
902                /* If blocking and no sources, sleep for a bit and then try
903                 * checking for messages again.
904                 */
905                if (block && rt->sourcecount == 0) {
906                        usleep(10000);
907                        continue;
908                }
909
910                if ((iserr = receive_from_sockets(rt)) < 0) {
911                        return iserr;
912                } else if (iserr > 0) {
913                        /* At least one of our input sockets has available
914                         * data, let's go ahead and use what we have. */
915                        break;
916                }
917
918                /* None of our sources have anything available, we can take
919                 * a short break rather than immediately trying again.
920                 */
921                if (block && iserr == 0) {
922                        usleep(100);
923                }
924
925        } while (block);
926
927        return iserr;
928}
929
930static streamsock_t *select_next_packet(recvstream_t *rt) {
931        int i;
932        streamsock_t *ssock = NULL;
933        uint64_t earliest = 0;
934        uint64_t currentts = 0;
935        dag_record_t *daghdr;
936
937        for (i = 0; i < rt->sourcecount; i ++) {
938                if (!readable_data(&(rt->sources[i]))) {
939                        continue;
940                }
941
942                daghdr = (dag_record_t *)(rt->sources[i].nextread);
943                currentts = bswap_le_to_host64(daghdr->ts);
944
945                if (earliest == 0 || earliest > currentts) {
946                        earliest = currentts;
947                        ssock = &(rt->sources[i]);
948                }
949                /*
950                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
951                                i, currentts,
952                                rt->sources[i].recordcount,
953                                rt->missing_records);
954                */
955        }
956        return ssock;
957}
958
959static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
960
961        int rem;
962        streamsock_t *nextavail = NULL;
963        rem = receive_encap_records(libtrace, &(FORMAT_DATA->receivers[0]),
964                        packet, 1);
965
966        if (rem <= 0) {
967                return rem;
968        }
969
970        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
971        if (nextavail == NULL) {
972                return 0;
973        }
974
975        /* nextread should point at an ERF header, so prepare 'packet' to be
976         * a libtrace ERF packet. */
977
978        return ndag_prepare_packet_stream(libtrace,
979                        &(FORMAT_DATA->receivers[0]), nextavail,
980                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
981}
982
983static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
984                libtrace_packet_t **packets, size_t nb_packets) {
985
986        recvstream_t *rt;
987        int rem;
988        size_t read_packets = 0;
989        streamsock_t *nextavail = NULL;
990
991        rt = (recvstream_t *)t->format_data;
992
993        do {
994                rem = receive_encap_records(libtrace, rt,
995                                packets[read_packets],
996                                read_packets == 0 ? 1 : 0);
997                if (rem < 0) {
998                        return rem;
999                }
1000
1001                if (rem == 0) {
1002                        break;
1003                }
1004
1005                nextavail = select_next_packet(rt);
1006                if (nextavail == NULL) {
1007                        break;
1008                }
1009
1010                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1011                                packets[read_packets],
1012                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1013
1014                read_packets  ++;
1015                if (read_packets >= nb_packets) {
1016                        break;
1017                }
1018        } while (1);
1019
1020        return read_packets;
1021
1022}
1023
1024static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1025                libtrace_packet_t *packet) {
1026
1027
1028        libtrace_eventobj_t event = {0,0,0.0,0};
1029        int rem;
1030        streamsock_t *nextavail = NULL;
1031
1032        do {
1033                rem = receive_encap_records(libtrace,
1034                                &(FORMAT_DATA->receivers[0]), packet, 0);
1035
1036                if (rem < 0) {
1037                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1038                                "Received invalid nDAG records.");
1039                        event.type = TRACE_EVENT_TERMINATE;
1040                        break;
1041                }
1042
1043                if (rem == 0) {
1044                        /* Either we've been halted or we've got no packets
1045                         * right now. */
1046                        if (is_halted(libtrace) == 0) {
1047                                event.type = TRACE_EVENT_TERMINATE;
1048                                break;
1049                        }
1050                        event.type = TRACE_EVENT_SLEEP;
1051                        event.seconds = 0.0001;
1052                        break;
1053                }
1054
1055                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1056                if (nextavail == NULL) {
1057                        event.type = TRACE_EVENT_SLEEP;
1058                        event.seconds = 0.0001;
1059                        break;
1060                }
1061
1062                event.type = TRACE_EVENT_PACKET;
1063                ndag_prepare_packet_stream(libtrace,
1064                                &(FORMAT_DATA->receivers[0]), nextavail,
1065                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1066                event.size = trace_get_capture_length(packet) +
1067                                trace_get_framing_length(packet);
1068
1069                if (libtrace->filter) {
1070                        int filtret = trace_apply_filter(libtrace->filter,
1071                                        packet);
1072                        if (filtret == -1) {
1073                                trace_set_err(libtrace,
1074                                                TRACE_ERR_BAD_FILTER,
1075                                                "Bad BPF Filter");
1076                                event.type = TRACE_EVENT_TERMINATE;
1077                                break;
1078                        }
1079
1080                        if (filtret == 0) {
1081                                /* Didn't match filter, try next one */
1082                                libtrace->filtered_packets ++;
1083                                trace_clear_cache(packet);
1084                                continue;
1085                        }
1086                }
1087
1088                if (libtrace->snaplen > 0) {
1089                        trace_set_capture_length(packet, libtrace->snaplen);
1090                }
1091                libtrace->accepted_packets ++;
1092                break;
1093        } while (1);
1094
1095        return event;
1096}
1097
1098static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1099
1100        int i;
1101
1102        stat->dropped_valid = 1;
1103        stat->dropped = 0;
1104        stat->received_valid = 1;
1105        stat->received = 0;
1106        stat->missing_valid = 1;
1107        stat->missing = 0;
1108
1109        /* TODO Is this thread safe? */
1110        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1111                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1112                stat->received += FORMAT_DATA->receivers[i].received_packets;
1113                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1114        }
1115
1116}
1117
1118static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1119                libtrace_stat_t *stat) {
1120
1121        recvstream_t *recvr = (recvstream_t *)t->format_data;
1122
1123        if (libtrace == NULL)
1124                return;
1125        /* TODO Is this thread safe */
1126        stat->dropped_valid = 1;
1127        stat->dropped = recvr->dropped_upstream;
1128
1129        stat->received_valid = 1;
1130        stat->received = recvr->received_packets;
1131
1132        stat->missing_valid = 1;
1133        stat->missing = recvr->missing_records;
1134
1135}
1136
1137static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1138                bool reader) {
1139        recvstream_t *recvr;
1140
1141        if (!reader || t->type != THREAD_PERPKT) {
1142                return 0;
1143        }
1144
1145        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1146        t->format_data = recvr;
1147
1148        return 0;
1149}
1150
1151static struct libtrace_format_t ndag = {
1152
1153        "ndag",
1154        "",
1155        TRACE_FORMAT_NDAG,
1156        NULL,                   /* probe filename */
1157        NULL,                   /* probe magic */
1158        ndag_init_input,        /* init_input */
1159        NULL,                   /* config_input */
1160        ndag_start_input,       /* start_input */
1161        ndag_pause_input,       /* pause_input */
1162        NULL,                   /* init_output */
1163        NULL,                   /* config_output */
1164        NULL,                   /* start_output */
1165        ndag_fin_input,         /* fin_input */
1166        NULL,                   /* fin_output */
1167        ndag_read_packet,       /* read_packet */
1168        ndag_prepare_packet,    /* prepare_packet */
1169        NULL,                   /* fin_packet */
1170        NULL,                   /* write_packet */
1171        erf_get_link_type,      /* get_link_type */
1172        erf_get_direction,      /* get_direction */
1173        erf_set_direction,      /* set_direction */
1174        erf_get_erf_timestamp,  /* get_erf_timestamp */
1175        NULL,                   /* get_timeval */
1176        NULL,                   /* get_seconds */
1177        NULL,                   /* get_timespec */
1178        NULL,                   /* seek_erf */
1179        NULL,                   /* seek_timeval */
1180        NULL,                   /* seek_seconds */
1181        erf_get_capture_length, /* get_capture_length */
1182        erf_get_wire_length,    /* get_wire_length */
1183        erf_get_framing_length, /* get_framing_length */
1184        erf_set_capture_length, /* set_capture_length */
1185        NULL,                   /* get_received_packets */
1186        NULL,                   /* get_filtered_packets */
1187        NULL,                   /* get_dropped_packets */
1188        ndag_get_statistics,    /* get_statistics */
1189        NULL,                   /* get_fd */
1190        trace_event_ndag,       /* trace_event */
1191        NULL,                   /* help */
1192        NULL,                   /* next pointer */
1193        {true, 0},              /* live packet capture */
1194        ndag_pstart_input,      /* parallel start */
1195        ndag_pread_packets,     /* parallel read */
1196        ndag_pause_input,       /* parallel pause */
1197        NULL,
1198        ndag_pregister_thread,  /* register thread */
1199        NULL,
1200        ndag_get_thread_stats   /* per-thread stats */
1201};
1202
1203void ndag_constructor(void) {
1204        register_format(&ndag);
1205}
Note: See TracBrowser for help on using the repository browser.