source: lib/format_ndag.c @ be32cc7

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since be32cc7 was be32cc7, checked in by Shane Alcock <salcock@…>, 3 years ago

Added missing copyright blurbs to format_ndag sources.

  • Property mode set to 100644
File size: 44.1 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#define _GNU_SOURCE
29
30#include "config.h"
31#include "common.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "format_erf.h"
36
37#include <assert.h>
38#include <errno.h>
39#include <fcntl.h>
40#include <stdio.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <net/if.h>
45#include <sys/types.h>
46#include <sys/socket.h>
47#include <netdb.h>
48
49#include "format_ndag.h"
50
51#define NDAG_IDLE_TIMEOUT (600)
52#define ENCAP_BUFSIZE (10000)
53#define CTRL_BUF_SIZE (10000)
54#define ENCAP_BUFFERS (1000)
55
56#define RECV_BATCH_SIZE (50)
57
58#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
59
60static struct libtrace_format_t ndag;
61
62volatile int ndag_paused = 0;
63
64typedef struct monitor {
65        uint16_t monitorid;
66        uint64_t laststart;
67} ndag_monitor_t;
68
69
70typedef struct streamsource {
71        uint16_t monitor;
72        char *groupaddr;
73        char *localiface;
74        uint16_t port;
75} streamsource_t;
76
77typedef struct streamsock {
78        char *groupaddr;
79        int sock;
80        struct addrinfo *srcaddr;
81        uint16_t port;
82        uint32_t expectedseq;
83        ndag_monitor_t *monitorptr;
84        char **saved;
85        char *nextread;
86        int nextreadind;
87        int nextwriteind;
88        int savedsize[ENCAP_BUFFERS];
89        uint32_t startidle;
90        uint64_t recordcount;
91
92        int bufavail;
93
94#if HAVE_RECVMMSG
95        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
96#else
97        struct msghdr singlemsg;
98#endif
99
100} streamsock_t;
101
102typedef struct recvstream {
103        streamsock_t *sources;
104        uint16_t sourcecount;
105        libtrace_message_queue_t mqueue;
106        int threadindex;
107        ndag_monitor_t *knownmonitors;
108        uint16_t monitorcount;
109
110        uint64_t dropped_upstream;
111        uint64_t missing_records;
112        uint64_t received_packets;
113} recvstream_t;
114
115typedef struct ndag_format_data {
116        char *multicastgroup;
117        char *portstr;
118        char *localiface;
119        uint16_t nextthreadid;
120        recvstream_t *receivers;
121
122        pthread_t controlthread;
123        libtrace_message_queue_t controlqueue;
124} ndag_format_data_t;
125
126enum {
127        NDAG_CLIENT_HALT = 0x01,
128        NDAG_CLIENT_RESTARTED = 0x02,   // redundant
129        NDAG_CLIENT_NEWGROUP = 0x03
130};
131
132typedef struct ndagreadermessage {
133        uint8_t type;
134        streamsource_t contents;
135} ndag_internal_message_t;
136
137
138static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
139
140        /* Calculate seq_a - seq_b, taking wraparound into account */
141        if (seq_a == seq_b) return 0;
142
143        if (seq_a > seq_b) {
144                return (int) (seq_a - seq_b);
145        }
146
147        /* -1 for the wrap and another -1 because we don't use zero */
148        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
149}
150
151static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
152        ndag_common_t *header = (ndag_common_t *)msgbuf;
153
154        if (msgsize < sizeof(ndag_common_t)) {
155                fprintf(stderr,
156                        "nDAG message does not have a complete nDAG header.\n");
157                return 0;
158        }
159
160        if (ntohl(header->magic) != NDAG_MAGIC_NUMBER) {
161                fprintf(stderr,
162                        "nDAG message does not have a valid magic number.\n");
163                return 0;
164        }
165
166        if (header->version > NDAG_EXPORT_VERSION || header->version == 0) {
167                fprintf(stderr,
168                        "nDAG message has an invalid header version: %u\n",
169                                header->version);
170                return 0;
171        }
172
173        return header->type;
174}
175
176static int join_multicast_group(char *groupaddr, char *localiface,
177        char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
178
179        struct addrinfo hints;
180        struct addrinfo *gotten;
181        struct addrinfo *group;
182        unsigned int interface;
183        char pstr[16];
184        struct group_req greq;
185        int bufsize;
186
187        int sock;
188
189        if (portstr == NULL) {
190                snprintf(pstr, 15, "%u", portnum);
191                portstr = pstr;
192        }
193
194        interface = if_nametoindex(localiface);
195        if (interface == 0) {
196                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
197                                localiface, strerror(errno));
198                return -1;
199        }
200
201        hints.ai_family = PF_UNSPEC;
202        hints.ai_socktype = SOCK_DGRAM;
203        hints.ai_flags = AI_PASSIVE;
204        hints.ai_protocol = 0;
205
206        if (getaddrinfo(NULL, portstr, &hints, &gotten) != 0) {
207                fprintf(stderr,
208                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
209                                portstr, strerror(errno));
210                return -1;
211        }
212
213        if (getaddrinfo(groupaddr, NULL, &hints, &group) != 0) {
214                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
215                                groupaddr, strerror(errno));
216                return -1;
217        }
218
219        *srcinfo = gotten;
220        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
221        if (sock < 0) {
222                fprintf(stderr,
223                        "Failed to create multicast socket for %s:%s -- %s\n",
224                                groupaddr, portstr, strerror(errno));
225                goto sockcreateover;
226        }
227
228        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
229        {
230                fprintf(stderr,
231                        "Failed to bind to multicast socket %s:%s -- %s\n",
232                                groupaddr, portstr, strerror(errno));
233                close(sock);
234                sock = -1;
235                goto sockcreateover;
236        }
237
238        greq.gr_interface = interface;
239        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);
240
241        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
242                        sizeof(greq)) < 0) {
243                fprintf(stderr,
244                        "Failed to join multicast group %s:%s -- %s\n",
245                                groupaddr, portstr, strerror(errno));
246                close(sock);
247                sock = -1;
248                goto sockcreateover;
249        }
250
251        bufsize = 16 * 1024 * 1024;
252        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
253                                (socklen_t)sizeof(int)) < 0) {
254
255                fprintf(stderr,
256                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
257                                groupaddr, portstr, strerror(errno));
258                close(sock);
259                sock = -1;
260                goto sockcreateover;
261        }
262
263sockcreateover:
264        freeaddrinfo(group);
265        return sock;
266}
267
268
269static int ndag_init_input(libtrace_t *libtrace) {
270
271        char *scan = NULL;
272        char *next = NULL;
273
274        libtrace->format_data = (ndag_format_data_t *)malloc(
275                        sizeof(ndag_format_data_t));
276
277        FORMAT_DATA->multicastgroup = NULL;
278        FORMAT_DATA->portstr = NULL;
279        FORMAT_DATA->localiface = NULL;
280        FORMAT_DATA->nextthreadid = 0;
281        FORMAT_DATA->receivers = NULL;
282
283        scan = strchr(libtrace->uridata, ',');
284        if (scan == NULL) {
285                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
286                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
287                return -1;
288        }
289        FORMAT_DATA->localiface = strndup(libtrace->uridata,
290                        (size_t)(scan - libtrace->uridata));
291        next = scan + 1;
292
293        scan = strchr(next, ',');
294        if (scan == NULL) {
295                FORMAT_DATA->portstr = strdup("9001");
296                FORMAT_DATA->multicastgroup = strdup(next);
297        } else {
298                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
299
300                FORMAT_DATA->portstr = strdup(scan + 1);
301        }
302        return 0;
303}
304
305static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
306                uint16_t portnum, uint16_t monid) {
307
308        ndag_internal_message_t alert;
309
310        alert.type = NDAG_CLIENT_NEWGROUP;
311        alert.contents.groupaddr = FORMAT_DATA->multicastgroup;
312        alert.contents.localiface = FORMAT_DATA->localiface;
313        alert.contents.port = portnum;
314        alert.contents.monitor = monid;
315
316        libtrace_message_queue_put(&(FORMAT_DATA->receivers[threadid].mqueue),
317                        (void *)&alert);
318
319}
320       
321static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
322                int msgsize, uint16_t *ptmap) {
323
324        int i;
325        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
326        uint8_t msgtype;
327
328        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
329        if (msgtype == 0) {
330                return -1;
331        }
332
333        msgsize -= sizeof(ndag_common_t);
334        if (msgtype == NDAG_PKT_BEACON) {
335                /* If message is a beacon, make sure every port included in the
336                 * beacon is assigned to a receive thread.
337                 */
338                uint16_t *ptr, numstreams;
339
340                if ((uint32_t)msgsize < sizeof(uint16_t)) {
341                        fprintf(stderr, "Malformed beacon (missing number of streams).\n");
342                        return -1;
343                }
344
345                ptr = (uint16_t *)(msgbuf + sizeof(ndag_common_t));
346                numstreams = ntohs(*ptr);
347                ptr ++;
348
349                if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
350                {
351                        fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
352                        fprintf(stderr, "%u %u\n", msgsize, numstreams);
353                        return -1;
354                }
355
356                for (i = 0; i < numstreams; i++) {
357                        uint16_t streamport = ntohs(*ptr);
358
359                        if (ptmap[streamport] == 0xffff) {
360                                new_group_alert(libtrace,
361                                        FORMAT_DATA->nextthreadid, streamport,
362                                        ntohs(ndaghdr->monitorid));
363
364                                ptmap[streamport] = FORMAT_DATA->nextthreadid;
365
366                                if (libtrace->perpkt_thread_count == 0) {
367                                        FORMAT_DATA->nextthreadid = 0;
368                                } else {
369                                        FORMAT_DATA->nextthreadid =
370                                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
371                                }
372                        }
373
374                        ptr ++;
375                }
376        } else {
377                fprintf(stderr,
378                        "Unexpected message type on control channel: %u\n",
379                         msgtype);
380                return -1;
381        }
382
383        return 0;
384
385}
386
387static void *ndag_controller_run(void *tdata) {
388
389        libtrace_t *libtrace = (libtrace_t *)tdata;
390        uint16_t ptmap[65536];
391        int sock = -1;
392        struct addrinfo *receiveaddr = NULL;
393        fd_set listening;
394        struct timeval timeout;
395
396        /* ptmap is a dirty hack to allow us to quickly check if we've already
397         * assigned a stream to a thread.
398         */
399        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));
400
401        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
402                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
403                        &receiveaddr);
404        if (sock == -1) {
405                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
406                        "Unable to join multicast group for nDAG control channel");
407                trace_interrupt();
408        }
409
410        ndag_paused = 0;
411        while ((is_halted(libtrace) == -1) && !ndag_paused) {
412                int ret;
413                char buf[CTRL_BUF_SIZE];
414
415                FD_ZERO(&listening);
416                FD_SET(sock, &listening);
417
418                timeout.tv_sec = 0;
419                timeout.tv_usec = 500000;
420
421                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
422                if (ret < 0) {
423                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
424                        break;
425                }
426
427                if (!FD_ISSET(sock, &listening)) {
428                        continue;
429                }
430
431                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
432                                receiveaddr->ai_addr,
433                                &(receiveaddr->ai_addrlen));
434                if (ret < 0) {
435                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
436                        break;
437                }
438
439                if (ret == 0) {
440                        break;
441                }
442
443                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
444                        fprintf(stderr, "Error while parsing nDAG control message.\n");
445                        continue;
446                }
447        }
448
449        if (sock >= 0) {
450                close(sock);
451        }
452
453        /* Control channel has fallen over, should probably encourage libtrace
454         * to halt the receiver threads as well.
455         */
456        if (!is_halted(libtrace)) {
457                trace_interrupt();
458        }
459
460        pthread_exit(NULL);
461}
462
463static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
464{
465        int ret;
466        uint32_t i;
467        /* Configure the set of receiver threads */
468
469        if (FORMAT_DATA->receivers == NULL) {
470                /* What if the number of threads changes between a pause and
471                 * a restart? Can this happen? */
472                FORMAT_DATA->receivers = (recvstream_t *)
473                                malloc(sizeof(recvstream_t) * maxthreads);
474        }
475
476        for (i = 0; i < maxthreads; i++) {
477                FORMAT_DATA->receivers[i].sources = NULL;
478                FORMAT_DATA->receivers[i].sourcecount = 0;
479                FORMAT_DATA->receivers[i].knownmonitors = NULL;
480                FORMAT_DATA->receivers[i].monitorcount = 0;
481                FORMAT_DATA->receivers[i].threadindex = i;
482                FORMAT_DATA->receivers[i].dropped_upstream = 0;
483                FORMAT_DATA->receivers[i].received_packets = 0;
484                FORMAT_DATA->receivers[i].missing_records = 0;
485
486                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
487                                sizeof(ndag_internal_message_t));
488        }
489
490        /* Start the controller thread */
491        /* TODO consider affinity of this thread? */
492
493        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
494                        ndag_controller_run, libtrace);
495        if (ret != 0) {
496                return -1;
497        }
498        return maxthreads;
499}
500
501static int ndag_start_input(libtrace_t *libtrace) {
502        return ndag_start_threads(libtrace, 1);
503}
504
505static int ndag_pstart_input(libtrace_t *libtrace) {
506        if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) ==
507                        libtrace->perpkt_thread_count)
508                return 0;
509        return -1;
510}
511
512static void halt_ndag_receiver(recvstream_t *receiver) {
513        int j, i;
514        libtrace_message_queue_destroy(&(receiver->mqueue));
515
516        if (receiver->sources == NULL)
517                return;
518        for (i = 0; i < receiver->sourcecount; i++) {
519                streamsock_t src = receiver->sources[i];
520                if (src.saved) {
521                        for (j = 0; j < ENCAP_BUFFERS; j++) {
522                                if (src.saved[j]) {
523                                        free(src.saved[j]);
524                                }
525                        }
526                        free(src.saved);
527                }
528
529#if HAVE_RECVMMSG
530                for (j = 0; j < RECV_BATCH_SIZE; j++) {
531                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
532                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
533                        }
534                }
535#else
536                free(src.singlemsg.msg_iov);
537#endif
538
539                close(src.sock);
540        }
541        if (receiver->knownmonitors) {
542                free(receiver->knownmonitors);
543        }
544
545        if (receiver->sources) {
546                free(receiver->sources);
547        }
548}
549
550static int ndag_pause_input(libtrace_t *libtrace) {
551        int i;
552
553        /* Close the existing receiver sockets */
554        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
555               halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
556        }
557        return 0;
558}
559
560static int ndag_fin_input(libtrace_t *libtrace) {
561
562        if (FORMAT_DATA->receivers) {
563                free(FORMAT_DATA->receivers);
564        }
565        if (FORMAT_DATA->multicastgroup) {
566                free(FORMAT_DATA->multicastgroup);
567        }
568        if (FORMAT_DATA->portstr) {
569                free(FORMAT_DATA->portstr);
570        }
571        if (FORMAT_DATA->localiface) {
572                free(FORMAT_DATA->localiface);
573        }
574
575        free(libtrace->format_data);
576        return 0;
577}
578
579static int ndag_prepare_packet_stream(libtrace_t *libtrace,
580                recvstream_t *rt,
581                streamsock_t *ssock, libtrace_packet_t *packet,
582                uint32_t flags) {
583
584        dag_record_t *erfptr;
585        ndag_encap_t *encaphdr;
586        uint16_t ndag_reccount = 0;
587        int nr;
588
589        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
590                packet->buf_control = TRACE_CTRL_PACKET;
591        } else {
592                packet->buf_control = TRACE_CTRL_EXTERNAL;
593        }
594
595        packet->trace = libtrace;
596        packet->buffer = ssock->nextread;
597        packet->header = ssock->nextread;
598        packet->type = TRACE_RT_DATA_ERF;
599
600        erfptr = (dag_record_t *)packet->header;
601
602        if (erfptr->flags.rxerror == 1) {
603                packet->payload = NULL;
604                erfptr->rlen = htons(erf_get_framing_length(packet));
605        } else {
606                packet->payload = (char *)packet->buffer +
607                                erf_get_framing_length(packet);
608        }
609
610        /* Update upstream drops using lctr */
611
612        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
613                /* TODO */
614        } else {
615                if (rt->received_packets > 0) {
616                        rt->dropped_upstream += ntohs(erfptr->lctr);
617                }
618        }
619
620        rt->received_packets ++;
621        ssock->recordcount += 1;
622
623        nr = ssock->nextreadind;
624        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
625                        sizeof(ndag_common_t));
626
627        ndag_reccount = ntohs(encaphdr->recordcount);
628        if ((ndag_reccount & 0x8000) != 0) {
629                /* Record was truncated -- update rlen appropriately */
630                erfptr->rlen = htons(ssock->savedsize[nr] -
631                                (ssock->nextread - ssock->saved[nr]));
632        }
633        ssock->nextread += ntohs(erfptr->rlen);
634
635        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
636                /* Read everything from this buffer, mark as empty and
637                 * move on. */
638                ssock->savedsize[nr] = 0;
639                ssock->bufavail ++;
640
641                assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS);
642                nr ++;
643                if (nr == ENCAP_BUFFERS) {
644                        nr = 0;
645                }
646                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
647                                sizeof(ndag_encap_t);
648                ssock->nextreadind = nr;
649        }
650
651        packet->order = erf_get_erf_timestamp(packet);
652        packet->error = packet->payload ? ntohs(erfptr->rlen) :
653                        erf_get_framing_length(packet);
654
655        return ntohs(erfptr->rlen);
656}
657
658static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
659                libtrace_packet_t *packet UNUSED,
660                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
661                uint32_t flags UNUSED) {
662
663        assert(0 && "Sending nDAG records over RT doesn't make sense! Please stop.");
664        return 0;
665
666}
667
668static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
669
670        ndag_monitor_t *mon;
671
672        if (rt->monitorcount == 0) {
673                rt->knownmonitors = (ndag_monitor_t *)
674                                malloc(sizeof(ndag_monitor_t) * 5);
675        } else {
676                rt->knownmonitors = (ndag_monitor_t *)
677                            realloc(rt->knownmonitors,
678                            sizeof(ndag_monitor_t) * (rt->monitorcount * 5));
679        }
680
681        mon = &(rt->knownmonitors[rt->monitorcount]);
682        mon->monitorid = monid;
683        mon->laststart = 0;
684
685        rt->monitorcount ++;
686        return mon;
687}
688
689static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
690
691        streamsock_t *ssock = NULL;
692        ndag_monitor_t *mon = NULL;
693        int i;
694
695        /* TODO consider replacing this with a list or vector so we can
696         * easily remove sources that are no longer in use, rather than
697         * just setting the sock to -1 and having to check them every
698         * time we want to read a packet.
699         */
700        if (rt->sourcecount == 0) {
701                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
702        } else if ((rt->sourcecount % 10) == 0) {
703                rt->sources = (streamsock_t *)realloc(rt->sources,
704                        sizeof(streamsock_t) * (rt->sourcecount + 10));
705        }
706
707        ssock = &(rt->sources[rt->sourcecount]);
708
709        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
710                        NULL, src.port, &(ssock->srcaddr));
711
712        if (ssock->sock < 0) {
713                return -1;
714        }
715
716        for (i = 0; i < rt->monitorcount; i++) {
717                if (rt->knownmonitors[i].monitorid == src.monitor) {
718                        mon = &(rt->knownmonitors[i]);
719                        break;
720                }
721        }
722
723        if (mon == NULL) {
724                mon = add_new_knownmonitor(rt, src.monitor);
725        }
726
727        ssock->port = src.port;
728        ssock->groupaddr = src.groupaddr;
729        ssock->expectedseq = 0;
730        ssock->monitorptr = mon;
731        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
732        ssock->bufavail = ENCAP_BUFFERS;
733        ssock->startidle = 0;
734
735        for (i = 0; i < ENCAP_BUFFERS; i++) {
736                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
737                ssock->savedsize[i] = 0;
738        }
739
740#if HAVE_RECVMMSG
741        for (i = 0; i < RECV_BATCH_SIZE; i++) {
742                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
743                                malloc(sizeof(struct iovec));
744                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
745                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
746                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
747                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
748                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
749                ssock->mmsgbufs[i].msg_len = 0;
750        }
751#else
752        ssock->singlemsg.msg_iov = (struct iovec *) malloc(sizeof(struct iovec));
753#endif
754
755        ssock->nextread = NULL;;
756        ssock->nextreadind = 0;
757        ssock->recordcount = 0;
758        rt->sourcecount += 1;
759
760        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
761                        ssock->groupaddr, ssock->port, rt->threadindex);
762
763        return ssock->port;
764}
765
766static int receiver_read_messages(recvstream_t *rt) {
767
768        ndag_internal_message_t msg;
769
770        while (libtrace_message_queue_try_get(&(rt->mqueue),
771                                (void *)&msg) != LIBTRACE_MQ_FAILED) {
772                switch(msg.type) {
773                        case NDAG_CLIENT_NEWGROUP:
774                                if (add_new_streamsock(rt, msg.contents) < 0) {
775                                        return -1;
776                                }
777                                break;
778                        case NDAG_CLIENT_HALT:
779                                return 0;
780                }
781        }
782        return 1;
783
784}
785
786static inline int readable_data(streamsock_t *ssock) {
787
788        if (ssock->sock == -1) {
789                return 0;
790        }
791        if (ssock->savedsize[ssock->nextreadind] == 0) {
792                return 0;
793        }
794        /*
795        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
796                        ssock->savedsize[ssock->nextreadind]) {
797                return 0;
798        }
799        */
800        return 1;
801
802
803}
804
805static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
806
807        int i;
808        for (i = 0; i < rt->sourcecount; i++) {
809                if (rt->sources[i].monitorptr == mon) {
810                        rt->sources[i].expectedseq = 0;
811                }
812        }
813
814}
815
816static int init_receivers(streamsock_t *ssock, int required) {
817
818        int wind = ssock->nextwriteind;
819        int i = 1;
820
821#if HAVE_RECVMMSG
822        for (i = 0; i < required; i++) {
823                if (i >= RECV_BATCH_SIZE) {
824                        break;
825                }
826
827                if (wind >= ENCAP_BUFFERS) {
828                        wind = 0;
829                }
830
831                ssock->mmsgbufs[i].msg_len = 0;
832                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
833                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
834                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
835
836                wind ++;
837        }
838#else
839        assert(required > 0);
840        ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind];
841        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
842        ssock->singlemsg.msg_iovlen = 1;
843#endif
844        return i;
845}
846
847static int check_ndag_received(streamsock_t *ssock, int index,
848                unsigned int msglen, recvstream_t *rt) {
849
850        ndag_encap_t *encaphdr;
851        ndag_monitor_t *mon;
852        uint8_t rectype;
853
854        /* Check that we have a valid nDAG encap record */
855        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
856
857        if (rectype == NDAG_PKT_KEEPALIVE) {
858                /* Keep-alive, reset startidle and carry on. Don't
859                 * change nextwrite -- we want to overwrite the
860                 * keep-alive with usable content. */
861                return 0;
862        } else if (rectype != NDAG_PKT_ENCAPERF) {
863                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
864                                ssock->groupaddr, ssock->port);
865                close(ssock->sock);
866                ssock->sock = -1;
867                return -1;
868        }
869
870        ssock->savedsize[index] = msglen;
871        ssock->nextwriteind ++;
872        ssock->bufavail --;
873
874        assert(ssock->bufavail >= 0);
875
876        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
877                ssock->nextwriteind = 0;
878        }
879
880        /* Get the useful info from the encap header */
881        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
882
883        mon = ssock->monitorptr;
884
885        if (mon->laststart == 0) {
886                mon->laststart = bswap_be_to_host64(encaphdr->started);
887        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
888                mon->laststart = bswap_be_to_host64(encaphdr->started);
889                reset_expected_seqs(rt, mon);
890
891                /* TODO what is a good way to indicate this to clients?
892                 * set the loss counter in the ERF header? a bit rude?
893                 * use another bit in the ERF header?
894                 * add a queryable flag to libtrace_packet_t?
895                 */
896
897        }
898
899        if (ssock->expectedseq != 0) {
900                rt->missing_records += seq_cmp(
901                                ntohl(encaphdr->seqno), ssock->expectedseq);
902        }
903        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
904        if (ssock->expectedseq == 0) {
905                ssock->expectedseq ++;
906        }
907
908        if (ssock->nextread == NULL) {
909                /* If this is our first read, set up 'nextread'
910                 * by skipping past the nDAG headers */
911                ssock->nextread = ssock->saved[0] +
912                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
913        }
914        return 1;
915
916}
917
918static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
919                int *gottime, recvstream_t *rt) {
920
921        int ret, ndagstat, avail;
922        int toret = 0;
923
924#if HAVE_RECVMMSG
925        int i;
926#endif
927
928        if (ssock->sock == -1) {
929                return 0;
930        }
931
932#if HAVE_RECVMMSG
933        /* Plenty of full buffers, just use the packets in those */
934        if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
935                return 1;
936        }
937#else
938        if (ssock->bufavail == 0) {
939                return 1;
940        }
941#endif
942
943        avail = init_receivers(ssock, ssock->bufavail);
944
945#if HAVE_RECVMMSG
946        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
947                        MSG_DONTWAIT, NULL);
948#else
949        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
950#endif
951        if (ret < 0) {
952                /* Nothing to receive right now, but we should still
953                 * count as 'ready' if at least one buffer is full */
954                if (errno == EAGAIN || errno == EWOULDBLOCK) {
955                        if (readable_data(ssock)) {
956                                toret = 1;
957                        }
958                        if (!(*gottime)) {
959                                gettimeofday(tv, NULL);
960                                *gottime = 1;
961                        }
962                        if (ssock->startidle == 0) {
963                                ssock->startidle = tv->tv_sec;
964                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
965                                fprintf(stderr,
966                                        "Closing channel %s:%u due to inactivity.\n",
967                                        ssock->groupaddr,
968                                        ssock->port);
969
970                                close(ssock->sock);
971                                ssock->sock = -1;
972                        }
973                } else {
974
975                        fprintf(stderr,
976                                "Error receiving encapsulated records from %s:%u -- %s \n",
977                                ssock->groupaddr, ssock->port,
978                                strerror(errno));
979                        close(ssock->sock);
980                        ssock->sock = -1;
981                }
982                return toret;
983        }
984
985        ssock->startidle = 0;
986
987#if HAVE_RECVMMSG
988        for (i = 0; i < ret; i++) {
989                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
990                                ssock->mmsgbufs[i].msg_len, rt);
991                if (ndagstat == -1) {
992                        break;
993                }
994
995                if (ndagstat == 1) {
996                        toret = 1;
997                }
998        }
999#else
1000        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
1001        if (ndagstat <= 0) {
1002                toret = 0;
1003        } else {
1004                toret = 1;
1005        }
1006#endif
1007
1008        return toret;
1009}
1010
1011static int receive_from_sockets(recvstream_t *rt) {
1012
1013        int i, readybufs, gottime;
1014        struct timeval tv;
1015
1016        readybufs = 0;
1017        gottime = 0;
1018
1019        for (i = 0; i < rt->sourcecount; i ++) {
1020                readybufs += receive_from_single_socket(&(rt->sources[i]),
1021                                &tv, &gottime, rt);
1022        }
1023
1024        return readybufs;
1025
1026}
1027
1028
1029static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
1030                libtrace_packet_t *packet) {
1031
1032        int iserr = 0;
1033
1034        if (packet->buf_control == TRACE_CTRL_PACKET) {
1035                free(packet->buffer);
1036                packet->buffer = NULL;
1037        }
1038
1039        do {
1040                /* Make sure we shouldn't be halting */
1041                if ((iserr = is_halted(libtrace)) != -1) {
1042                        return iserr;
1043                }
1044
1045                /* Check for any messages from the control thread */
1046                iserr = receiver_read_messages(rt);
1047
1048                if (iserr <= 0) {
1049                        return iserr;
1050                }
1051
1052                /* If blocking and no sources, sleep for a bit and then try
1053                 * checking for messages again.
1054                 */
1055                if (rt->sourcecount == 0) {
1056                        usleep(10000);
1057                        continue;
1058                }
1059
1060                if ((iserr = receive_from_sockets(rt)) < 0) {
1061                        return iserr;
1062                } else if (iserr > 0) {
1063                        /* At least one of our input sockets has available
1064                         * data, let's go ahead and use what we have. */
1065                        break;
1066                }
1067
1068                /* None of our sources have anything available, we can take
1069                 * a short break rather than immediately trying again.
1070                 */
1071                if (iserr == 0) {
1072                        usleep(100);
1073                }
1074
1075        } while (1);
1076
1077        return iserr;
1078}
1079
1080static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
1081                libtrace_packet_t *packet) {
1082
1083        int iserr = 0;
1084
1085        if (packet->buf_control == TRACE_CTRL_PACKET) {
1086                free(packet->buffer);
1087                packet->buffer = NULL;
1088        }
1089
1090        /* Make sure we shouldn't be halting */
1091        if ((iserr = is_halted(libtrace)) != -1) {
1092                return iserr;
1093        }
1094
1095        /* If non-blocking and there are no sources, just break */
1096        if (rt->sourcecount == 0) {
1097                return 0;
1098        }
1099
1100        return receive_from_sockets(rt);
1101}
1102
1103static streamsock_t *select_next_packet(recvstream_t *rt) {
1104        int i;
1105        streamsock_t *ssock = NULL;
1106        uint64_t earliest = 0;
1107        uint64_t currentts = 0;
1108        dag_record_t *daghdr;
1109
1110        for (i = 0; i < rt->sourcecount; i ++) {
1111                if (!readable_data(&(rt->sources[i]))) {
1112                        continue;
1113                }
1114
1115                daghdr = (dag_record_t *)(rt->sources[i].nextread);
1116                currentts = bswap_le_to_host64(daghdr->ts);
1117
1118                if (earliest == 0 || earliest > currentts) {
1119                        earliest = currentts;
1120                        ssock = &(rt->sources[i]);
1121                }
1122                /*
1123                fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
1124                                i, currentts,
1125                                rt->sources[i].recordcount,
1126                                rt->missing_records);
1127                */
1128        }
1129        return ssock;
1130}
1131
1132static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1133
1134        int rem;
1135        streamsock_t *nextavail = NULL;
1136        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
1137                        packet);
1138
1139        if (rem <= 0) {
1140                return rem;
1141        }
1142
1143        nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1144        if (nextavail == NULL) {
1145                return 0;
1146        }
1147
1148        /* nextread should point at an ERF header, so prepare 'packet' to be
1149         * a libtrace ERF packet. */
1150
1151        return ndag_prepare_packet_stream(libtrace,
1152                        &(FORMAT_DATA->receivers[0]), nextavail,
1153                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1154}
1155
1156static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
1157                libtrace_packet_t **packets, size_t nb_packets) {
1158
1159        recvstream_t *rt;
1160        int rem;
1161        size_t read_packets = 0;
1162        streamsock_t *nextavail = NULL;
1163
1164        rt = (recvstream_t *)t->format_data;
1165
1166
1167        do {
1168                /* Only check for messages once per batch */
1169                if (read_packets == 0) {
1170                        rem = receive_encap_records_block(libtrace, rt,
1171                                packets[read_packets]);
1172                } else {
1173                        rem = receive_encap_records_nonblock(libtrace, rt,
1174                                packets[read_packets]);
1175                }
1176
1177                if (rem < 0) {
1178                        return rem;
1179                }
1180
1181                if (rem == 0) {
1182                        break;
1183                }
1184
1185                nextavail = select_next_packet(rt);
1186                if (nextavail == NULL) {
1187                        break;
1188                }
1189
1190                ndag_prepare_packet_stream(libtrace, rt, nextavail,
1191                                packets[read_packets],
1192                                TRACE_PREP_DO_NOT_OWN_BUFFER);
1193
1194                read_packets  ++;
1195                if (read_packets >= nb_packets) {
1196                        break;
1197                }
1198        } while (1);
1199
1200        return read_packets;
1201
1202}
1203
1204static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
1205                libtrace_packet_t *packet) {
1206
1207
1208        libtrace_eventobj_t event = {0,0,0.0,0};
1209        int rem;
1210        streamsock_t *nextavail = NULL;
1211
1212        /* Only check for messages once per call */
1213        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
1214        if (rem <= 0) {
1215                event.type = TRACE_EVENT_TERMINATE;
1216                return event;
1217        }
1218
1219        do {
1220                rem = receive_encap_records_nonblock(libtrace,
1221                                &(FORMAT_DATA->receivers[0]), packet);
1222
1223                if (rem < 0) {
1224                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
1225                                "Received invalid nDAG records.");
1226                        event.type = TRACE_EVENT_TERMINATE;
1227                        break;
1228                }
1229
1230                if (rem == 0) {
1231                        /* Either we've been halted or we've got no packets
1232                         * right now. */
1233                        if (is_halted(libtrace) == 0) {
1234                                event.type = TRACE_EVENT_TERMINATE;
1235                                break;
1236                        }
1237                        event.type = TRACE_EVENT_SLEEP;
1238                        event.seconds = 0.0001;
1239                        break;
1240                }
1241
1242                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
1243                if (nextavail == NULL) {
1244                        event.type = TRACE_EVENT_SLEEP;
1245                        event.seconds = 0.0001;
1246                        break;
1247                }
1248
1249                event.type = TRACE_EVENT_PACKET;
1250                ndag_prepare_packet_stream(libtrace,
1251                                &(FORMAT_DATA->receivers[0]), nextavail,
1252                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
1253                event.size = trace_get_capture_length(packet) +
1254                                trace_get_framing_length(packet);
1255
1256                if (libtrace->filter) {
1257                        int filtret = trace_apply_filter(libtrace->filter,
1258                                        packet);
1259                        if (filtret == -1) {
1260                                trace_set_err(libtrace,
1261                                                TRACE_ERR_BAD_FILTER,
1262                                                "Bad BPF Filter");
1263                                event.type = TRACE_EVENT_TERMINATE;
1264                                break;
1265                        }
1266
1267                        if (filtret == 0) {
1268                                /* Didn't match filter, try next one */
1269                                libtrace->filtered_packets ++;
1270                                trace_clear_cache(packet);
1271                                continue;
1272                        }
1273                }
1274
1275                if (libtrace->snaplen > 0) {
1276                        trace_set_capture_length(packet, libtrace->snaplen);
1277                }
1278                libtrace->accepted_packets ++;
1279                break;
1280        } while (1);
1281
1282        return event;
1283}
1284
1285static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
1286
1287        int i;
1288
1289        stat->dropped_valid = 1;
1290        stat->dropped = 0;
1291        stat->received_valid = 1;
1292        stat->received = 0;
1293        stat->missing_valid = 1;
1294        stat->missing = 0;
1295
1296        /* TODO Is this thread safe? */
1297        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
1298                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
1299                stat->received += FORMAT_DATA->receivers[i].received_packets;
1300                stat->missing += FORMAT_DATA->receivers[i].missing_records;
1301        }
1302
1303}
1304
1305static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
1306                libtrace_stat_t *stat) {
1307
1308        recvstream_t *recvr = (recvstream_t *)t->format_data;
1309
1310        if (libtrace == NULL)
1311                return;
1312        /* TODO Is this thread safe */
1313        stat->dropped_valid = 1;
1314        stat->dropped = recvr->dropped_upstream;
1315
1316        stat->received_valid = 1;
1317        stat->received = recvr->received_packets;
1318
1319        stat->missing_valid = 1;
1320        stat->missing = recvr->missing_records;
1321
1322}
1323
1324static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
1325                bool reader) {
1326        recvstream_t *recvr;
1327
1328        if (!reader || t->type != THREAD_PERPKT) {
1329                return 0;
1330        }
1331
1332        recvr = &(FORMAT_DATA->receivers[t->perpkt_num]);
1333        t->format_data = recvr;
1334
1335        return 0;
1336}
1337
1338static struct libtrace_format_t ndag = {
1339
1340        "ndag",
1341        "",
1342        TRACE_FORMAT_NDAG,
1343        NULL,                   /* probe filename */
1344        NULL,                   /* probe magic */
1345        ndag_init_input,        /* init_input */
1346        NULL,                   /* config_input */
1347        ndag_start_input,       /* start_input */
1348        ndag_pause_input,       /* pause_input */
1349        NULL,                   /* init_output */
1350        NULL,                   /* config_output */
1351        NULL,                   /* start_output */
1352        ndag_fin_input,         /* fin_input */
1353        NULL,                   /* fin_output */
1354        ndag_read_packet,       /* read_packet */
1355        ndag_prepare_packet,    /* prepare_packet */
1356        NULL,                   /* fin_packet */
1357        NULL,                   /* write_packet */
1358        erf_get_link_type,      /* get_link_type */
1359        erf_get_direction,      /* get_direction */
1360        erf_set_direction,      /* set_direction */
1361        erf_get_erf_timestamp,  /* get_erf_timestamp */
1362        NULL,                   /* get_timeval */
1363        NULL,                   /* get_seconds */
1364        NULL,                   /* get_timespec */
1365        NULL,                   /* seek_erf */
1366        NULL,                   /* seek_timeval */
1367        NULL,                   /* seek_seconds */
1368        erf_get_capture_length, /* get_capture_length */
1369        erf_get_wire_length,    /* get_wire_length */
1370        erf_get_framing_length, /* get_framing_length */
1371        erf_set_capture_length, /* set_capture_length */
1372        NULL,                   /* get_received_packets */
1373        NULL,                   /* get_filtered_packets */
1374        NULL,                   /* get_dropped_packets */
1375        ndag_get_statistics,    /* get_statistics */
1376        NULL,                   /* get_fd */
1377        trace_event_ndag,       /* trace_event */
1378        NULL,                   /* help */
1379        NULL,                   /* next pointer */
1380        {true, 0},              /* live packet capture */
1381        ndag_pstart_input,      /* parallel start */
1382        ndag_pread_packets,     /* parallel read */
1383        ndag_pause_input,       /* parallel pause */
1384        NULL,
1385        ndag_pregister_thread,  /* register thread */
1386        NULL,
1387        ndag_get_thread_stats   /* per-thread stats */
1388};
1389
1390void ndag_constructor(void) {
1391        register_format(&ndag);
1392}
Note: See TracBrowser for help on using the repository browser.