source: lib/format_ndag.c @ aa7db84

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since aa7db84 was aa7db84, checked in by Shane Alcock <salcock@…>, 3 years ago

Added support for seqno wrapping and improved performance.

Performance improvements are as follows:

  • Use larger receive buffer for the multicast recv socket.
  • Remove select in main receive function with a non-blocking recvfrom.
  • Only sleep if no sources have data available, thus avoiding constant 100% CPU usage due to the above change.
  • Property mode set to 100644
File size: 36.3 KB
Line 
1
2#define _GNU_SOURCE
3
4#include "config.h"
5#include "common.h"
6#include "libtrace.h"
7#include "libtrace_int.h"
8#include "format_helper.h"
9#include "format_erf.h"
10
11#include <assert.h>
12#include <errno.h>
13#include <fcntl.h>
14#include <stdio.h>
15#include <string.h>
16#include <unistd.h>
17#include <stdlib.h>
18#include <net/if.h>
19#include <sys/types.h>
20#include <sys/socket.h>
21#include <netdb.h>
22
23#include "format_ndag.h"
24
25#define ENCAP_BUFSIZE (10000)
26#define CTRL_BUF_SIZE (10000)
27
28#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
29
30static struct libtrace_format_t ndag;
31
32volatile int ndag_paused = 0;
33
34typedef struct streamsource {
35        uint16_t monitor;
36        char *groupaddr;
37        char *localiface;
38        uint16_t port;
39} streamsource_t;
40
41typedef struct streamsock {
42        char *groupaddr;
43        int sock;
44        struct addrinfo *srcaddr;
45        uint16_t port;
46        uint32_t expectedseq;
47        uint16_t monitorid;
48
49        char *saved;
50        char *nextread;
51        int savedsize;
52        ndag_encap_t *encaphdr;
53        uint64_t recordcount;
54
55} streamsock_t;
56
57typedef struct recvstream {
58        streamsock_t *sources;
59        uint16_t sourcecount;
60        libtrace_message_queue_t mqueue;
61        int threadindex;
62
63        uint64_t dropped_upstream;
64        uint64_t missing_records;
65        uint64_t received_packets;
66} recvstream_t;
67
68typedef struct ndag_format_data {
69        char *multicastgroup;
70        char *portstr;
71        char *localiface;
72        uint16_t nextthreadid;
73        recvstream_t *receivers;
74
75        pthread_t controlthread;
76        libtrace_message_queue_t controlqueue;
77} ndag_format_data_t;
78
79enum {
80        NDAG_CLIENT_HALT = 0x01,
81        NDAG_CLIENT_RESTARTED = 0x02,
82        NDAG_CLIENT_NEWGROUP = 0x03
83};
84
85typedef struct ndagreadermessage {
86        uint8_t type;
87        streamsource_t contents;
88} ndag_internal_message_t;
89
90
91static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
92
93        /* Calculate seq_a - seq_b, taking wraparound into account */
94        if (seq_a == seq_b) return 0;
95
96        if (seq_a > seq_b) {
97                return (int) (seq_a - seq_b);
98        }
99        return (int) (0xffffffff - ((seq_b - seq_a) - 1));
100}
101
102static inline uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
103        ndag_common_t *header = (ndag_common_t *)msgbuf;
104
105        if (msgsize < sizeof(ndag_common_t)) {
106                fprintf(stderr,
107                        "nDAG message does not have a complete nDAG header.\n");
108                return 0;
109        }
110
111        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
112                fprintf(stderr,
113                        "nDAG message does not have a valid magic number.\n");
114                return 0;
115        }
116
117        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
118                fprintf(stderr,
119                        "nDAG message has an invalid header version: %u\n",
120                                header->version);
121                return 0;
122        }
123
124        return header->type;
125}
126
127static int join_multicast_group(char *groupaddr, char *localiface,
128        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
129
130        struct addrinfo hints;
131        struct addrinfo *gotten;
132        struct addrinfo *group;
133        unsigned int interface;
134        char pstr[16];
135        struct group_req greq;
136        int bufsize;
137
138        int sock;
139
140        if (portstr == NULL) {
141                snprintf(pstr, 15, "%u", portnum);
142                portstr = pstr;
143        }
144
145        interface = if_nametoindex(localiface);
146        if (interface == 0) {
147                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
148                                localiface, strerror(errno));
149                return -1;
150        }
151
152        hints.ai_family = PF_UNSPEC;
153        hints.ai_socktype = SOCK_DGRAM;
154        hints.ai_flags = AI_PASSIVE;
155        hints.ai_protocol = 0;
156
157        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
158                fprintf(stderr,
159                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
160                                portstr, strerror(errno));
161                return -1;
162        }
163
164        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
165                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
166                                groupaddr, strerror(errno));
167                return -1;
168        }
169
170        *srcinfo = gotten;
171        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
172        if (sock < 0) {
173                fprintf(stderr,
174                        "Failed to create multicast socket for %s:%s -- %s\n",
175                                groupaddr, portstr, strerror(errno));
176                goto sockcreateover;
177        }
178
179        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
180        {
181                fprintf(stderr,
182                        "Failed to bind to multicast socket %s:%s -- %s\n",
183                                groupaddr, portstr, strerror(errno));
184                close(sock);
185                sock = -1;
186                goto sockcreateover;
187        }
188
189        greq.gr_interface = interface;
190        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
191
192        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
193                        sizeof(greq)) < 0) {
194                fprintf(stderr,
195                        "Failed to join multicast group %s:%s -- %s\n",
196                                groupaddr, portstr, strerror(errno));
197                close(sock);
198                sock = -1;
199                goto sockcreateover;
200        }
201
202        bufsize = 16 * 1024 * 1024;
203        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
204                                (socklen_t)sizeof(int)) < 0) {
205
206                fprintf(stderr,
207                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
208                                groupaddr, portstr, strerror(errno));
209                close(sock);
210                sock = -1;
211                goto sockcreateover;
212        }
213
214sockcreateover:
215        freeaddrinfo(group);
216        return sock;
217}
218
219
220static int ndag_init_input(libtrace_t *libtrace) {
221
222        char *scan = NULL;
223        char *next = NULL;
224
225        libtrace->format_data = (ndag_format_data_t *)malloc(
226                        sizeof(ndag_format_data_t));
227
228        FORMAT_DATA->multicastgroup = NULL;
229        FORMAT_DATA->portstr = NULL;
230        FORMAT_DATA->localiface = NULL;
231        FORMAT_DATA->nextthreadid = 0;
232        FORMAT_DATA->receivers = NULL;
233
234        scan = strchr(libtrace->uridata, ',');
235        if (scan == NULL) {
236                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
237                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
238                return -1;
239        }
240        FORMAT_DATA->localiface = strndup(libtrace->uridata,
241                        (size_t)(scan - libtrace->uridata));
242        next = scan + 1;
243
244        scan = strchr(next, ',');
245        if (scan == NULL) {
246                FORMAT_DATA->portstr = strdup("9001");
247                FORMAT_DATA->multicastgroup = strdup(next);
248        } else {
249                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
250
251                FORMAT_DATA->portstr = strdup(scan + 1);
252        }
253        return 0;
254}
255
256static inline void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
257                uint16_t portnum, uint16_t monid) {
258
259        ndag_internal_message_t alert;
260
261        alert.type = NDAG_CLIENT_NEWGROUP;
262        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
263        alert.contents.localiface = FORMAT_DATA->localiface;
264        alert.contents.port = portnum;
265        alert.contents.monitor = monid;
266
267        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
268                        (void *)&alert);
269
270}
271       
272static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
273                int msgsize, uint16_t *ptmap) {
274
275        int i;
276        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
277        uint8_t msgtype;
278
279        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
280        if (msgtype == 0) {
281                return -1;
282        }
283
284        msgsize -= sizeof(ndag_common_t);
285        if (msgtype == NDAG_PKT_BEACON) {
286                /* If message is a beacon, make sure every port included in the
287                 * beacon is assigned to a receive thread.
288                 */
289                uint16_t *ptr, numstreams;
290
291                if ((uint32_t)msgsize < sizeof(uint16_t)) {
292                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
293                        return -1;
294                }
295
296                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
297                numstreams = ntohs(*ptr);
298                ptr ++;
299
300                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
301                {
302                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
303                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
304                        return -1;
305                }
306
307                for (i = 0; i < numstreams; i++) {
308                        uint16_t streamport = ntohs(*ptr);
309
310                        if (ptmap[streamport] == 0xffff) {
311                                new_group_alert(libtrace,
312                                        FORMAT_DATA->nextthreadid, streamport,
313                                        ntohs(ndaghdr->monitorid));
314
315                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
316
317                                if (libtrace->perpkt_thread_count == 0) {
318                                        FORMAT_DATA->nextthreadid = 0;
319                                } else {
320                                        FORMAT_DATA->nextthreadid =
321                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
322                                }
323                        }
324
325                        ptr ++;
326                }
327        } else if (msgtype == NDAG_PKT_RESTARTED) {
328                /* If message is a restart, push that to all active message
329                 * queues. */
330                ndag_internal_message_t alert;
331                alert.type = NDAG_CLIENT_RESTARTED;
332                alert.contents.monitor = ntohs(ndaghdr->monitorid);
333                alert.contents.groupaddr = NULL;
334                alert.contents.localiface = NULL;
335                alert.contents.port = 0;
336                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
337                        libtrace_message_queue_put(
338                                        &(FORMAT_DATA->receivers[i].mqueue),
339                                        (void *)&alert);
340                }
341        } else {
342                fprintf(stderr,
343                        "Unexpected message type on control channel: %u\n",
344                         msgtype);
345                return -1;
346        }
347
348        return 0;
349
350}
351
352static void *ndag_controller_run(void *tdata) {
353
354        libtrace_t *libtrace = (libtrace_t *)tdata;
355        uint16_t ptmap[65536];
356        int sock = -1;
357        struct addrinfo *receiveaddr = NULL;
358        fd_set listening;
359        struct timeval timeout;
360
361        /* ptmap is a dirty hack to allow us to quickly check if we've already
362         * assigned a stream to a thread.
363         */
364        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
365
366        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
367                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
368                        &receiveaddr);
369        if (sock == -1) {
370                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
371                        "Unable to join multicast group for nDAG control channel");
372                trace_interrupt();
373        }
374
375        ndag_paused = 0;
376        while ((is_halted(libtrace) == -1) && !ndag_paused) {
377                int ret;
378                char buf[CTRL_BUF_SIZE];
379
380                FD_ZERO(&listening);
381                FD_SET(sock, &listening);
382
383                timeout.tv_sec = 0;
384                timeout.tv_usec = 500000;
385
386                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
387                if (ret < 0) {
388                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
389                        break;
390                }
391
392                if (!FD_ISSET(sock, &listening)) {
393                        continue;
394                }
395
396                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
397                                receiveaddr->ai_addr,
398                                &(receiveaddr->ai_addrlen));
399                if (ret < 0) {
400                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
401                        break;
402                }
403
404                if (ret == 0) {
405                        break;
406                }
407
408                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
409                        fprintf(stderr, "Error while parsing nDAG control message.\n");
410                        continue;
411                }
412        }
413
414        if (sock >= 0) {
415                close(sock);
416        }
417
418        /* Control channel has fallen over, should probably encourage libtrace
419         * to halt the receiver threads as well.
420         */
421        if (!is_halted(libtrace)) {
422                trace_interrupt();
423        }
424
425        pthread_exit(NULL);
426}
427
428static inline int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
429{
430        int ret;
431        uint32_t i;
432        /* Configure the set of receiver threads */
433
434        if (FORMAT_DATA->receivers == NULL) {
435                /* What if the number of threads changes between a pause and
436                 * a restart? Can this happen? */
437                FORMAT_DATA->receivers = (recvstream_t *)
438                                malloc(sizeof(recvstream_t) * maxthreads);
439        }
440
441        for (i = 0; i < maxthreads; i++) {
442                FORMAT_DATA->receivers[i].sources = NULL;
443                FORMAT_DATA->receivers[i].sourcecount = 0;
444                FORMAT_DATA->receivers[i].threadindex = i;
445                FORMAT_DATA->receivers[i].dropped_upstream = 0;
446                FORMAT_DATA->receivers[i].received_packets = 0;
447                FORMAT_DATA->receivers[i].missing_records = 0;
448
449                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
450                                sizeof(ndag_internal_message_t));
451        }
452
453        /* Start the controller thread */
454        /* TODO consider affinity of this thread? */
455
456        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
457                        ndag_controller_run, libtrace);
458        if (ret != 0) {
459                return -1;
460        }
461        return maxthreads;
462}
463
464static int ndag_start_input(libtrace_t *libtrace) {
465        return ndag_start_threads(libtrace, 1);
466}
467
468static int ndag_pstart_input(libtrace_t *libtrace) {
469        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
470                        libtrace->perpkt_thread_count)
471                return 0;
472        return -1;
473}
474
475static void halt_ndag_receiver(recvstream_t *receiver) {
476        int i;
477        libtrace_message_queue_destroy(&(receiver->mqueue));
478
479        if (receiver->sources == NULL)
480                return;
481        for (i = 0; i < receiver->sourcecount; i++) {
482                streamsock_t src = receiver->sources[i];
483                if (src.saved) {
484                        free(src.saved);
485                }
486                close(src.sock);
487        }
488        if (receiver->sources) {
489                free(receiver->sources);
490        }
491}
492
493static int ndag_pause_input(libtrace_t *libtrace) {
494        int i;
495
496        /* Close the existing receiver sockets */
497        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
498               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
499        }
500        return 0;
501}
502
503static int ndag_fin_input(libtrace_t *libtrace) {
504
505        if (FORMAT_DATA->receivers) {
506                free(FORMAT_DATA->receivers);
507        }
508        if (FORMAT_DATA->multicastgroup) {
509                free(FORMAT_DATA->multicastgroup);
510        }
511        if (FORMAT_DATA->portstr) {
512                free(FORMAT_DATA->portstr);
513        }
514        if (FORMAT_DATA->localiface) {
515                free(FORMAT_DATA->localiface);
516        }
517
518        free(libtrace->format_data);
519        return 0;
520}
521
522static int ndag_prepare_packet_stream(libtrace_t *libtrace,
523                recvstream_t *rt,
524                streamsock_t *ssock, libtrace_packet_t *packet,
525                uint32_t flags) {
526
527        dag_record_t *erfptr;
528        uint16_t ndag_reccount = 0;
529
530        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
531                packet->buf_control = TRACE_CTRL_PACKET;
532        } else {
533                packet->buf_control = TRACE_CTRL_EXTERNAL;
534        }
535
536        packet->trace = libtrace;
537        packet->buffer = ssock->nextread;
538        packet->header = ssock->nextread;
539        packet->type = TRACE_RT_DATA_ERF;
540
541        erfptr = (dag_record_t *)packet->header;
542
543        if (erfptr->flags.rxerror == 1) {
544                packet->payload = NULL;
545                erfptr->rlen = htons(erf_get_framing_length(packet));
546        } else {
547                packet->payload = (char *)packet->buffer +
548                                erf_get_framing_length(packet);
549        }
550
551        /* Update upstream drops using lctr */
552
553        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
554                /* TODO */
555        } else {
556                if (rt->received_packets > 0) {
557                        rt->dropped_upstream += ntohs(erfptr->lctr);
558                }
559        }
560
561        rt->received_packets ++;
562        ssock->recordcount += 1;
563
564        ndag_reccount = ntohs(ssock->encaphdr->recordcount);
565        if ((ndag_reccount & 0x8000) != 0) {
566                /* Record was truncated -- update rlen appropriately */
567                erfptr->rlen = htons(ssock->savedsize -
568                                (ssock->nextread - ssock->saved));
569        }
570        ssock->nextread += ntohs(erfptr->rlen);
571
572        packet->order = erf_get_erf_timestamp(packet);
573        packet->error = packet->payload ? ntohs(erfptr->rlen) :
574                        erf_get_framing_length(packet);
575
576        return ntohs(erfptr->rlen);
577}
578
579static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
580                libtrace_packet_t *packet UNUSED,
581                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
582                uint32_t flags UNUSED) {
583
584        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
585        return 0;
586
587}
588
589static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
590
591        streamsock_t *ssock = NULL;
592
593        if (rt->sourcecount == 0) {
594                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
595        } else if ((rt->sourcecount % 10) == 0) {
596                rt->sources = (streamsock_t *)realloc(rt->sources,
597                        sizeof(streamsock_t) * (rt->sourcecount + 10));
598        }
599
600        ssock = &(rt->sources[rt->sourcecount]);
601
602        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
603                        NULL, src.port, &(ssock->srcaddr));
604
605        if (ssock->sock < 0) {
606                return -1;
607        }
608
609        ssock->port = src.port;
610        ssock->groupaddr = src.groupaddr;
611        ssock->expectedseq = 0;
612        ssock->monitorid = src.monitor;
613        ssock->saved = (char *)malloc(ENCAP_BUFSIZE);
614        ssock->nextread = ssock->saved;
615        ssock->savedsize = 0;
616        ssock->encaphdr = NULL;
617        ssock->recordcount = 0;
618        rt->sourcecount += 1;
619
620        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
621                        ssock->groupaddr, ssock->port, rt->threadindex);
622
623        return ssock->port;
624}
625
626static int receiver_read_messages(recvstream_t *rt) {
627
628        ndag_internal_message_t msg;
629
630        while (libtrace_message_queue_try_get(&(rt->mqueue),
631                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
632                switch(msg.type) {
633                        case NDAG_CLIENT_NEWGROUP:
634                                if (add_new_streamsock(rt, msg.contents) < 0) {
635                                        return -1;
636                                }
637                                break;
638                        case NDAG_CLIENT_RESTARTED:
639                                /* TODO */
640
641                                break;
642                        case NDAG_CLIENT_HALT:
643                                return 0;
644                }
645        }
646        return 1;
647
648}
649
650static inline int read_required(streamsock_t ssock) {
651        if (ssock.sock == -1)
652                return 0;
653        if (ssock.savedsize == 0)
654                return 1;
655        if (ssock.nextread - ssock.saved >= ssock.savedsize)
656                return 1;
657        return 0;
658}
659
660
661static int receive_from_sockets(recvstream_t *rt) {
662
663        int i, ret, readybufs, availsocks, successrecv;
664
665        /* TODO maybe need a way to tidy up "dead" sockets and signal back
666         * to the control thread that we've killed the socket for a particular
667         * port.
668         */
669
670        readybufs = 0;
671        availsocks = 0;
672        successrecv = 0;
673
674        for (i = 0; i < rt->sourcecount; i ++) {
675                if (read_required(rt->sources[i])) {
676                        availsocks += 1;
677                } else if (rt->sources[i].sock != -1) {
678                        readybufs += 1;
679                        availsocks += 1;
680                }
681        }
682
683        /* If all of our sockets already have data sitting in their
684         * buffers then we can save ourselves some 'select'ing.
685         */
686        if (availsocks == readybufs) {
687                return readybufs;
688        }
689
690        /* Otherwise, at least one active socket has an empty buffer so
691         * we better try to read some data from those sockets (just in
692         * case the correct 'next' packet is waiting on one of those
693         * sockets.
694         */
695        for (i = 0; i < rt->sourcecount; i ++) {
696                if (!read_required(rt->sources[i])) {
697                        if (rt->sources[i].sock != -1) {
698                                readybufs ++;
699                        }
700                        continue;
701                }
702
703                rt->sources[i].savedsize = 0;
704                rt->sources[i].nextread = rt->sources[i].saved;
705
706                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved,
707                                ENCAP_BUFSIZE, MSG_DONTWAIT,
708                                rt->sources[i].srcaddr->ai_addr,
709                                &(rt->sources[i].srcaddr->ai_addrlen));
710                if (ret < 0) {
711                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
712                                continue;
713                        }
714
715                        fprintf(stderr,
716                                        "Error receiving encapsulated records from %s:%u -- %s \n",
717                                        rt->sources[i].groupaddr,
718                                        rt->sources[i].port,
719                                        strerror(errno));
720                        close(rt->sources[i].sock);
721                        rt->sources[i].sock = -1;
722                        continue;
723                }
724
725                if (ret == 0) {
726                        fprintf(stderr, "Received zero bytes on the channel for %s:%u.\n",
727                                        rt->sources[i].groupaddr,
728                                        rt->sources[i].port);
729                        close(rt->sources[i].sock);
730                        rt->sources[i].sock = -1;
731                        continue;
732                }
733
734                rt->sources[i].savedsize = ret;
735                successrecv ++;
736
737                /* Check that we have a valid nDAG encap record */
738                if (check_ndag_header(rt->sources[i].saved, ret) !=
739                                        NDAG_PKT_ENCAPERF) {
740                        fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
741                                        rt->sources[i].groupaddr,
742                                        rt->sources[i].port);
743                        close(rt->sources[i].sock);
744                        rt->sources[i].sock = -1;
745                        continue;
746                }
747
748                /* Save the useful info from the encap header */
749                rt->sources[i].encaphdr = (ndag_encap_t *)
750                                (rt->sources[i].saved + sizeof(ndag_common_t));
751
752                if (rt->sources[i].expectedseq != 0) {
753                        rt->missing_records += seq_cmp(
754                                        ntohl(rt->sources[i].encaphdr->seqno),
755                                        rt->sources[i].expectedseq);
756                }
757                rt->sources[i].expectedseq =
758                                ntohl(rt->sources[i].encaphdr->seqno) + 1;
759
760                /* If all good, skip past the nDAG headers */
761                rt->sources[i].nextread = rt->sources[i].saved +
762                                sizeof(ndag_common_t) + sizeof(ndag_encap_t);
763
764                readybufs += 1;
765        }
766
767        return readybufs;
768
769}
770
771
772static int receive_encap_records(libtrace_t *libtrace, recvstream_t *rt,
773                libtrace_packet_t *packet, int block) {
774
775        int iserr = 0;
776
777        if (packet->buf_control == TRACE_CTRL_PACKET) {
778                free(packet->buffer);
779                packet->buffer = NULL;
780        }
781
782        do {
783                /* Make sure we shouldn't be halting */
784                if ((iserr = is_halted(libtrace)) != -1) {
785                        return iserr;
786                }
787
788                /* First, check for any messages from the control thread */
789                iserr = receiver_read_messages(rt);
790
791                if (iserr <= 0) {
792                        return iserr;
793                }
794
795                /* If non-blocking and there are no sources, just break */
796                if (!block && rt->sourcecount == 0) {
797                        iserr = 0;
798                        break;
799                }
800
801                /* If blocking and no sources, sleep for a bit and then try
802                 * checking for messages again.
803                 */
804                if (block && rt->sourcecount == 0) {
805                        usleep(10000);
806                        continue;
807                }
808
809                if ((iserr = receive_from_sockets(rt)) < 0) {
810                        return iserr;
811                } else if (iserr > 0) {
812                        /* At least one of our input sockets has available
813                         * data, let's go ahead and use what we have. */
814                        break;
815                }
816
817                /* None of our sources have anything available, we can take
818                 * a short break rather than immediately trying again.
819                 */
820                if (block && iserr == 0) {
821                        usleep(100);
822                }
823
824        } while (block);
825
826        return iserr;
827}
828
829static streamsock_t *select_next_packet(recvstream_t *rt) {
830        int i;
831        streamsock_t *ssock = NULL;
832        uint64_t earliest = 0;
833        uint64_t currentts = 0;
834        dag_record_t *daghdr;
835
836        for (i = 0; i < rt->sourcecount; i ++) {
837                if (read_required(rt->sources[i])) {
838                        continue;
839                }
840
841                daghdr = (dag_record_t *)(rt->sources[i].nextread);
842                currentts = bswap_le_to_host64(daghdr->ts);
843
844                if (earliest == 0 || earliest > currentts) {
845                        earliest = currentts;
846                        ssock = &(rt->sources[i]);
847                }
848                /*
849                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
850                                i, currentts,
851                                rt->sources[i].recordcount,
852                                rt->missing_records);
853                */
854        }
855        return ssock;
856}
857
858static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
859
860        int rem;
861        streamsock_t *nextavail = NULL;
862        rem = receive_encap_records(libtrace, &(FORMAT_DATA->receivers[0]),
863                        packet, 1);
864
865        if (rem <= 0) {
866                return rem;
867        }
868
869        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
870        if (nextavail == NULL) {
871                return 0;
872        }
873
874        /* nextread should point at an ERF header, so prepare 'packet' to be
875         * a libtrace ERF packet. */
876
877        return ndag_prepare_packet_stream(libtrace,
878                        &(FORMAT_DATA->receivers[0]), nextavail,
879                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
880}
881
882static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
883                libtrace_packet_t **packets, size_t nb_packets) {
884
885        recvstream_t *rt;
886        int rem;
887        size_t read_packets = 0;
888        streamsock_t *nextavail = NULL;
889
890        rt = (recvstream_t *)t->format_data;
891
892        do {
893                rem = receive_encap_records(libtrace, rt,
894                                packets[read_packets],
895                                read_packets == 0 ? 1 : 0);
896                if (rem < 0) {
897                        return rem;
898                }
899
900                if (rem == 0) {
901                        break;
902                }
903
904                nextavail = select_next_packet(rt);
905                if (nextavail == NULL) {
906                        break;
907                }
908
909                ndag_prepare_packet_stream(libtrace, rt, nextavail,
910                                packets[read_packets],
911                                TRACE_PREP_DO_NOT_OWN_BUFFER);
912
913                read_packets  ++;
914                if (read_packets >= nb_packets) {
915                        break;
916                }
917        } while (1);
918
919        return read_packets;
920
921}
922
923static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
924                libtrace_packet_t *packet) {
925
926
927        libtrace_eventobj_t event = {0,0,0.0,0};
928        int rem;
929        streamsock_t *nextavail = NULL;
930
931        do {
932                rem = receive_encap_records(libtrace,
933                                &(FORMAT_DATA->receivers[0]), packet, 0);
934
935                if (rem < 0) {
936                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
937                                "Received invalid nDAG records.");
938                        event.type = TRACE_EVENT_TERMINATE;
939                        break;
940                }
941
942                if (rem == 0) {
943                        /* Either we've been halted or we've got no packets
944                         * right now. */
945                        if (is_halted(libtrace) == 0) {
946                                event.type = TRACE_EVENT_TERMINATE;
947                                break;
948                        }
949                        event.type = TRACE_EVENT_SLEEP;
950                        event.seconds = 0.0001;
951                        break;
952                }
953
954                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
955                if (nextavail == NULL) {
956                        event.type = TRACE_EVENT_SLEEP;
957                        event.seconds = 0.0001;
958                        break;
959                }
960
961                event.type = TRACE_EVENT_PACKET;
962                ndag_prepare_packet_stream(libtrace,
963                                &(FORMAT_DATA->receivers[0]), nextavail,
964                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
965                event.size = trace_get_capture_length(packet) +
966                                trace_get_framing_length(packet);
967
968                if (libtrace->filter) {
969                        int filtret = trace_apply_filter(libtrace->filter,
970                                        packet);
971                        if (filtret == -1) {
972                                trace_set_err(libtrace,
973                                                TRACE_ERR_BAD_FILTER,
974                                                "Bad BPF Filter");
975                                event.type = TRACE_EVENT_TERMINATE;
976                                break;
977                        }
978
979                        if (filtret == 0) {
980                                /* Didn't match filter, try next one */
981                                libtrace->filtered_packets ++;
982                                trace_clear_cache(packet);
983                                continue;
984                        }
985                }
986
987                if (libtrace->snaplen > 0) {
988                        trace_set_capture_length(packet, libtrace->snaplen);
989                }
990                libtrace->accepted_packets ++;
991                break;
992        } while (1);
993
994        return event;
995}
996
997static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
998
999        int i;
1000
1001        stat->dropped_valid = 1;
1002        stat->dropped = 0;
1003        stat->received_valid = 1;
1004        stat->received = 0;
1005        stat->missing_valid = 1;
1006        stat->missing = 0;
1007
1008        /* TODO Is this thread safe? */
1009        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1010                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1011                stat->received += FORMAT_DATA->receivers[i].received_packets;
1012                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1013        }
1014
1015}
1016
1017static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1018                libtrace_stat_t *stat) {
1019
1020        recvstream_t *recvr = (recvstream_t *)t->format_data;
1021
1022        if (libtrace == NULL)
1023                return;
1024        /* TODO Is this thread safe */
1025        stat->dropped_valid = 1;
1026        stat->dropped = recvr->dropped_upstream;
1027
1028        stat->received_valid = 1;
1029        stat->received = recvr->received_packets;
1030
1031        stat->missing_valid = 1;
1032        stat->missing = recvr->missing_records;
1033
1034}
1035
1036static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1037                bool reader) {
1038        recvstream_t *recvr;
1039
1040        if (!reader || t->type != THREAD_PERPKT) {
1041                return 0;
1042        }
1043
1044        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1045        t->format_data = recvr;
1046
1047        return 0;
1048}
1049
1050static struct libtrace_format_t ndag = {
1051
1052        "ndag",
1053        "",
1054        TRACE_FORMAT_NDAG,
1055        NULL,                   /* probe filename */
1056        NULL,                   /* probe magic */
1057        ndag_init_input,        /* init_input */
1058        NULL,                   /* config_input */
1059        ndag_start_input,       /* start_input */
1060        ndag_pause_input,       /* pause_input */
1061        NULL,                   /* init_output */
1062        NULL,                   /* config_output */
1063        NULL,                   /* start_output */
1064        ndag_fin_input,         /* fin_input */
1065        NULL,                   /* fin_output */
1066        ndag_read_packet,       /* read_packet */
1067        ndag_prepare_packet,    /* prepare_packet */
1068        NULL,                   /* fin_packet */
1069        NULL,                   /* write_packet */
1070        erf_get_link_type,      /* get_link_type */
1071        erf_get_direction,      /* get_direction */
1072        erf_set_direction,      /* set_direction */
1073        erf_get_erf_timestamp,  /* get_erf_timestamp */
1074        NULL,                   /* get_timeval */
1075        NULL,                   /* get_seconds */
1076        NULL,                   /* get_timespec */
1077        NULL,                   /* seek_erf */
1078        NULL,                   /* seek_timeval */
1079        NULL,                   /* seek_seconds */
1080        erf_get_capture_length, /* get_capture_length */
1081        erf_get_wire_length,    /* get_wire_length */
1082        erf_get_framing_length, /* get_framing_length */
1083        erf_set_capture_length, /* set_capture_length */
1084        NULL,                   /* get_received_packets */
1085        NULL,                   /* get_filtered_packets */
1086        NULL,                   /* get_dropped_packets */
1087        ndag_get_statistics,    /* get_statistics */
1088        NULL,                   /* get_fd */
1089        trace_event_ndag,       /* trace_event */
1090        NULL,                   /* help */
1091        NULL,                   /* next pointer */
1092        {true, 0},              /* live packet capture */
1093        ndag_pstart_input,      /* parallel start */
1094        ndag_pread_packets,     /* parallel read */
1095        ndag_pause_input,       /* parallel pause */
1096        NULL,
1097        ndag_pregister_thread,  /* register thread */
1098        NULL,
1099        ndag_get_thread_stats   /* per-thread stats */
1100};
1101
1102void ndag_constructor(void) {
1103        register_format(&ndag);
1104}
Note: See TracBrowser for help on using the repository browser.