source: lib/format_ndag.c @ 4f0f93f

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 4f0f93f was 4f0f93f, checked in by Shane Alcock <salcock@…>, 4 years ago

Minor bug fixes for ndag:

  • Don't try to close the multicast socket if bind fails.
  • Make sure we exit the control channel thread if it we fail to join the multicast group.
  • Property mode set to 100644
File size: 44.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint32_t startidle;
90        uint64_t recordcount;
91
92        int bufavail;
93
94#if HAVE_RECVMMSG
95        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
96#else
97        struct msghdr singlemsg;
98#endif
99
100} streamsock_t;
101
102typedef struct recvstream {
103        streamsock_t *sources;
104        uint16_t sourcecount;
105        libtrace_message_queue_t mqueue;
106        int threadindex;
107        ndag_monitor_t *knownmonitors;
108        uint16_t monitorcount;
109
110        uint64_t dropped_upstream;
111        uint64_t missing_records;
112        uint64_t received_packets;
113} recvstream_t;
114
115typedef struct ndag_format_data {
116        char *multicastgroup;
117        char *portstr;
118        char *localiface;
119        uint16_t nextthreadid;
120        recvstream_t *receivers;
121
122        pthread_t controlthread;
123        libtrace_message_queue_t controlqueue;
124} ndag_format_data_t;
125
126enum {
127        NDAG_CLIENT_HALT = 0x01,
128        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
129        NDAG_CLIENT_NEWGROUP = 0x03
130};
131
132typedef struct ndagreadermessage {
133        uint8_t type;
134        streamsource_t contents;
135} ndag_internal_message_t;
136
137
138static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
139
140        /* Calculate seq_a - seq_b, taking wraparound into account */
141        if (seq_a == seq_b) return 0;
142
143        if (seq_a > seq_b) {
144                return (int) (seq_a - seq_b);
145        }
146
147        /* -1 for the wrap and another -1 because we don't use zero */
148        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
149}
150
151static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
152        ndag_common_t *header = (ndag_common_t *)msgbuf;
153
154        if (msgsize < sizeof(ndag_common_t)) {
155                fprintf(stderr,
156                        "nDAG message does not have a complete nDAG header.\n");
157                return 0;
158        }
159
160        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
161                fprintf(stderr,
162                        "nDAG message does not have a valid magic number.\n");
163                return 0;
164        }
165
166        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
167                fprintf(stderr,
168                        "nDAG message has an invalid header version: %u\n",
169                                header->version);
170                return 0;
171        }
172
173        return header->type;
174}
175
176static int join_multicast_group(char *groupaddr, char *localiface,
177        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
178
179        struct addrinfo hints;
180        struct addrinfo *gotten;
181        struct addrinfo *group;
182        unsigned int interface;
183        char pstr[16];
184        struct group_req greq;
185        int bufsize;
186
187        int sock;
188
189        if (portstr == NULL) {
190                snprintf(pstr, 15, "%u", portnum);
191                portstr = pstr;
192        }
193
194        interface = if_nametoindex(localiface);
195        if (interface == 0) {
196                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
197                                localiface, strerror(errno));
198                return -1;
199        }
200
201        hints.ai_family = PF_UNSPEC;
202        hints.ai_socktype = SOCK_DGRAM;
203        hints.ai_flags = AI_PASSIVE;
204        hints.ai_protocol = 0;
205
206        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
207                fprintf(stderr,
208                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
209                                portstr, strerror(errno));
210                return -1;
211        }
212
213        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
214                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
215                                groupaddr, strerror(errno));
216                return -1;
217        }
218
219        *srcinfo = gotten;
220        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
221        if (sock < 0) {
222                fprintf(stderr,
223                        "Failed to create multicast socket for %s:%s -- %s\n",
224                                groupaddr, portstr, strerror(errno));
225                goto sockcreateover;
226        }
227
228        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
229        {
230                fprintf(stderr,
231                        "Failed to bind to multicast socket %s:%s -- %s\n",
232                                groupaddr, portstr, strerror(errno));
233                sock = -1;
234                goto sockcreateover;
235        }
236
237        greq.gr_interface = interface;
238        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
239
240        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
241                        sizeof(greq)) < 0) {
242                fprintf(stderr,
243                        "Failed to join multicast group %s:%s -- %s\n",
244                                groupaddr, portstr, strerror(errno));
245                close(sock);
246                sock = -1;
247                goto sockcreateover;
248        }
249
250        bufsize = 16 * 1024 * 1024;
251        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
252                                (socklen_t)sizeof(int)) < 0) {
253
254                fprintf(stderr,
255                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
256                                groupaddr, portstr, strerror(errno));
257                close(sock);
258                sock = -1;
259                goto sockcreateover;
260        }
261
262sockcreateover:
263        freeaddrinfo(group);
264        return sock;
265}
266
267
268static int ndag_init_input(libtrace_t *libtrace) {
269
270        char *scan = NULL;
271        char *next = NULL;
272
273        libtrace->format_data = (ndag_format_data_t *)malloc(
274                        sizeof(ndag_format_data_t));
275
276        FORMAT_DATA->multicastgroup = NULL;
277        FORMAT_DATA->portstr = NULL;
278        FORMAT_DATA->localiface = NULL;
279        FORMAT_DATA->nextthreadid = 0;
280        FORMAT_DATA->receivers = NULL;
281
282        scan = strchr(libtrace->uridata, ',');
283        if (scan == NULL) {
284                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
285                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
286                return -1;
287        }
288        FORMAT_DATA->localiface = strndup(libtrace->uridata,
289                        (size_t)(scan - libtrace->uridata));
290        next = scan + 1;
291
292        scan = strchr(next, ',');
293        if (scan == NULL) {
294                FORMAT_DATA->portstr = strdup("9001");
295                FORMAT_DATA->multicastgroup = strdup(next);
296        } else {
297                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
298
299                FORMAT_DATA->portstr = strdup(scan + 1);
300        }
301        return 0;
302}
303
304static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
305                uint16_t portnum, uint16_t monid) {
306
307        ndag_internal_message_t alert;
308
309        alert.type = NDAG_CLIENT_NEWGROUP;
310        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
311        alert.contents.localiface = FORMAT_DATA->localiface;
312        alert.contents.port = portnum;
313        alert.contents.monitor = monid;
314
315        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
316                        (void *)&alert);
317
318}
319       
320static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
321                int msgsize, uint16_t *ptmap) {
322
323        int i;
324        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
325        uint8_t msgtype;
326
327        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
328        if (msgtype == 0) {
329                return -1;
330        }
331
332        msgsize -= sizeof(ndag_common_t);
333        if (msgtype == NDAG_PKT_BEACON) {
334                /* If message is a beacon, make sure every port included in the
335                 * beacon is assigned to a receive thread.
336                 */
337                uint16_t *ptr, numstreams;
338
339                if ((uint32_t)msgsize < sizeof(uint16_t)) {
340                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
341                        return -1;
342                }
343
344                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
345                numstreams = ntohs(*ptr);
346                ptr ++;
347
348                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
349                {
350                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
351                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
352                        return -1;
353                }
354
355                for (i = 0; i < numstreams; i++) {
356                        uint16_t streamport = ntohs(*ptr);
357
358                        if (ptmap[streamport] == 0xffff) {
359                                new_group_alert(libtrace,
360                                        FORMAT_DATA->nextthreadid, streamport,
361                                        ntohs(ndaghdr->monitorid));
362
363                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
364
365                                if (libtrace->perpkt_thread_count == 0) {
366                                        FORMAT_DATA->nextthreadid = 0;
367                                } else {
368                                        FORMAT_DATA->nextthreadid =
369                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
370                                }
371                        }
372
373                        ptr ++;
374                }
375        } else {
376                fprintf(stderr,
377                        "Unexpected message type on control channel: %u\n",
378                         msgtype);
379                return -1;
380        }
381
382        return 0;
383
384}
385
386static void *ndag_controller_run(void *tdata) {
387
388        libtrace_t *libtrace = (libtrace_t *)tdata;
389        uint16_t ptmap[65536];
390        int sock = -1;
391        struct addrinfo *receiveaddr = NULL;
392        fd_set listening;
393        struct timeval timeout;
394
395        /* ptmap is a dirty hack to allow us to quickly check if we've already
396         * assigned a stream to a thread.
397         */
398        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
399
400        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
401                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
402                        &receiveaddr);
403        if (sock == -1) {
404                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
405                        "Unable to join multicast group for nDAG control channel");
406                trace_interrupt();
407                pthread_exit(NULL);
408        }
409
410        ndag_paused = 0;
411        while ((is_halted(libtrace) == -1) && !ndag_paused) {
412                int ret;
413                char buf[CTRL_BUF_SIZE];
414
415                FD_ZERO(&listening);
416                FD_SET(sock, &listening);
417
418                timeout.tv_sec = 0;
419                timeout.tv_usec = 500000;
420
421                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
422                if (ret < 0) {
423                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
424                        break;
425                }
426
427                if (!FD_ISSET(sock, &listening)) {
428                        continue;
429                }
430
431                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
432                                receiveaddr->ai_addr,
433                                &(receiveaddr->ai_addrlen));
434                if (ret < 0) {
435                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
436                        break;
437                }
438
439                if (ret == 0) {
440                        break;
441                }
442
443                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
444                        fprintf(stderr, "Error while parsing nDAG control message.\n");
445                        continue;
446                }
447        }
448
449        if (sock >= 0) {
450                close(sock);
451        }
452
453        /* Control channel has fallen over, should probably encourage libtrace
454         * to halt the receiver threads as well.
455         */
456        if (!is_halted(libtrace)) {
457                trace_interrupt();
458        }
459
460        pthread_exit(NULL);
461}
462
463static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
464{
465        int ret;
466        uint32_t i;
467        /* Configure the set of receiver threads */
468
469        if (FORMAT_DATA->receivers == NULL) {
470                /* What if the number of threads changes between a pause and
471                 * a restart? Can this happen? */
472                FORMAT_DATA->receivers = (recvstream_t *)
473                                malloc(sizeof(recvstream_t) * maxthreads);
474        }
475
476        for (i = 0; i < maxthreads; i++) {
477                FORMAT_DATA->receivers[i].sources = NULL;
478                FORMAT_DATA->receivers[i].sourcecount = 0;
479                FORMAT_DATA->receivers[i].knownmonitors = NULL;
480                FORMAT_DATA->receivers[i].monitorcount = 0;
481                FORMAT_DATA->receivers[i].threadindex = i;
482                FORMAT_DATA->receivers[i].dropped_upstream = 0;
483                FORMAT_DATA->receivers[i].received_packets = 0;
484                FORMAT_DATA->receivers[i].missing_records = 0;
485
486                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
487                                sizeof(ndag_internal_message_t));
488        }
489
490        /* Start the controller thread */
491        /* TODO consider affinity of this thread? */
492
493        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
494                        ndag_controller_run, libtrace);
495        if (ret != 0) {
496                return -1;
497        }
498        return maxthreads;
499}
500
501static int ndag_start_input(libtrace_t *libtrace) {
502        return ndag_start_threads(libtrace, 1);
503}
504
505static int ndag_pstart_input(libtrace_t *libtrace) {
506        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
507                        libtrace->perpkt_thread_count)
508                return 0;
509        return -1;
510}
511
512static void halt_ndag_receiver(recvstream_t *receiver) {
513        int j, i;
514        libtrace_message_queue_destroy(&(receiver->mqueue));
515
516        if (receiver->sources == NULL)
517                return;
518        for (i = 0; i < receiver->sourcecount; i++) {
519                streamsock_t src = receiver->sources[i];
520                if (src.saved) {
521                        for (j = 0; j < ENCAP_BUFFERS; j++) {
522                                if (src.saved[j]) {
523                                        free(src.saved[j]);
524                                }
525                        }
526                        free(src.saved);
527                }
528
529#if HAVE_RECVMMSG
530                for (j = 0; j < RECV_BATCH_SIZE; j++) {
531                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
532                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
533                        }
534                }
535#else
536                free(src.singlemsg.msg_iov);
537#endif
538
539                close(src.sock);
540        }
541        if (receiver->knownmonitors) {
542                free(receiver->knownmonitors);
543        }
544
545        if (receiver->sources) {
546                free(receiver->sources);
547        }
548}
549
550static int ndag_pause_input(libtrace_t *libtrace) {
551        int i;
552
553        /* Close the existing receiver sockets */
554        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
555               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
556        }
557        return 0;
558}
559
560static int ndag_fin_input(libtrace_t *libtrace) {
561
562        if (FORMAT_DATA->receivers) {
563                free(FORMAT_DATA->receivers);
564        }
565        if (FORMAT_DATA->multicastgroup) {
566                free(FORMAT_DATA->multicastgroup);
567        }
568        if (FORMAT_DATA->portstr) {
569                free(FORMAT_DATA->portstr);
570        }
571        if (FORMAT_DATA->localiface) {
572                free(FORMAT_DATA->localiface);
573        }
574
575        free(libtrace->format_data);
576        return 0;
577}
578
579static int ndag_prepare_packet_stream(libtrace_t *libtrace,
580                recvstream_t *rt,
581                streamsock_t *ssock, libtrace_packet_t *packet,
582                uint32_t flags) {
583
584        dag_record_t *erfptr;
585        ndag_encap_t *encaphdr;
586        uint16_t ndag_reccount = 0;
587        int nr;
588
589        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
590                packet->buf_control = TRACE_CTRL_PACKET;
591        } else {
592                packet->buf_control = TRACE_CTRL_EXTERNAL;
593        }
594
595        packet->trace = libtrace;
596        packet->buffer = ssock->nextread;
597        packet->header = ssock->nextread;
598        packet->type = TRACE_RT_DATA_ERF;
599
600        erfptr = (dag_record_t *)packet->header;
601
602        if (erfptr->flags.rxerror == 1) {
603                packet->payload = NULL;
604                erfptr->rlen = htons(erf_get_framing_length(packet));
605        } else {
606                packet->payload = (char *)packet->buffer +
607                                erf_get_framing_length(packet);
608        }
609
610        /* Update upstream drops using lctr */
611
612        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
613                /* TODO */
614        } else {
615                if (rt->received_packets > 0) {
616                        rt->dropped_upstream += ntohs(erfptr->lctr);
617                }
618        }
619
620        rt->received_packets ++;
621        ssock->recordcount += 1;
622
623        nr = ssock->nextreadind;
624        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
625                        sizeof(ndag_common_t));
626
627        ndag_reccount = ntohs(encaphdr->recordcount);
628        if ((ndag_reccount & 0x8000) != 0) {
629                /* Record was truncated -- update rlen appropriately */
630                erfptr->rlen = htons(ssock->savedsize[nr] -
631                                (ssock->nextread - ssock->saved[nr]));
632        }
633        ssock->nextread += ntohs(erfptr->rlen);
634
635        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
636                /* Read everything from this buffer, mark as empty and
637                 * move on. */
638                ssock->savedsize[nr] = 0;
639                ssock->bufavail ++;
640
641                assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS);
642                nr ++;
643                if (nr == ENCAP_BUFFERS) {
644                        nr = 0;
645                }
646                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
647                                sizeof(ndag_encap_t);
648                ssock->nextreadind = nr;
649        }
650
651        packet->order = erf_get_erf_timestamp(packet);
652        packet->error = packet->payload ? ntohs(erfptr->rlen) :
653                        erf_get_framing_length(packet);
654
655        return ntohs(erfptr->rlen);
656}
657
658static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
659                libtrace_packet_t *packet UNUSED,
660                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
661                uint32_t flags UNUSED) {
662
663        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
664        return 0;
665
666}
667
668static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
669
670        ndag_monitor_t *mon;
671
672        if (rt->monitorcount == 0) {
673                rt->knownmonitors = (ndag_monitor_t *)
674                                malloc(sizeof(ndag_monitor_t) * 5);
675        } else {
676                rt->knownmonitors = (ndag_monitor_t *)
677                            realloc(rt->knownmonitors,
678                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
679        }
680
681        mon = &(rt->knownmonitors[rt->monitorcount]);
682        mon->monitorid = monid;
683        mon->laststart = 0;
684
685        rt->monitorcount ++;
686        return mon;
687}
688
689static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
690
691        streamsock_t *ssock = NULL;
692        ndag_monitor_t *mon = NULL;
693        int i;
694
695        /* TODO consider replacing this with a list or vector so we can
696         * easily remove sources that are no longer in use, rather than
697         * just setting the sock to -1 and having to check them every
698         * time we want to read a packet.
699         */
700        if (rt->sourcecount == 0) {
701                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
702        } else if ((rt->sourcecount % 10) == 0) {
703                rt->sources = (streamsock_t *)realloc(rt->sources,
704                        sizeof(streamsock_t) * (rt->sourcecount + 10));
705        }
706
707        ssock = &(rt->sources[rt->sourcecount]);
708
709        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
710                        NULL, src.port, &(ssock->srcaddr));
711
712        if (ssock->sock < 0) {
713                return -1;
714        }
715
716        for (i = 0; i < rt->monitorcount; i++) {
717                if (rt->knownmonitors[i].monitorid == src.monitor) {
718                        mon = &(rt->knownmonitors[i]);
719                        break;
720                }
721        }
722
723        if (mon == NULL) {
724                mon = add_new_knownmonitor(rt, src.monitor);
725        }
726
727        ssock->port = src.port;
728        ssock->groupaddr = src.groupaddr;
729        ssock->expectedseq = 0;
730        ssock->monitorptr = mon;
731        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
732        ssock->bufavail = ENCAP_BUFFERS;
733        ssock->startidle = 0;
734
735        for (i = 0; i < ENCAP_BUFFERS; i++) {
736                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
737                ssock->savedsize[i] = 0;
738        }
739
740#if HAVE_RECVMMSG
741        for (i = 0; i < RECV_BATCH_SIZE; i++) {
742                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
743                                malloc(sizeof(struct iovec));
744                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
745                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
746                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
747                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
748                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
749                ssock->mmsgbufs[i].msg_len = 0;
750        }
751#else
752        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
753#endif
754
755        ssock->nextread = NULL;;
756        ssock->nextreadind = 0;
757        ssock->nextwriteind = 0;
758        ssock->recordcount = 0;
759        rt->sourcecount += 1;
760
761        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
762                        ssock->groupaddr, ssock->port, rt->threadindex);
763
764        return ssock->port;
765}
766
767static int receiver_read_messages(recvstream_t *rt) {
768
769        ndag_internal_message_t msg;
770
771        while (libtrace_message_queue_try_get(&(rt->mqueue),
772                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
773                switch(msg.type) {
774                        case NDAG_CLIENT_NEWGROUP:
775                                if (add_new_streamsock(rt, msg.contents) < 0) {
776                                        return -1;
777                                }
778                                break;
779                        case NDAG_CLIENT_HALT:
780                                return 0;
781                }
782        }
783        return 1;
784
785}
786
787static inline int readable_data(streamsock_t *ssock) {
788
789        if (ssock->sock == -1) {
790                return 0;
791        }
792        if (ssock->savedsize[ssock->nextreadind] == 0) {
793                return 0;
794        }
795        /*
796        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
797                        ssock->savedsize[ssock->nextreadind]) {
798                return 0;
799        }
800        */
801        return 1;
802
803
804}
805
806static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
807
808        int i;
809        for (i = 0; i < rt->sourcecount; i++) {
810                if (rt->sources[i].monitorptr == mon) {
811                        rt->sources[i].expectedseq = 0;
812                }
813        }
814
815}
816
817static int init_receivers(streamsock_t *ssock, int required) {
818
819        int wind = ssock->nextwriteind;
820        int i = 1;
821
822#if HAVE_RECVMMSG
823        for (i = 0; i < required; i++) {
824                if (i >= RECV_BATCH_SIZE) {
825                        break;
826                }
827
828                if (wind >= ENCAP_BUFFERS) {
829                        wind = 0;
830                }
831
832                ssock->mmsgbufs[i].msg_len = 0;
833                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
834                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
835                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
836
837                wind ++;
838        }
839#else
840        assert(required > 0);
841        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
842        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
843        ssock->singlemsg.msg_iovlen = 1;
844#endif
845        return i;
846}
847
848static int check_ndag_received(streamsock_t *ssock, int index,
849                unsigned int msglen, recvstream_t *rt) {
850
851        ndag_encap_t *encaphdr;
852        ndag_monitor_t *mon;
853        uint8_t rectype;
854
855        /* Check that we have a valid nDAG encap record */
856        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
857
858        if (rectype == NDAG_PKT_KEEPALIVE) {
859                /* Keep-alive, reset startidle and carry on. Don't
860                 * change nextwrite -- we want to overwrite the
861                 * keep-alive with usable content. */
862                return 0;
863        } else if (rectype != NDAG_PKT_ENCAPERF) {
864                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
865                                ssock->groupaddr, ssock->port);
866                close(ssock->sock);
867                ssock->sock = -1;
868                return -1;
869        }
870
871        ssock->savedsize[index] = msglen;
872        ssock->nextwriteind ++;
873        ssock->bufavail --;
874
875        assert(ssock->bufavail >= 0);
876
877        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
878                ssock->nextwriteind = 0;
879        }
880
881        /* Get the useful info from the encap header */
882        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
883
884        mon = ssock->monitorptr;
885
886        if (mon->laststart == 0) {
887                mon->laststart = bswap_be_to_host64(encaphdr->started);
888        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
889                mon->laststart = bswap_be_to_host64(encaphdr->started);
890                reset_expected_seqs(rt, mon);
891
892                /* TODO what is a good way to indicate this to clients?
893                 * set the loss counter in the ERF header? a bit rude?
894                 * use another bit in the ERF header?
895                 * add a queryable flag to libtrace_packet_t?
896                 */
897
898        }
899
900        if (ssock->expectedseq != 0) {
901                rt->missing_records += seq_cmp(
902                                ntohl(encaphdr->seqno), ssock->expectedseq);
903        }
904        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
905        if (ssock->expectedseq == 0) {
906                ssock->expectedseq ++;
907        }
908
909        if (ssock->nextread == NULL) {
910                /* If this is our first read, set up 'nextread'
911                 * by skipping past the nDAG headers */
912                ssock->nextread = ssock->saved[0] +
913                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
914        }
915        return 1;
916
917}
918
919static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
920                int *gottime, recvstream_t *rt) {
921
922        int ret, ndagstat, avail;
923        int toret = 0;
924
925#if HAVE_RECVMMSG
926        int i;
927#endif
928
929        if (ssock->sock == -1) {
930                return 0;
931        }
932
933#if HAVE_RECVMMSG
934        /* Plenty of full buffers, just use the packets in those */
935        if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
936                return 1;
937        }
938#else
939        if (ssock->bufavail == 0) {
940                return 1;
941        }
942#endif
943
944        avail = init_receivers(ssock, ssock->bufavail);
945
946#if HAVE_RECVMMSG
947        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
948                        MSG_DONTWAIT, NULL);
949#else
950        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
951#endif
952        if (ret < 0) {
953                /* Nothing to receive right now, but we should still
954                 * count as 'ready' if at least one buffer is full */
955                if (errno == EAGAIN || errno == EWOULDBLOCK) {
956                        if (readable_data(ssock)) {
957                                toret = 1;
958                        }
959                        if (!(*gottime)) {
960                                gettimeofday(tv, NULL);
961                                *gottime = 1;
962                        }
963                        if (ssock->startidle == 0) {
964                                ssock->startidle = tv->tv_sec;
965                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
966                                fprintf(stderr,
967                                        "Closing channel %s:%u due to inactivity.\n",
968                                        ssock->groupaddr,
969                                        ssock->port);
970
971                                close(ssock->sock);
972                                ssock->sock = -1;
973                        }
974                } else {
975
976                        fprintf(stderr,
977                                "Error receiving encapsulated records from %s:%u -- %s \n",
978                                ssock->groupaddr, ssock->port,
979                                strerror(errno));
980                        close(ssock->sock);
981                        ssock->sock = -1;
982                }
983                return toret;
984        }
985
986        ssock->startidle = 0;
987
988#if HAVE_RECVMMSG
989        for (i = 0; i < ret; i++) {
990                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
991                                ssock->mmsgbufs[i].msg_len, rt);
992                if (ndagstat == -1) {
993                        break;
994                }
995
996                if (ndagstat == 1) {
997                        toret = 1;
998                }
999        }
1000#else
1001        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1002        if (ndagstat <= 0) {
1003                toret = 0;
1004        } else {
1005                toret = 1;
1006        }
1007#endif
1008
1009        return toret;
1010}
1011
1012static int receive_from_sockets(recvstream_t *rt) {
1013
1014        int i, readybufs, gottime;
1015        struct timeval tv;
1016
1017        readybufs = 0;
1018        gottime = 0;
1019
1020        for (i = 0; i < rt->sourcecount; i ++) {
1021                readybufs += receive_from_single_socket(&(rt->sources[i]),
1022                                &tv, &gottime, rt);
1023        }
1024
1025        return readybufs;
1026
1027}
1028
1029
1030static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1031                libtrace_packet_t *packet) {
1032
1033        int iserr = 0;
1034
1035        if (packet->buf_control == TRACE_CTRL_PACKET) {
1036                free(packet->buffer);
1037                packet->buffer = NULL;
1038        }
1039
1040        do {
1041                /* Make sure we shouldn't be halting */
1042                if ((iserr = is_halted(libtrace)) != -1) {
1043                        return iserr;
1044                }
1045
1046                /* Check for any messages from the control thread */
1047                iserr = receiver_read_messages(rt);
1048
1049                if (iserr <= 0) {
1050                        return iserr;
1051                }
1052
1053                /* If blocking and no sources, sleep for a bit and then try
1054                 * checking for messages again.
1055                 */
1056                if (rt->sourcecount == 0) {
1057                        usleep(10000);
1058                        continue;
1059                }
1060
1061                if ((iserr = receive_from_sockets(rt)) < 0) {
1062                        return iserr;
1063                } else if (iserr > 0) {
1064                        /* At least one of our input sockets has available
1065                         * data, let's go ahead and use what we have. */
1066                        break;
1067                }
1068
1069                /* None of our sources have anything available, we can take
1070                 * a short break rather than immediately trying again.
1071                 */
1072                if (iserr == 0) {
1073                        usleep(100);
1074                }
1075
1076        } while (1);
1077
1078        return iserr;
1079}
1080
1081static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1082                libtrace_packet_t *packet) {
1083
1084        int iserr = 0;
1085
1086        if (packet->buf_control == TRACE_CTRL_PACKET) {
1087                free(packet->buffer);
1088                packet->buffer = NULL;
1089        }
1090
1091        /* Make sure we shouldn't be halting */
1092        if ((iserr = is_halted(libtrace)) != -1) {
1093                return iserr;
1094        }
1095
1096        /* If non-blocking and there are no sources, just break */
1097        if (rt->sourcecount == 0) {
1098                return 0;
1099        }
1100
1101        return receive_from_sockets(rt);
1102}
1103
1104static streamsock_t *select_next_packet(recvstream_t *rt) {
1105        int i;
1106        streamsock_t *ssock = NULL;
1107        uint64_t earliest = 0;
1108        uint64_t currentts = 0;
1109        dag_record_t *daghdr;
1110
1111        for (i = 0; i < rt->sourcecount; i ++) {
1112                if (!readable_data(&(rt->sources[i]))) {
1113                        continue;
1114                }
1115
1116                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1117                currentts = bswap_le_to_host64(daghdr->ts);
1118
1119                if (earliest == 0 || earliest > currentts) {
1120                        earliest = currentts;
1121                        ssock = &(rt->sources[i]);
1122                }
1123                /*
1124                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1125                                i, currentts,
1126                                rt->sources[i].recordcount,
1127                                rt->missing_records);
1128                */
1129        }
1130        return ssock;
1131}
1132
1133static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1134
1135        int rem;
1136        streamsock_t *nextavail = NULL;
1137        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1138                        packet);
1139
1140        if (rem <= 0) {
1141                return rem;
1142        }
1143
1144        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1145        if (nextavail == NULL) {
1146                return 0;
1147        }
1148
1149        /* nextread should point at an ERF header, so prepare 'packet' to be
1150         * a libtrace ERF packet. */
1151
1152        return ndag_prepare_packet_stream(libtrace,
1153                        &(FORMAT_DATA->receivers[0]), nextavail,
1154                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1155}
1156
1157static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1158                libtrace_packet_t **packets, size_t nb_packets) {
1159
1160        recvstream_t *rt;
1161        int rem;
1162        size_t read_packets = 0;
1163        streamsock_t *nextavail = NULL;
1164
1165        rt = (recvstream_t *)t->format_data;
1166
1167
1168        do {
1169                /* Only check for messages once per batch */
1170                if (read_packets == 0) {
1171                        rem = receive_encap_records_block(libtrace, rt,
1172                                packets[read_packets]);
1173                } else {
1174                        rem = receive_encap_records_nonblock(libtrace, rt,
1175                                packets[read_packets]);
1176                }
1177
1178                if (rem < 0) {
1179                        return rem;
1180                }
1181
1182                if (rem == 0) {
1183                        break;
1184                }
1185
1186                nextavail = select_next_packet(rt);
1187                if (nextavail == NULL) {
1188                        break;
1189                }
1190
1191                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1192                                packets[read_packets],
1193                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1194
1195                read_packets  ++;
1196                if (read_packets >= nb_packets) {
1197                        break;
1198                }
1199        } while (1);
1200
1201        return read_packets;
1202
1203}
1204
1205static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1206                libtrace_packet_t *packet) {
1207
1208
1209        libtrace_eventobj_t event = {0,0,0.0,0};
1210        int rem;
1211        streamsock_t *nextavail = NULL;
1212
1213        /* Only check for messages once per call */
1214        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1215        if (rem <= 0) {
1216                event.type = TRACE_EVENT_TERMINATE;
1217                return event;
1218        }
1219
1220        do {
1221                rem = receive_encap_records_nonblock(libtrace,
1222                                &(FORMAT_DATA->receivers[0]), packet);
1223
1224                if (rem < 0) {
1225                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1226                                "Received invalid nDAG records.");
1227                        event.type = TRACE_EVENT_TERMINATE;
1228                        break;
1229                }
1230
1231                if (rem == 0) {
1232                        /* Either we've been halted or we've got no packets
1233                         * right now. */
1234                        if (is_halted(libtrace) == 0) {
1235                                event.type = TRACE_EVENT_TERMINATE;
1236                                break;
1237                        }
1238                        event.type = TRACE_EVENT_SLEEP;
1239                        event.seconds = 0.0001;
1240                        break;
1241                }
1242
1243                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1244                if (nextavail == NULL) {
1245                        event.type = TRACE_EVENT_SLEEP;
1246                        event.seconds = 0.0001;
1247                        break;
1248                }
1249
1250                event.type = TRACE_EVENT_PACKET;
1251                ndag_prepare_packet_stream(libtrace,
1252                                &(FORMAT_DATA->receivers[0]), nextavail,
1253                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1254                event.size = trace_get_capture_length(packet) +
1255                                trace_get_framing_length(packet);
1256
1257                if (libtrace->filter) {
1258                        int filtret = trace_apply_filter(libtrace->filter,
1259                                        packet);
1260                        if (filtret == -1) {
1261                                trace_set_err(libtrace,
1262                                                TRACE_ERR_BAD_FILTER,
1263                                                "Bad BPF Filter");
1264                                event.type = TRACE_EVENT_TERMINATE;
1265                                break;
1266                        }
1267
1268                        if (filtret == 0) {
1269                                /* Didn't match filter, try next one */
1270                                libtrace->filtered_packets ++;
1271                                trace_clear_cache(packet);
1272                                continue;
1273                        }
1274                }
1275
1276                if (libtrace->snaplen > 0) {
1277                        trace_set_capture_length(packet, libtrace->snaplen);
1278                }
1279                libtrace->accepted_packets ++;
1280                break;
1281        } while (1);
1282
1283        return event;
1284}
1285
1286static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1287
1288        int i;
1289
1290        stat->dropped_valid = 1;
1291        stat->dropped = 0;
1292        stat->received_valid = 1;
1293        stat->received = 0;
1294        stat->missing_valid = 1;
1295        stat->missing = 0;
1296
1297        /* TODO Is this thread safe? */
1298        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1299                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1300                stat->received += FORMAT_DATA->receivers[i].received_packets;
1301                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1302        }
1303
1304}
1305
1306static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1307                libtrace_stat_t *stat) {
1308
1309        recvstream_t *recvr = (recvstream_t *)t->format_data;
1310
1311        if (libtrace == NULL)
1312                return;
1313        /* TODO Is this thread safe */
1314        stat->dropped_valid = 1;
1315        stat->dropped = recvr->dropped_upstream;
1316
1317        stat->received_valid = 1;
1318        stat->received = recvr->received_packets;
1319
1320        stat->missing_valid = 1;
1321        stat->missing = recvr->missing_records;
1322
1323}
1324
1325static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1326                bool reader) {
1327        recvstream_t *recvr;
1328
1329        if (!reader || t->type != THREAD_PERPKT) {
1330                return 0;
1331        }
1332
1333        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1334        t->format_data = recvr;
1335
1336        return 0;
1337}
1338
1339static struct libtrace_format_t ndag = {
1340
1341        "ndag",
1342        "",
1343        TRACE_FORMAT_NDAG,
1344        NULL,                   /* probe filename */
1345        NULL,                   /* probe magic */
1346        ndag_init_input,        /* init_input */
1347        NULL,                   /* config_input */
1348        ndag_start_input,       /* start_input */
1349        ndag_pause_input,       /* pause_input */
1350        NULL,                   /* init_output */
1351        NULL,                   /* config_output */
1352        NULL,                   /* start_output */
1353        ndag_fin_input,         /* fin_input */
1354        NULL,                   /* fin_output */
1355        ndag_read_packet,       /* read_packet */
1356        ndag_prepare_packet,    /* prepare_packet */
1357        NULL,                   /* fin_packet */
1358        NULL,                   /* write_packet */
1359        erf_get_link_type,      /* get_link_type */
1360        erf_get_direction,      /* get_direction */
1361        erf_set_direction,      /* set_direction */
1362        erf_get_erf_timestamp,  /* get_erf_timestamp */
1363        NULL,                   /* get_timeval */
1364        NULL,                   /* get_seconds */
1365        NULL,                   /* get_timespec */
1366        NULL,                   /* seek_erf */
1367        NULL,                   /* seek_timeval */
1368        NULL,                   /* seek_seconds */
1369        erf_get_capture_length, /* get_capture_length */
1370        erf_get_wire_length,    /* get_wire_length */
1371        erf_get_framing_length, /* get_framing_length */
1372        erf_set_capture_length, /* set_capture_length */
1373        NULL,                   /* get_received_packets */
1374        NULL,                   /* get_filtered_packets */
1375        NULL,                   /* get_dropped_packets */
1376        ndag_get_statistics,    /* get_statistics */
1377        NULL,                   /* get_fd */
1378        trace_event_ndag,       /* trace_event */
1379        NULL,                   /* help */
1380        NULL,                   /* next pointer */
1381        {true, 0},              /* live packet capture */
1382        ndag_pstart_input,      /* parallel start */
1383        ndag_pread_packets,     /* parallel read */
1384        ndag_pause_input,       /* parallel pause */
1385        NULL,
1386        ndag_pregister_thread,  /* register thread */
1387        NULL,
1388        ndag_get_thread_stats   /* per-thread stats */
1389};
1390
1391void ndag_constructor(void) {
1392        register_format(&ndag);
1393}
Note: See TracBrowser for help on using the repository browser.