source: lib/format_dpdkndag.c @ c7e547e

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since c7e547e was c7e547e, checked in by Shane Alcock <salcock@…>, 3 years ago

Added a dpdkndag format for faster ndag reading

Instead of joining a multicast group and receiving nDAG packets
via the networking stack, this new format uses DPDK to sniff
the multicast direct from the wire. This should save some effort
shuffling the packets back through the kernel's networking stack.

  • Property mode set to 100644
File size: 19.8 KB
Line 
1
2#include "config.h"
3#include "common.h"
4#include "libtrace.h"
5#include "libtrace_int.h"
6#include "format_helper.h"
7#include "format_erf.h"
8
9#include <assert.h>
10#include <errno.h>
11#include <fcntl.h>
12#include <stdio.h>
13#include <string.h>
14#include <unistd.h>
15#include <stdlib.h>
16
17#include "format_dpdk.h"
18#include "format_ndag.h"
19
20static struct libtrace_format_t dpdkndag;
21
22typedef struct capstream {
23
24        uint16_t port;
25        uint32_t expectedseq;
26        uint64_t recordcount;
27} capstream_t;
28
29typedef struct perthread {
30        capstream_t *capstreams;
31        uint16_t streamcount;
32        uint64_t dropped_upstream;
33        uint64_t missing_records;
34        uint64_t received_packets;
35
36        libtrace_packet_t *dpdkpkt;
37        char *ndagheader;
38        char *nextrec;
39        uint32_t ndagsize;
40
41        dpdk_per_stream_t *dpdkstreamdata;
42        int burstsize;
43        int burstoffset;
44        struct rte_mbuf* burstspace[20];
45
46} perthread_t;
47
48
49typedef struct dpdkndag_format_data {
50        libtrace_t *dpdkrecv;
51
52        char *multicastgroup;
53        uint16_t beaconport;
54        char *localiface;
55
56        perthread_t *threaddatas;
57
58} dpdkndag_format_data_t;
59
60#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
61
62static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
63
64        /* Calculate seq_a - seq_b, taking wraparound into account */
65        if (seq_a == seq_b) return 0;
66
67        if (seq_a > seq_b) {
68                return (int) (seq_a - seq_b);
69        }
70
71        /* -1 for the wrap and another -1 because we don't use zero */
72        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
73}
74
75
76static int dpdkndag_init_input(libtrace_t *libtrace) {
77
78        char *scan = NULL;
79        char *next = NULL;
80        char dpdkuri[1280];
81
82        libtrace->format_data = (dpdkndag_format_data_t *)malloc(
83                        sizeof(dpdkndag_format_data_t));
84
85        FORMAT_DATA->multicastgroup = NULL;
86        FORMAT_DATA->beaconport = 9001;
87        FORMAT_DATA->localiface = NULL;
88        FORMAT_DATA->threaddatas = NULL;
89        FORMAT_DATA->dpdkrecv = NULL;
90
91        scan = strchr(libtrace->uridata, ',');
92        if (scan == NULL) {
93                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
94                        "Bad ndag URI. Should be ndag:<interface>,<multicast group>,<port number>");
95                return -1;
96        }
97        FORMAT_DATA->localiface = strndup(libtrace->uridata,
98                        (size_t)(scan - libtrace->uridata));
99        next = scan + 1;
100
101        scan = strchr(next, ',');
102        if (scan == NULL) {
103                FORMAT_DATA->multicastgroup = strdup(next);
104        } else {
105                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
106
107                FORMAT_DATA->beaconport = strtoul(scan + 1, NULL, 0);
108        }
109
110        snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface);
111        FORMAT_DATA->dpdkrecv = trace_create(dpdkuri);
112
113        if (trace_is_err(FORMAT_DATA->dpdkrecv)) {
114                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
115                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
116                free(libtrace->format_data);
117                libtrace->format_data = NULL;
118                return -1;
119        }
120
121        return 0;
122}
123
124static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option,
125                void *data) {
126
127        return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data);
128}
129
130static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) {
131
132        uint32_t i;
133        if (FORMAT_DATA->threaddatas == NULL) {
134                FORMAT_DATA->threaddatas = (perthread_t *)malloc(
135                                sizeof(perthread_t) * maxthreads);
136        }
137
138        for (i = 0; i < maxthreads; i++) {
139                FORMAT_DATA->threaddatas[i].capstreams = NULL;
140                FORMAT_DATA->threaddatas[i].streamcount = 0;
141                FORMAT_DATA->threaddatas[i].dropped_upstream = 0;
142                FORMAT_DATA->threaddatas[i].received_packets = 0;
143                FORMAT_DATA->threaddatas[i].missing_records = 0;
144                FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL;
145                FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet();
146                FORMAT_DATA->threaddatas[i].ndagheader = NULL;
147                FORMAT_DATA->threaddatas[i].nextrec = NULL;
148                FORMAT_DATA->threaddatas[i].burstsize = 0;
149                FORMAT_DATA->threaddatas[i].burstoffset = 0;
150                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
151                                sizeof(struct rte_mbuf *) * 20);
152        }
153        return maxthreads;
154}
155
156static int dpdkndag_start_input(libtrace_t *libtrace) {
157
158        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
159                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
160                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
161                return -1;
162        }
163
164        dpdkndag_init_threads(libtrace, 1);
165
166        return 0;
167}
168
169static int dpdkndag_pstart_input(libtrace_t *libtrace) {
170
171        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
172        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
173                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
174                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
175                return -1;
176        }
177        dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count);
178        return 0;
179}
180
181static void clear_threaddata(perthread_t *pt) {
182
183        int i;
184
185        if (pt->dpdkpkt) {
186                trace_destroy_packet(pt->dpdkpkt);
187        }
188        pt->dpdkpkt = NULL;
189
190        if (pt->capstreams) {
191                free(pt->capstreams);
192        }
193
194        for (i = 0; i < 20; i++) {
195                if (pt->burstspace[i]) {
196                        rte_pktmbuf_free(pt->burstspace[i]);
197                }
198        }
199}
200
201static int dpdkndag_pause_input(libtrace_t *libtrace) {
202
203        int i;
204        /* Pause DPDK receive */
205        dpdk_pause_input(FORMAT_DATA->dpdkrecv);
206
207        /* Clear the threaddatas */
208        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
209                clear_threaddata(&(FORMAT_DATA->threaddatas[i]));
210        }
211        return 0;
212}
213
214static int dpdkndag_fin_input(libtrace_t *libtrace) {
215
216        if (FORMAT_DATA->dpdkrecv) {
217                trace_destroy(FORMAT_DATA->dpdkrecv);
218        }
219
220        if (FORMAT_DATA->threaddatas) {
221                free(FORMAT_DATA->threaddatas);
222        }
223
224        if (FORMAT_DATA->localiface) {
225                free(FORMAT_DATA->localiface);
226        }
227
228        if (FORMAT_DATA->multicastgroup) {
229                free(FORMAT_DATA->multicastgroup);
230        }
231
232        free(FORMAT_DATA);
233        return 0;
234}
235
236static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
237                bool reader) {
238
239        perthread_t *pt;
240
241        if (!reader || t->type != THREAD_PERPKT) {
242                return 0;
243        }
244
245        if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) {
246                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
247                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
248                return -1;
249        }
250
251        /* t->format_data now contains our dpdk stream data */
252        pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]);
253        pt->dpdkstreamdata = t->format_data;
254        t->format_data = pt;
255
256        return 0;
257}
258
259static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
260
261        dpdk_punregister_thread(libtrace, t);
262}
263
264static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
265                libtrace_stat_t *stat) {
266
267        perthread_t *pt = (perthread_t *)t->format_data;
268
269        if (libtrace == NULL) {
270                return;
271        }
272
273                /* TODO Is this thread safe */
274        stat->dropped_valid = 1;
275        stat->dropped = pt->dropped_upstream;
276
277        stat->received_valid = 1;
278        stat->received = pt->received_packets;
279
280        stat->missing_valid = 1;
281        stat->missing = pt->missing_records;
282}
283
284static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
285        int i;
286
287        stat->dropped_valid = 1;
288        stat->dropped = 0;
289        stat->received_valid = 1;
290        stat->received = 0;
291        stat->missing_valid = 1;
292        stat->missing = 0;
293
294        /* TODO Is this thread safe? */
295        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
296                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
297                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
298                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
299        }
300}
301
302static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) {
303
304        void *trans = NULL;
305        uint32_t rem = 0;
306        uint8_t proto;
307        char *payload;
308
309        trans = trace_get_transport(packet, &proto, &rem);
310        if (trans == NULL) {
311                return 0;
312        }
313
314        if (proto != TRACE_IPPROTO_UDP) {
315                return 0;
316        }
317
318        payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans,
319                        &rem);
320
321        if (payload == NULL) {
322                return 0;
323        }
324
325        if (rem < 4) {
326                return 0;
327        }
328
329        if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A'
330                        && payload[3] == 'G') {
331                pt->ndagsize = rem;
332                pt->ndagheader = payload;
333                return 1;
334        }
335
336        return 0;
337
338}
339
340static int process_fresh_packet(perthread_t *pt, char *expectedaddr) {
341
342        ndag_common_t *header = (ndag_common_t *)pt->ndagheader;
343        ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader +
344                        sizeof(ndag_common_t));
345        uint16_t targetport;
346        char targetaddr[INET6_ADDRSTRLEN];
347        capstream_t *cap = NULL;
348        int i;
349
350        if (header->type != NDAG_PKT_ENCAPERF) {
351                pt->nextrec = NULL;
352                pt->ndagsize = 0;
353                pt->ndagheader = NULL;
354                return 1;
355        }
356
357        /* TODO check for missing records */
358        targetport = trace_get_destination_port(pt->dpdkpkt);
359        if (trace_get_destination_address_string(pt->dpdkpkt, targetaddr,
360                        INET6_ADDRSTRLEN) == NULL) {
361                pt->nextrec = NULL;
362                pt->ndagsize = 0;
363                pt->ndagheader = NULL;
364                return 1;
365        }
366
367        if (strcmp(targetaddr, expectedaddr) != 0) {
368                pt->nextrec = NULL;
369                pt->ndagsize = 0;
370                pt->ndagheader = NULL;
371                return 1;
372        }
373
374        if (pt->streamcount == 0) {
375                pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t));
376                pt->streamcount = 1;
377                pt->capstreams[0].port = targetport;
378                pt->capstreams[0].expectedseq = 0;
379                pt->capstreams[0].recordcount = 0;
380                cap = pt->capstreams;
381
382        } else {
383                for (i = 0; i < pt->streamcount; i++) {
384                        if (pt->capstreams[i].port == targetport) {
385                                cap = (&pt->capstreams[i]);
386                                break;
387                        }
388                }
389
390                if (cap == NULL) {
391                        uint16_t next = pt->streamcount;
392                        pt->capstreams = (capstream_t *)realloc(pt->capstreams,
393                                    sizeof(capstream_t) * (pt->streamcount + 1));
394                        pt->streamcount += 1;
395                        pt->capstreams[next].port = targetport;
396                        pt->capstreams[next].expectedseq = 0;
397                        pt->capstreams[next].recordcount = 0;
398                        cap = &(pt->capstreams[next]);
399                }
400        }
401
402        if (cap->expectedseq != 0) {
403                pt->missing_records += seq_cmp(
404                                ntohl(encaphdr->seqno), cap->expectedseq);
405        }
406        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
407        if (cap->expectedseq == 0) {
408                cap->expectedseq ++;
409        }
410
411        pt->nextrec = ((char *)header) + sizeof(ndag_common_t) +
412                        sizeof(ndag_encap_t);
413
414        return 1;
415}
416
417static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt,
418                libtrace_packet_t *packet) {
419
420        /* This is mostly borrowed from ndag_prepare_packet_stream, minus
421         * the ndag socket-specific stuff */
422
423        dag_record_t *erfptr;
424        ndag_encap_t *encaphdr;
425
426        if (pt->nextrec == NULL) {
427                return -1;
428        }
429
430        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
431                return -1;
432        }
433
434        packet->buf_control = TRACE_CTRL_EXTERNAL;
435
436        packet->trace = libtrace;
437        packet->buffer = pt->nextrec;
438        packet->header = pt->nextrec;
439        packet->type = TRACE_RT_DATA_ERF;
440
441        erfptr = (dag_record_t *)packet->header;
442
443        if (erfptr->flags.rxerror == 1) {
444                packet->payload = NULL;
445                erfptr->rlen = htons(erf_get_framing_length(packet));
446        } else {
447                packet->payload = (char *)packet->buffer +
448                                erf_get_framing_length(packet);
449        }
450
451        /* Update upstream drops using lctr */
452
453        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
454                /* TODO */
455        } else {
456                if (pt->received_packets > 0) {
457                        pt->dropped_upstream += ntohs(erfptr->lctr);
458                }
459        }
460
461        pt->received_packets ++;
462        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
463
464        if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) {
465                /* Record was truncated */
466                erfptr->rlen = htons(pt->ndagsize - (pt->nextrec -
467                                pt->ndagheader));
468        }
469
470        pt->nextrec += ntohs(erfptr->rlen);
471
472        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
473                pt->ndagheader = NULL;
474                pt->nextrec = NULL;
475                pt->ndagsize = 0;
476        }
477
478        packet->order = erf_get_erf_timestamp(packet);
479        packet->error = packet->payload ? ntohs(erfptr->rlen) :
480                        erf_get_framing_length(packet);
481        return ntohs(erfptr->rlen);
482}
483
484static int dpdkndag_pread_packets(libtrace_t *libtrace,
485                                    libtrace_thread_t *t,
486                                    libtrace_packet_t **packets,
487                                    size_t nb_packets) {
488
489        perthread_t *pt = (perthread_t *)t->format_data;
490        size_t read_packets = 0;
491        int ret;
492
493        while (pt->nextrec == NULL) {
494                trace_fin_packet(pt->dpdkpkt);
495
496                if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) {
497                        pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset];
498                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
499                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
500                                        pt->dpdkpkt->buffer,
501                                        TRACE_RT_DATA_DPDK, 0);
502                        pt->burstoffset ++;
503                } else {
504                        ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv,
505                                        pt->dpdkstreamdata,
506                                        &t->messages,
507                                        pt->burstspace,
508                                        20);
509                        if (ret <= 0) {
510                                return ret;
511                        }
512                        pt->dpdkpkt->buffer = pt->burstspace[0];
513                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
514                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
515                                        pt->dpdkpkt->buffer,
516                                        TRACE_RT_DATA_DPDK, 0);
517                        pt->burstsize = ret;
518                        pt->burstoffset = 1;
519                }
520
521                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
522                        continue;
523                }
524
525                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
526                if (ret <= 0) {
527                        return ret;
528                }
529        }
530
531        while (pt->nextrec != NULL) {
532                if (read_packets == nb_packets) {
533                        break;
534                }
535
536                if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) {
537                        free(packets[read_packets]->buffer);
538                        packets[read_packets]->buffer = NULL;
539                }
540                ret = ndagrec_to_libtrace_packet(libtrace, pt,
541                                packets[read_packets]);
542                if (ret < 0) {
543                        return ret;
544                }
545                read_packets ++;
546
547        }
548
549        return read_packets;
550}
551
552static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
553
554        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
555        int ret;
556
557        if (packet->buf_control == TRACE_CTRL_PACKET) {
558                free(packet->buffer);
559                packet->buffer = NULL;
560        }
561
562        while (pt->nextrec == NULL) {
563                trace_fin_packet(pt->dpdkpkt);
564
565                ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt);
566                if (ret <= 0) {
567                        return ret;
568                }
569
570                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
571                        continue;
572                }
573
574                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
575                if (ret <= 0) {
576                        return ret;
577                }
578        }
579
580        return ndagrec_to_libtrace_packet(libtrace, pt, packet);
581}
582
583static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace,
584                libtrace_packet_t *packet) {
585
586
587        libtrace_eventobj_t event;
588        int ret;
589        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
590
591        if (packet->buf_control == TRACE_CTRL_PACKET) {
592                free(packet->buffer);
593                packet->buffer = NULL;
594        }
595
596        while (pt->nextrec == NULL) {
597
598                event = dpdk_trace_event(libtrace, pt->dpdkpkt);
599
600                if (event.type != TRACE_EVENT_PACKET) {
601                        return event;
602                }
603
604                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
605                        continue;
606                }
607
608                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
609                if (ret <= 0) {
610                        event.type = TRACE_EVENT_TERMINATE;
611                        return event;
612                }
613        }
614
615        ret = ndagrec_to_libtrace_packet(libtrace, pt, packet);
616        if (ret < 0) {
617                event.type = TRACE_EVENT_TERMINATE;
618        } else {
619                event.type = TRACE_EVENT_PACKET;
620                event.size = 1;
621        }
622        return event;
623}
624
625static struct libtrace_format_t dpdkndag = {
626
627        "dpdkndag",
628        "",
629        TRACE_FORMAT_DPDK_NDAG,
630        NULL,                   /* probe filename */
631        NULL,                   /* probe magic */
632        dpdkndag_init_input,        /* init_input */
633        dpdkndag_config_input,      /* config_input */
634        dpdkndag_start_input,       /* start_input */
635        dpdkndag_pause_input,       /* pause_input */
636        NULL,                   /* init_output */
637        NULL,                   /* config_output */
638        NULL,                   /* start_output */
639        dpdkndag_fin_input,         /* fin_input */
640        NULL,                   /* fin_output */
641        dpdkndag_read_packet,   /* read_packet */
642        NULL,                   /* prepare_packet */
643        NULL,                   /* fin_packet */
644        NULL,                   /* write_packet */
645        erf_get_link_type,      /* get_link_type */
646        erf_get_direction,      /* get_direction */
647        erf_set_direction,      /* set_direction */
648        erf_get_erf_timestamp,  /* get_erf_timestamp */
649        NULL,                   /* get_timeval */
650        NULL,                   /* get_seconds */
651        NULL,                   /* get_timespec */
652        NULL,                   /* seek_erf */
653        NULL,                   /* seek_timeval */
654        NULL,                   /* seek_seconds */
655        erf_get_capture_length, /* get_capture_length */
656        erf_get_wire_length,    /* get_wire_length */
657        erf_get_framing_length, /* get_framing_length */
658        erf_set_capture_length, /* set_capture_length */
659        NULL,                   /* get_received_packets */
660        NULL,                   /* get_filtered_packets */
661        NULL,                   /* get_dropped_packets */
662        dpdkndag_get_statistics,    /* get_statistics */
663        NULL,                   /* get_fd */
664        trace_event_dpdkndag,       /* trace_event */
665        NULL,                   /* help */
666        NULL,                   /* next pointer */
667        {true, 0},              /* live packet capture */
668        dpdkndag_pstart_input,      /* parallel start */
669        dpdkndag_pread_packets,     /* parallel read */
670        dpdkndag_pause_input,       /* parallel pause */
671        NULL,
672        dpdkndag_pregister_thread,  /* register thread */
673        dpdkndag_punregister_thread,
674        dpdkndag_get_thread_stats   /* per-thread stats */
675};
676
677void dpdkndag_constructor(void) {
678        register_format(&dpdkndag);
679}
Note: See TracBrowser for help on using the repository browser.