source: lib/format_dpdkndag.c @ 32ee9b2

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 32ee9b2 was 32ee9b2, checked in by Shane Alcock <salcock@…>, 2 years ago

Add new trace_flush_output() to public API

Can be used to force a libtrace output to dump any buffered output
to disk immediately.

Note that if the file is compressed or the output trace format
requires a trailer, the flushed file will still not be properly
readable afterwards as this will not result in any trailers
being written. You'll still have to close the file for that.

Mainly this is useful for ensuring that output file sizes grow
over time in situations where the amount of output is relatively
small, rather than staying stuck at 0 bytes until we either reach
1MB of output or the file is closed. For instance, you could have
a timer that calls trace_flush_output() every 30 seconds so that
the output file size will grow if any packets were written in the
last 30 seconds.

  • Property mode set to 100644
File size: 23.2 KB
Line 
1
2#include "config.h"
3#include "common.h"
4#include "libtrace.h"
5#include "libtrace_int.h"
6#include "format_helper.h"
7#include "format_erf.h"
8
9#include <assert.h>
10#include <errno.h>
11#include <fcntl.h>
12#include <stdio.h>
13#include <string.h>
14#include <unistd.h>
15#include <stdlib.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18#include <netdb.h>
19
20#include "format_dpdk.h"
21#include "format_ndag.h"
22
23static struct libtrace_format_t dpdkndag;
24
25typedef struct capstream {
26
27        uint16_t port;
28        uint32_t expectedseq;
29        uint64_t recordcount;
30} capstream_t;
31
32typedef struct perthread {
33        capstream_t *capstreams;
34        uint16_t streamcount;
35        uint64_t dropped_upstream;
36        uint64_t missing_records;
37        uint64_t received_packets;
38
39        libtrace_packet_t *dpdkpkt;
40        char *ndagheader;
41        char *nextrec;
42        uint32_t ndagsize;
43
44        pthread_mutex_t ndag_lock;
45        dpdk_per_stream_t *dpdkstreamdata;
46        int burstsize;
47        int burstoffset;
48        struct rte_mbuf* burstspace[40];
49
50} perthread_t;
51
52
53typedef struct dpdkndag_format_data {
54        libtrace_t *dpdkrecv;
55
56        struct addrinfo *multicastgroup;
57        char *localiface;
58
59        perthread_t *threaddatas;
60
61} dpdkndag_format_data_t;
62
63#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
64
65static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
66
67        /* Calculate seq_a - seq_b, taking wraparound into account */
68        if (seq_a == seq_b) return 0;
69
70        if (seq_a > seq_b) {
71                return (int) (seq_a - seq_b);
72        }
73
74        /* -1 for the wrap and another -1 because we don't use zero */
75        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
76}
77
78
79static int dpdkndag_init_input(libtrace_t *libtrace) {
80
81        char *scan = NULL;
82        char *next = NULL;
83        char dpdkuri[1280];
84        struct addrinfo hints, *result;
85
86        libtrace->format_data = (dpdkndag_format_data_t *)malloc(
87                        sizeof(dpdkndag_format_data_t));
88
89        FORMAT_DATA->localiface = NULL;
90        FORMAT_DATA->threaddatas = NULL;
91        FORMAT_DATA->dpdkrecv = NULL;
92
93        scan = strchr(libtrace->uridata, ',');
94        if (scan == NULL) {
95                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
96                        "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>");
97                return -1;
98        }
99        FORMAT_DATA->localiface = strndup(libtrace->uridata,
100                        (size_t)(scan - libtrace->uridata));
101        next = scan + 1;
102
103        memset(&hints, 0, sizeof(struct addrinfo));
104        hints.ai_family = AF_UNSPEC;
105        hints.ai_socktype = SOCK_DGRAM;
106        hints.ai_flags = AI_PASSIVE;
107        hints.ai_protocol = 0;
108
109        if (getaddrinfo(next, NULL, &hints, &result) != 0) {
110                perror("getaddrinfo");
111                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
112                        "Invalid multicast address: %s", next);
113                return -1;
114        }
115
116        FORMAT_DATA->multicastgroup = result;
117
118        snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface);
119        FORMAT_DATA->dpdkrecv = trace_create(dpdkuri);
120
121        if (trace_is_err(FORMAT_DATA->dpdkrecv)) {
122                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
123                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
124                free(libtrace->format_data);
125                libtrace->format_data = NULL;
126                return -1;
127        }
128
129        return 0;
130}
131
132static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option,
133                void *data) {
134
135        return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data);
136}
137
138static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) {
139
140        uint32_t i;
141        if (FORMAT_DATA->threaddatas == NULL) {
142                FORMAT_DATA->threaddatas = (perthread_t *)malloc(
143                                sizeof(perthread_t) * maxthreads);
144        }
145
146        for (i = 0; i < maxthreads; i++) {
147                FORMAT_DATA->threaddatas[i].capstreams = NULL;
148                FORMAT_DATA->threaddatas[i].streamcount = 0;
149                FORMAT_DATA->threaddatas[i].dropped_upstream = 0;
150                FORMAT_DATA->threaddatas[i].received_packets = 0;
151                FORMAT_DATA->threaddatas[i].missing_records = 0;
152                FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL;
153                FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet();
154                FORMAT_DATA->threaddatas[i].ndagheader = NULL;
155                FORMAT_DATA->threaddatas[i].nextrec = NULL;
156                FORMAT_DATA->threaddatas[i].burstsize = 0;
157                FORMAT_DATA->threaddatas[i].burstoffset = 0;
158                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
159                                sizeof(struct rte_mbuf *) * 40);
160                pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock),
161                                NULL);
162        }
163        return maxthreads;
164}
165
166static int dpdkndag_start_input(libtrace_t *libtrace) {
167        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
168        int snaplen = 9000;
169
170        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
171                                &hash) == -1) {
172                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
173                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
174                return -1;
175        }
176
177        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
178                                &snaplen) == -1) {
179                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
180                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
181                return -1;
182        }
183
184        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
185                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
186                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
187                return -1;
188        }
189
190        dpdkndag_init_threads(libtrace, 1);
191
192        return 0;
193}
194
195static int dpdkndag_pstart_input(libtrace_t *libtrace) {
196
197        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
198        int snaplen = 9000;
199        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
200        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
201                                &hash) == -1) {
202                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
203                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
204                return -1;
205        }
206
207        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
208                                &snaplen) == -1) {
209                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
210                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
211                return -1;
212        }
213        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
214                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
215                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
216                return -1;
217        }
218        dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count);
219        return 0;
220}
221
222static void clear_threaddata(perthread_t *pt) {
223
224        int i;
225
226        if (pt->dpdkpkt) {
227                trace_destroy_packet(pt->dpdkpkt);
228        }
229        pt->dpdkpkt = NULL;
230
231        if (pt->capstreams) {
232                free(pt->capstreams);
233        }
234
235        for (i = 0; i < 40; i++) {
236                if (pt->burstspace[i]) {
237                        rte_pktmbuf_free(pt->burstspace[i]);
238                }
239        }
240        pthread_mutex_destroy(&(pt->ndag_lock));
241}
242
243static int dpdkndag_pause_input(libtrace_t *libtrace) {
244
245        int i;
246        /* Pause DPDK receive */
247        dpdk_pause_input(FORMAT_DATA->dpdkrecv);
248
249        /* Clear the threaddatas */
250        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
251                clear_threaddata(&(FORMAT_DATA->threaddatas[i]));
252        }
253        return 0;
254}
255
256static int dpdkndag_fin_input(libtrace_t *libtrace) {
257
258        if (FORMAT_DATA->dpdkrecv) {
259                trace_destroy(FORMAT_DATA->dpdkrecv);
260        }
261
262        if (FORMAT_DATA->threaddatas) {
263                free(FORMAT_DATA->threaddatas);
264        }
265
266        if (FORMAT_DATA->localiface) {
267                free(FORMAT_DATA->localiface);
268        }
269
270        if (FORMAT_DATA->multicastgroup) {
271                freeaddrinfo(FORMAT_DATA->multicastgroup);
272        }
273
274        free(FORMAT_DATA);
275        return 0;
276}
277
278static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
279                bool reader) {
280
281        perthread_t *pt;
282
283        if (!reader || t->type != THREAD_PERPKT) {
284                return 0;
285        }
286
287        if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) {
288                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
290                return -1;
291        }
292
293        /* t->format_data now contains our dpdk stream data */
294        pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]);
295        pt->dpdkstreamdata = t->format_data;
296        t->format_data = pt;
297
298        return 0;
299}
300
301static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
302
303        dpdk_punregister_thread(libtrace, t);
304}
305
306static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
307                libtrace_stat_t *stat) {
308
309        perthread_t *pt = (perthread_t *)t->format_data;
310
311        if (libtrace == NULL) {
312                return;
313        }
314
315                /* TODO Is this thread safe */
316        stat->dropped_valid = 1;
317        stat->dropped = pt->dropped_upstream;
318
319        stat->received_valid = 1;
320        stat->received = pt->received_packets;
321
322        stat->missing_valid = 1;
323        stat->missing = pt->missing_records;
324}
325
326static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
327        int i;
328
329        libtrace_stat_t *dpdkstat;
330
331        stat->dropped_valid = 1;
332        stat->dropped = 0;
333        stat->received_valid = 1;
334        stat->received = 0;
335        stat->missing_valid = 1;
336        stat->missing = 0;
337
338        dpdkstat = trace_create_statistics();
339        dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat);
340
341        if (dpdkstat->dropped_valid) {
342                stat->errors_valid = 1;
343                stat->errors = dpdkstat->dropped;
344        }
345
346        /* TODO Is this thread safe? */
347        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
348                pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
349                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
350                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
351                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
352                pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
353        }
354        free(dpdkstat);
355}
356
357static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) {
358
359        void *trans = NULL;
360        uint32_t rem = 0;
361        uint8_t proto;
362        char *payload;
363
364        trans = trace_get_transport(packet, &proto, &rem);
365        if (trans == NULL) {
366                return 0;
367        }
368
369        if (proto != TRACE_IPPROTO_UDP) {
370                return 0;
371        }
372
373        payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans,
374                        &rem);
375
376        if (payload == NULL) {
377                return 0;
378        }
379
380        if (rem < 4) {
381                return 0;
382        }
383
384        if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A'
385                        && payload[3] == 'G') {
386                pt->ndagsize = rem;
387                pt->ndagheader = payload;
388                return 1;
389        }
390
391        return 0;
392
393}
394
395static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {
396
397        if (a->sa_family != b->sa_family) {
398                return 0;
399        }
400
401        if (a->sa_family == AF_INET) {
402                struct sockaddr_in *ain = (struct sockaddr_in *)a;
403                struct sockaddr_in *bin = (struct sockaddr_in *)b;
404
405                if (ain->sin_addr.s_addr != bin->sin_addr.s_addr) {
406                        return 0;
407                }
408                return 1;
409        } else if (a->sa_family == AF_INET6) {
410                struct sockaddr_in6 *ain6 = (struct sockaddr_in6 *)a;
411                struct sockaddr_in6 *bin6 = (struct sockaddr_in6 *)b;
412
413                if (memcmp(ain6->sin6_addr.s6_addr, bin6->sin6_addr.s6_addr,
414                                sizeof(ain6->sin6_addr.s6_addr)) != 0) {
415                        return 0;
416                }
417                return 1;
418        }
419        return 0;
420}
421
422static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) {
423
424        ndag_common_t *header = (ndag_common_t *)pt->ndagheader;
425        ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader +
426                        sizeof(ndag_common_t));
427        uint16_t targetport;
428        struct sockaddr_storage targetaddr;
429        struct sockaddr *p;
430        capstream_t *cap = NULL;
431        int i;
432
433        memset((&targetaddr), 0, sizeof(targetaddr));
434        if (header->type != NDAG_PKT_ENCAPERF) {
435                pt->nextrec = NULL;
436                pt->ndagsize = 0;
437                pt->ndagheader = NULL;
438                return 1;
439        }
440
441        if ((p = trace_get_destination_address(pt->dpdkpkt,
442                        (struct sockaddr *)(&targetaddr))) == NULL) {
443                pt->nextrec = NULL;
444                pt->ndagsize = 0;
445                pt->ndagheader = NULL;
446                return 1;
447        }
448
449        if (!(sockaddr_same(p, expectedaddr->ai_addr))) {
450                pt->nextrec = NULL;
451                pt->ndagsize = 0;
452                pt->ndagheader = NULL;
453                return 1;
454        }
455
456        targetport = trace_get_destination_port(pt->dpdkpkt);
457        if (pt->streamcount == 0) {
458                pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t));
459                pt->streamcount = 1;
460                pt->capstreams[0].port = targetport;
461                pt->capstreams[0].expectedseq = 0;
462                pt->capstreams[0].recordcount = 0;
463                cap = pt->capstreams;
464
465        } else {
466                for (i = 0; i < pt->streamcount; i++) {
467                        if (pt->capstreams[i].port == targetport) {
468                                cap = (&pt->capstreams[i]);
469                                break;
470                        }
471                }
472
473                if (cap == NULL) {
474                        uint16_t next = pt->streamcount;
475                        pt->capstreams = (capstream_t *)realloc(pt->capstreams,
476                                    sizeof(capstream_t) * (pt->streamcount + 1));
477                        pt->streamcount += 1;
478                        pt->capstreams[next].port = targetport;
479                        pt->capstreams[next].expectedseq = 0;
480                        pt->capstreams[next].recordcount = 0;
481                        cap = &(pt->capstreams[next]);
482                }
483        }
484        if (cap->expectedseq != 0) {
485                pthread_mutex_lock(&pt->ndag_lock);
486                pt->missing_records += seq_cmp(
487                                ntohl(encaphdr->seqno), cap->expectedseq);
488                pthread_mutex_unlock(&pt->ndag_lock);
489        }
490        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
491        if (cap->expectedseq == 0) {
492                cap->expectedseq ++;
493        }
494        cap->recordcount ++;
495
496        pt->nextrec = ((char *)header) + sizeof(ndag_common_t) +
497                        sizeof(ndag_encap_t);
498
499        return 1;
500}
501
502static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt,
503                libtrace_packet_t *packet) {
504
505        /* This is mostly borrowed from ndag_prepare_packet_stream, minus
506         * the ndag socket-specific stuff */
507
508        dag_record_t *erfptr;
509        ndag_encap_t *encaphdr;
510
511        if (pt->nextrec == NULL) {
512                return -1;
513        }
514
515        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
516                return -1;
517        }
518
519        packet->buf_control = TRACE_CTRL_EXTERNAL;
520
521        packet->trace = libtrace;
522        packet->buffer = pt->nextrec;
523        packet->header = pt->nextrec;
524        packet->type = TRACE_RT_DATA_ERF;
525
526        erfptr = (dag_record_t *)packet->header;
527
528        if (erfptr->flags.rxerror == 1) {
529                packet->payload = NULL;
530                erfptr->rlen = htons(erf_get_framing_length(packet));
531        } else {
532                packet->payload = (char *)packet->buffer +
533                                erf_get_framing_length(packet);
534        }
535
536        /* Update upstream drops using lctr */
537
538        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
539                /* TODO */
540        } else {
541                pthread_mutex_lock(&(pt->ndag_lock));
542                if (pt->received_packets > 0) {
543                        pt->dropped_upstream += ntohs(erfptr->lctr);
544                }
545                pthread_mutex_unlock(&(pt->ndag_lock));
546        }
547
548        pthread_mutex_lock(&(pt->ndag_lock));
549        pt->received_packets ++;
550        pthread_mutex_unlock(&(pt->ndag_lock));
551        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
552
553        if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) {
554                /* Record was truncated */
555                erfptr->rlen = htons(pt->ndagsize - (pt->nextrec -
556                                pt->ndagheader));
557        }
558
559        pt->nextrec += ntohs(erfptr->rlen);
560
561        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
562                pt->ndagheader = NULL;
563                pt->nextrec = NULL;
564                pt->ndagsize = 0;
565        }
566
567        packet->order = erf_get_erf_timestamp(packet);
568        packet->error = packet->payload ? ntohs(erfptr->rlen) :
569                        erf_get_framing_length(packet);
570        return ntohs(erfptr->rlen);
571}
572
573static int dpdkndag_pread_packets(libtrace_t *libtrace,
574                                    libtrace_thread_t *t,
575                                    libtrace_packet_t **packets,
576                                    size_t nb_packets) {
577
578        perthread_t *pt = (perthread_t *)t->format_data;
579        size_t read_packets = 0;
580        int ret;
581
582        while (pt->nextrec == NULL) {
583                trace_fin_packet(pt->dpdkpkt);
584
585                if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) {
586                        pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset];
587                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
588                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
589                                        pt->dpdkpkt->buffer,
590                                        TRACE_RT_DATA_DPDK, 0);
591                        pt->burstoffset ++;
592                } else {
593                        ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv,
594                                        pt->dpdkstreamdata,
595                                        &t->messages,
596                                        pt->burstspace,
597                                        40);
598                        if (ret <= 0) {
599                                return ret;
600                        }
601
602                        pt->dpdkpkt->buffer = pt->burstspace[0];
603                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
604                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
605                                        pt->dpdkpkt->buffer,
606                                        TRACE_RT_DATA_DPDK, 0);
607                        pt->burstsize = ret;
608                        pt->burstoffset = 1;
609                }
610
611                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
612                        continue;
613                }
614
615                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
616                if (ret <= 0) {
617                        return ret;
618                }
619        }
620
621        while (pt->nextrec != NULL) {
622                if (read_packets == nb_packets) {
623                        break;
624                }
625
626                if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) {
627                        free(packets[read_packets]->buffer);
628                        packets[read_packets]->buffer = NULL;
629                }
630                ret = ndagrec_to_libtrace_packet(libtrace, pt,
631                                packets[read_packets]);
632                if (ret < 0) {
633                        return ret;
634                }
635                read_packets ++;
636
637        }
638
639        return read_packets;
640}
641
642static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
643
644        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
645        int ret;
646
647        if (packet->buf_control == TRACE_CTRL_PACKET) {
648                free(packet->buffer);
649                packet->buffer = NULL;
650        }
651
652        while (pt->nextrec == NULL) {
653                trace_fin_packet(pt->dpdkpkt);
654
655                ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt);
656                if (ret <= 0) {
657                        return ret;
658                }
659
660                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
661                        continue;
662                }
663
664                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
665                if (ret <= 0) {
666                        return ret;
667                }
668        }
669
670        return ndagrec_to_libtrace_packet(libtrace, pt, packet);
671}
672
673static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace,
674                libtrace_packet_t *packet) {
675
676
677        libtrace_eventobj_t event;
678        int ret;
679        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
680
681        if (packet->buf_control == TRACE_CTRL_PACKET) {
682                free(packet->buffer);
683                packet->buffer = NULL;
684        }
685
686        while (pt->nextrec == NULL) {
687
688                event = dpdk_trace_event(libtrace, pt->dpdkpkt);
689
690                if (event.type != TRACE_EVENT_PACKET) {
691                        return event;
692                }
693
694                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
695                        continue;
696                }
697
698                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
699                if (ret <= 0) {
700                        event.type = TRACE_EVENT_TERMINATE;
701                        return event;
702                }
703        }
704
705        ret = ndagrec_to_libtrace_packet(libtrace, pt, packet);
706        if (ret < 0) {
707                event.type = TRACE_EVENT_TERMINATE;
708        } else {
709                event.type = TRACE_EVENT_PACKET;
710                event.size = 1;
711        }
712        return event;
713}
714
715static struct libtrace_format_t dpdkndag = {
716
717        "dpdkndag",
718        "",
719        TRACE_FORMAT_DPDK_NDAG,
720        NULL,                   /* probe filename */
721        NULL,                   /* probe magic */
722        dpdkndag_init_input,        /* init_input */
723        dpdkndag_config_input,      /* config_input */
724        dpdkndag_start_input,       /* start_input */
725        dpdkndag_pause_input,       /* pause_input */
726        NULL,                   /* init_output */
727        NULL,                   /* config_output */
728        NULL,                   /* start_output */
729        dpdkndag_fin_input,         /* fin_input */
730        NULL,                   /* fin_output */
731        dpdkndag_read_packet,   /* read_packet */
732        NULL,                   /* prepare_packet */
733        NULL,                   /* fin_packet */
734        NULL,                   /* write_packet */
735        NULL,                   /* flush_output */
736        erf_get_link_type,      /* get_link_type */
737        erf_get_direction,      /* get_direction */
738        erf_set_direction,      /* set_direction */
739        erf_get_erf_timestamp,  /* get_erf_timestamp */
740        NULL,                   /* get_timeval */
741        NULL,                   /* get_seconds */
742        NULL,                   /* get_timespec */
743        NULL,                   /* seek_erf */
744        NULL,                   /* seek_timeval */
745        NULL,                   /* seek_seconds */
746        erf_get_capture_length, /* get_capture_length */
747        erf_get_wire_length,    /* get_wire_length */
748        erf_get_framing_length, /* get_framing_length */
749        erf_set_capture_length, /* set_capture_length */
750        NULL,                   /* get_received_packets */
751        NULL,                   /* get_filtered_packets */
752        NULL,                   /* get_dropped_packets */
753        dpdkndag_get_statistics,    /* get_statistics */
754        NULL,                   /* get_fd */
755        trace_event_dpdkndag,       /* trace_event */
756        NULL,                   /* help */
757        NULL,                   /* next pointer */
758        {true, 0},              /* live packet capture */
759        dpdkndag_pstart_input,      /* parallel start */
760        dpdkndag_pread_packets,     /* parallel read */
761        dpdkndag_pause_input,       /* parallel pause */
762        NULL,
763        dpdkndag_pregister_thread,  /* register thread */
764        dpdkndag_punregister_thread,
765        dpdkndag_get_thread_stats   /* per-thread stats */
766};
767
768void dpdkndag_constructor(void) {
769        register_format(&dpdkndag);
770}
Note: See TracBrowser for help on using the repository browser.