source: lib/format_dpdkndag.c @ e54bd5f

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since e54bd5f was e54bd5f, checked in by Shane Alcock <salcock@…>, 3 years ago

Tweaks to dpdkndag: format

  • Set snap length for dpdk receiver to 9K so that we can receive jumbo frames.
  • Enforce a unidirectional hasher on the dpdk receiver, just to ensure ndag streams don't get spread across multiple CPUs.
  • Add a mutex around any statistic updates to keep helgrind happy.
  • Increase number of mbufs used from 20 to 40.
  • Dropped packets by the dpdk receiver are included in stats as 'errors'.
  • Property mode set to 100644
File size: 23.1 KB
Line 
1
2#include "config.h"
3#include "common.h"
4#include "libtrace.h"
5#include "libtrace_int.h"
6#include "format_helper.h"
7#include "format_erf.h"
8
9#include <assert.h>
10#include <errno.h>
11#include <fcntl.h>
12#include <stdio.h>
13#include <string.h>
14#include <unistd.h>
15#include <stdlib.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18#include <netdb.h>
19
20#include "format_dpdk.h"
21#include "format_ndag.h"
22
23static struct libtrace_format_t dpdkndag;
24
25typedef struct capstream {
26
27        uint16_t port;
28        uint32_t expectedseq;
29        uint64_t recordcount;
30} capstream_t;
31
32typedef struct perthread {
33        capstream_t *capstreams;
34        uint16_t streamcount;
35        uint64_t dropped_upstream;
36        uint64_t missing_records;
37        uint64_t received_packets;
38
39        libtrace_packet_t *dpdkpkt;
40        char *ndagheader;
41        char *nextrec;
42        uint32_t ndagsize;
43
44        pthread_mutex_t ndag_lock;
45        dpdk_per_stream_t *dpdkstreamdata;
46        int burstsize;
47        int burstoffset;
48        struct rte_mbuf* burstspace[40];
49
50} perthread_t;
51
52
53typedef struct dpdkndag_format_data {
54        libtrace_t *dpdkrecv;
55
56        struct addrinfo *multicastgroup;
57        char *localiface;
58
59        perthread_t *threaddatas;
60
61} dpdkndag_format_data_t;
62
63#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
64
65static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
66
67        /* Calculate seq_a - seq_b, taking wraparound into account */
68        if (seq_a == seq_b) return 0;
69
70        if (seq_a > seq_b) {
71                return (int) (seq_a - seq_b);
72        }
73
74        /* -1 for the wrap and another -1 because we don't use zero */
75        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
76}
77
78
79static int dpdkndag_init_input(libtrace_t *libtrace) {
80
81        char *scan = NULL;
82        char *next = NULL;
83        char dpdkuri[1280];
84        struct addrinfo hints, *result;
85
86        libtrace->format_data = (dpdkndag_format_data_t *)malloc(
87                        sizeof(dpdkndag_format_data_t));
88
89        FORMAT_DATA->localiface = NULL;
90        FORMAT_DATA->threaddatas = NULL;
91        FORMAT_DATA->dpdkrecv = NULL;
92
93        scan = strchr(libtrace->uridata, ',');
94        if (scan == NULL) {
95                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
96                        "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>");
97                return -1;
98        }
99        FORMAT_DATA->localiface = strndup(libtrace->uridata,
100                        (size_t)(scan - libtrace->uridata));
101        next = scan + 1;
102
103        memset(&hints, 0, sizeof(struct addrinfo));
104        hints.ai_family = AF_UNSPEC;
105        hints.ai_socktype = SOCK_DGRAM;
106        hints.ai_flags = AI_PASSIVE;
107        hints.ai_protocol = 0;
108
109        if (getaddrinfo(next, NULL, &hints, &result) != 0) {
110                perror("getaddrinfo");
111                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
112                        "Invalid multicast address: %s", next);
113                return -1;
114        }
115
116        FORMAT_DATA->multicastgroup = result;
117
118        snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface);
119        FORMAT_DATA->dpdkrecv = trace_create(dpdkuri);
120
121        if (trace_is_err(FORMAT_DATA->dpdkrecv)) {
122                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
123                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
124                free(libtrace->format_data);
125                libtrace->format_data = NULL;
126                return -1;
127        }
128
129        return 0;
130}
131
132static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option,
133                void *data) {
134
135        return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data);
136}
137
138static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) {
139
140        uint32_t i;
141        if (FORMAT_DATA->threaddatas == NULL) {
142                FORMAT_DATA->threaddatas = (perthread_t *)malloc(
143                                sizeof(perthread_t) * maxthreads);
144        }
145
146        for (i = 0; i < maxthreads; i++) {
147                FORMAT_DATA->threaddatas[i].capstreams = NULL;
148                FORMAT_DATA->threaddatas[i].streamcount = 0;
149                FORMAT_DATA->threaddatas[i].dropped_upstream = 0;
150                FORMAT_DATA->threaddatas[i].received_packets = 0;
151                FORMAT_DATA->threaddatas[i].missing_records = 0;
152                FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL;
153                FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet();
154                FORMAT_DATA->threaddatas[i].ndagheader = NULL;
155                FORMAT_DATA->threaddatas[i].nextrec = NULL;
156                FORMAT_DATA->threaddatas[i].burstsize = 0;
157                FORMAT_DATA->threaddatas[i].burstoffset = 0;
158                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
159                                sizeof(struct rte_mbuf *) * 40);
160                pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock),
161                                NULL);
162        }
163        return maxthreads;
164}
165
166static int dpdkndag_start_input(libtrace_t *libtrace) {
167        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
168        int snaplen = 9000;
169
170        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
171                                &hash) == -1) {
172                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
173                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
174                return -1;
175        }
176
177        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
178                                &snaplen) == -1) {
179                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
180                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
181                return -1;
182        }
183
184        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
185                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
186                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
187                return -1;
188        }
189
190        dpdkndag_init_threads(libtrace, 1);
191
192        return 0;
193}
194
195static int dpdkndag_pstart_input(libtrace_t *libtrace) {
196
197        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
198        int snaplen = 9000;
199        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
200        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
201                                &hash) == -1) {
202                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
203                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
204                return -1;
205        }
206
207        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
208                                &snaplen) == -1) {
209                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
210                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
211                return -1;
212        }
213        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
214                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
215                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
216                return -1;
217        }
218        dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count);
219        return 0;
220}
221
222static void clear_threaddata(perthread_t *pt) {
223
224        int i;
225
226        if (pt->dpdkpkt) {
227                trace_destroy_packet(pt->dpdkpkt);
228        }
229        pt->dpdkpkt = NULL;
230
231        if (pt->capstreams) {
232                free(pt->capstreams);
233        }
234
235        for (i = 0; i < 40; i++) {
236                if (pt->burstspace[i]) {
237                        rte_pktmbuf_free(pt->burstspace[i]);
238                }
239        }
240        pthread_mutex_destroy(&(pt->ndag_lock));
241}
242
243static int dpdkndag_pause_input(libtrace_t *libtrace) {
244
245        int i;
246        /* Pause DPDK receive */
247        dpdk_pause_input(FORMAT_DATA->dpdkrecv);
248
249        /* Clear the threaddatas */
250        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
251                clear_threaddata(&(FORMAT_DATA->threaddatas[i]));
252        }
253        return 0;
254}
255
256static int dpdkndag_fin_input(libtrace_t *libtrace) {
257
258        if (FORMAT_DATA->dpdkrecv) {
259                trace_destroy(FORMAT_DATA->dpdkrecv);
260        }
261
262        if (FORMAT_DATA->threaddatas) {
263                free(FORMAT_DATA->threaddatas);
264        }
265
266        if (FORMAT_DATA->localiface) {
267                free(FORMAT_DATA->localiface);
268        }
269
270        if (FORMAT_DATA->multicastgroup) {
271                freeaddrinfo(FORMAT_DATA->multicastgroup);
272        }
273
274        free(FORMAT_DATA);
275        return 0;
276}
277
278static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
279                bool reader) {
280
281        perthread_t *pt;
282
283        if (!reader || t->type != THREAD_PERPKT) {
284                return 0;
285        }
286
287        if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) {
288                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
290                return -1;
291        }
292
293        /* t->format_data now contains our dpdk stream data */
294        pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]);
295        pt->dpdkstreamdata = t->format_data;
296        t->format_data = pt;
297
298        return 0;
299}
300
301static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
302
303        dpdk_punregister_thread(libtrace, t);
304}
305
306static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
307                libtrace_stat_t *stat) {
308
309        perthread_t *pt = (perthread_t *)t->format_data;
310
311        if (libtrace == NULL) {
312                return;
313        }
314
315                /* TODO Is this thread safe */
316        stat->dropped_valid = 1;
317        stat->dropped = pt->dropped_upstream;
318
319        stat->received_valid = 1;
320        stat->received = pt->received_packets;
321
322        stat->missing_valid = 1;
323        stat->missing = pt->missing_records;
324}
325
326static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
327        int i;
328
329        libtrace_stat_t *dpdkstat;
330
331        stat->dropped_valid = 1;
332        stat->dropped = 0;
333        stat->received_valid = 1;
334        stat->received = 0;
335        stat->missing_valid = 1;
336        stat->missing = 0;
337
338        dpdkstat = trace_create_statistics();
339        dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat);
340
341        if (dpdkstat->dropped_valid) {
342                stat->errors_valid = 1;
343                stat->errors = dpdkstat->dropped;
344        }
345
346        /* TODO Is this thread safe? */
347        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
348                pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
349                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
350                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
351                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
352                pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
353        }
354        free(dpdkstat);
355}
356
357static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) {
358
359        void *trans = NULL;
360        uint32_t rem = 0;
361        uint8_t proto;
362        char *payload;
363
364        trans = trace_get_transport(packet, &proto, &rem);
365        if (trans == NULL) {
366                return 0;
367        }
368
369        if (proto != TRACE_IPPROTO_UDP) {
370                return 0;
371        }
372
373        payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans,
374                        &rem);
375
376        if (payload == NULL) {
377                return 0;
378        }
379
380        if (rem < 4) {
381                return 0;
382        }
383
384        if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A'
385                        && payload[3] == 'G') {
386                pt->ndagsize = rem;
387                pt->ndagheader = payload;
388                return 1;
389        }
390
391        return 0;
392
393}
394
395static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {
396
397        if (a->sa_family != b->sa_family) {
398                return 0;
399        }
400
401        if (a->sa_family == AF_INET) {
402                struct sockaddr_in *ain = (struct sockaddr_in *)a;
403                struct sockaddr_in *bin = (struct sockaddr_in *)b;
404
405                if (ain->sin_addr.s_addr != bin->sin_addr.s_addr) {
406                        return 0;
407                }
408                return 1;
409        } else if (a->sa_family == AF_INET6) {
410                struct sockaddr_in6 *ain6 = (struct sockaddr_in6 *)a;
411                struct sockaddr_in6 *bin6 = (struct sockaddr_in6 *)b;
412
413                if (memcmp(ain6->sin6_addr.s6_addr, bin6->sin6_addr.s6_addr,
414                                sizeof(ain6->sin6_addr.s6_addr)) != 0) {
415                        return 0;
416                }
417                return 1;
418        }
419        return 0;
420}
421
422static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) {
423
424        ndag_common_t *header = (ndag_common_t *)pt->ndagheader;
425        ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader +
426                        sizeof(ndag_common_t));
427        uint16_t targetport;
428        struct sockaddr_storage targetaddr;
429        struct sockaddr *p;
430        capstream_t *cap = NULL;
431        int i;
432
433        memset((&targetaddr), 0, sizeof(targetaddr));
434        if (header->type != NDAG_PKT_ENCAPERF) {
435                pt->nextrec = NULL;
436                pt->ndagsize = 0;
437                pt->ndagheader = NULL;
438                return 1;
439        }
440
441        if ((p = trace_get_destination_address(pt->dpdkpkt,
442                        (struct sockaddr *)(&targetaddr))) == NULL) {
443                pt->nextrec = NULL;
444                pt->ndagsize = 0;
445                pt->ndagheader = NULL;
446                return 1;
447        }
448
449        if (!(sockaddr_same(p, expectedaddr->ai_addr))) {
450                pt->nextrec = NULL;
451                pt->ndagsize = 0;
452                pt->ndagheader = NULL;
453                return 1;
454        }
455
456        targetport = trace_get_destination_port(pt->dpdkpkt);
457        if (pt->streamcount == 0) {
458                pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t));
459                pt->streamcount = 1;
460                pt->capstreams[0].port = targetport;
461                pt->capstreams[0].expectedseq = 0;
462                pt->capstreams[0].recordcount = 0;
463                cap = pt->capstreams;
464
465        } else {
466                for (i = 0; i < pt->streamcount; i++) {
467                        if (pt->capstreams[i].port == targetport) {
468                                cap = (&pt->capstreams[i]);
469                                break;
470                        }
471                }
472
473                if (cap == NULL) {
474                        uint16_t next = pt->streamcount;
475                        pt->capstreams = (capstream_t *)realloc(pt->capstreams,
476                                    sizeof(capstream_t) * (pt->streamcount + 1));
477                        pt->streamcount += 1;
478                        pt->capstreams[next].port = targetport;
479                        pt->capstreams[next].expectedseq = 0;
480                        pt->capstreams[next].recordcount = 0;
481                        cap = &(pt->capstreams[next]);
482                }
483        }
484        if (cap->expectedseq != 0) {
485                pthread_mutex_lock(&pt->ndag_lock);
486                pt->missing_records += seq_cmp(
487                                ntohl(encaphdr->seqno), cap->expectedseq);
488                pthread_mutex_unlock(&pt->ndag_lock);
489        }
490        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
491        if (cap->expectedseq == 0) {
492                cap->expectedseq ++;
493        }
494        cap->recordcount ++;
495
496        pt->nextrec = ((char *)header) + sizeof(ndag_common_t) +
497                        sizeof(ndag_encap_t);
498
499        return 1;
500}
501
502static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt,
503                libtrace_packet_t *packet) {
504
505        /* This is mostly borrowed from ndag_prepare_packet_stream, minus
506         * the ndag socket-specific stuff */
507
508        dag_record_t *erfptr;
509        ndag_encap_t *encaphdr;
510
511        if (pt->nextrec == NULL) {
512                return -1;
513        }
514
515        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
516                return -1;
517        }
518
519        packet->buf_control = TRACE_CTRL_EXTERNAL;
520
521        packet->trace = libtrace;
522        packet->buffer = pt->nextrec;
523        packet->header = pt->nextrec;
524        packet->type = TRACE_RT_DATA_ERF;
525
526        erfptr = (dag_record_t *)packet->header;
527
528        if (erfptr->flags.rxerror == 1) {
529                packet->payload = NULL;
530                erfptr->rlen = htons(erf_get_framing_length(packet));
531        } else {
532                packet->payload = (char *)packet->buffer +
533                                erf_get_framing_length(packet);
534        }
535
536        /* Update upstream drops using lctr */
537
538        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
539                /* TODO */
540        } else {
541                pthread_mutex_lock(&(pt->ndag_lock));
542                if (pt->received_packets > 0) {
543                        pt->dropped_upstream += ntohs(erfptr->lctr);
544                }
545                pthread_mutex_unlock(&(pt->ndag_lock));
546        }
547
548        pthread_mutex_lock(&(pt->ndag_lock));
549        pt->received_packets ++;
550        pthread_mutex_unlock(&(pt->ndag_lock));
551        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
552
553        if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) {
554                /* Record was truncated */
555                erfptr->rlen = htons(pt->ndagsize - (pt->nextrec -
556                                pt->ndagheader));
557        }
558
559        pt->nextrec += ntohs(erfptr->rlen);
560
561        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
562                pt->ndagheader = NULL;
563                pt->nextrec = NULL;
564                pt->ndagsize = 0;
565        }
566
567        packet->order = erf_get_erf_timestamp(packet);
568        packet->error = packet->payload ? ntohs(erfptr->rlen) :
569                        erf_get_framing_length(packet);
570        return ntohs(erfptr->rlen);
571}
572
573static int dpdkndag_pread_packets(libtrace_t *libtrace,
574                                    libtrace_thread_t *t,
575                                    libtrace_packet_t **packets,
576                                    size_t nb_packets) {
577
578        perthread_t *pt = (perthread_t *)t->format_data;
579        size_t read_packets = 0;
580        int ret;
581
582        while (pt->nextrec == NULL) {
583                trace_fin_packet(pt->dpdkpkt);
584
585                if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) {
586                        pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset];
587                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
588                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
589                                        pt->dpdkpkt->buffer,
590                                        TRACE_RT_DATA_DPDK, 0);
591                        pt->burstoffset ++;
592                } else {
593                        ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv,
594                                        pt->dpdkstreamdata,
595                                        &t->messages,
596                                        pt->burstspace,
597                                        40);
598                        if (ret <= 0) {
599                                return ret;
600                        }
601
602                        pt->dpdkpkt->buffer = pt->burstspace[0];
603                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
604                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
605                                        pt->dpdkpkt->buffer,
606                                        TRACE_RT_DATA_DPDK, 0);
607                        pt->burstsize = ret;
608                        pt->burstoffset = 1;
609                }
610
611                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
612                        continue;
613                }
614
615                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
616                if (ret <= 0) {
617                        return ret;
618                }
619        }
620
621        while (pt->nextrec != NULL) {
622                if (read_packets == nb_packets) {
623                        break;
624                }
625
626                if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) {
627                        free(packets[read_packets]->buffer);
628                        packets[read_packets]->buffer = NULL;
629                }
630                ret = ndagrec_to_libtrace_packet(libtrace, pt,
631                                packets[read_packets]);
632                if (ret < 0) {
633                        return ret;
634                }
635                read_packets ++;
636
637        }
638
639        return read_packets;
640}
641
642static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
643
644        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
645        int ret;
646
647        if (packet->buf_control == TRACE_CTRL_PACKET) {
648                free(packet->buffer);
649                packet->buffer = NULL;
650        }
651
652        while (pt->nextrec == NULL) {
653                trace_fin_packet(pt->dpdkpkt);
654
655                ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt);
656                if (ret <= 0) {
657                        return ret;
658                }
659
660                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
661                        continue;
662                }
663
664                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
665                if (ret <= 0) {
666                        return ret;
667                }
668        }
669
670        return ndagrec_to_libtrace_packet(libtrace, pt, packet);
671}
672
673static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace,
674                libtrace_packet_t *packet) {
675
676
677        libtrace_eventobj_t event;
678        int ret;
679        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
680
681        if (packet->buf_control == TRACE_CTRL_PACKET) {
682                free(packet->buffer);
683                packet->buffer = NULL;
684        }
685
686        while (pt->nextrec == NULL) {
687
688                event = dpdk_trace_event(libtrace, pt->dpdkpkt);
689
690                if (event.type != TRACE_EVENT_PACKET) {
691                        return event;
692                }
693
694                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
695                        continue;
696                }
697
698                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
699                if (ret <= 0) {
700                        event.type = TRACE_EVENT_TERMINATE;
701                        return event;
702                }
703        }
704
705        ret = ndagrec_to_libtrace_packet(libtrace, pt, packet);
706        if (ret < 0) {
707                event.type = TRACE_EVENT_TERMINATE;
708        } else {
709                event.type = TRACE_EVENT_PACKET;
710                event.size = 1;
711        }
712        return event;
713}
714
715static struct libtrace_format_t dpdkndag = {
716
717        "dpdkndag",
718        "",
719        TRACE_FORMAT_DPDK_NDAG,
720        NULL,                   /* probe filename */
721        NULL,                   /* probe magic */
722        dpdkndag_init_input,        /* init_input */
723        dpdkndag_config_input,      /* config_input */
724        dpdkndag_start_input,       /* start_input */
725        dpdkndag_pause_input,       /* pause_input */
726        NULL,                   /* init_output */
727        NULL,                   /* config_output */
728        NULL,                   /* start_output */
729        dpdkndag_fin_input,         /* fin_input */
730        NULL,                   /* fin_output */
731        dpdkndag_read_packet,   /* read_packet */
732        NULL,                   /* prepare_packet */
733        NULL,                   /* fin_packet */
734        NULL,                   /* write_packet */
735        erf_get_link_type,      /* get_link_type */
736        erf_get_direction,      /* get_direction */
737        erf_set_direction,      /* set_direction */
738        erf_get_erf_timestamp,  /* get_erf_timestamp */
739        NULL,                   /* get_timeval */
740        NULL,                   /* get_seconds */
741        NULL,                   /* get_timespec */
742        NULL,                   /* seek_erf */
743        NULL,                   /* seek_timeval */
744        NULL,                   /* seek_seconds */
745        erf_get_capture_length, /* get_capture_length */
746        erf_get_wire_length,    /* get_wire_length */
747        erf_get_framing_length, /* get_framing_length */
748        erf_set_capture_length, /* set_capture_length */
749        NULL,                   /* get_received_packets */
750        NULL,                   /* get_filtered_packets */
751        NULL,                   /* get_dropped_packets */
752        dpdkndag_get_statistics,    /* get_statistics */
753        NULL,                   /* get_fd */
754        trace_event_dpdkndag,       /* trace_event */
755        NULL,                   /* help */
756        NULL,                   /* next pointer */
757        {true, 0},              /* live packet capture */
758        dpdkndag_pstart_input,      /* parallel start */
759        dpdkndag_pread_packets,     /* parallel read */
760        dpdkndag_pause_input,       /* parallel pause */
761        NULL,
762        dpdkndag_pregister_thread,  /* register thread */
763        dpdkndag_punregister_thread,
764        dpdkndag_get_thread_stats   /* per-thread stats */
765};
766
767void dpdkndag_constructor(void) {
768        register_format(&dpdkndag);
769}
Note: See TracBrowser for help on using the repository browser.