source: lib/format_dpdkndag.c @ 254c926

develop
Last change on this file since 254c926 was 254c926, checked in by Jacob Van Walraven <jcv9@…>, 21 months ago

Cleanup some duplicate code, Added datatype/option_name for libtrace_meta_t structure

  • Property mode set to 100644
File size: 23.4 KB
Line 
1
2#include "config.h"
3#include "common.h"
4#include "libtrace.h"
5#include "libtrace_int.h"
6#include "format_helper.h"
7#include "format_erf.h"
8
9#include <errno.h>
10#include <fcntl.h>
11#include <stdio.h>
12#include <string.h>
13#include <unistd.h>
14#include <stdlib.h>
15#include <sys/types.h>
16#include <sys/socket.h>
17#include <netdb.h>
18
19#include "format_dpdk.h"
20#include "format_ndag.h"
21
22static struct libtrace_format_t dpdkndag;
23
24typedef struct capstream {
25
26        uint16_t port;
27        uint32_t expectedseq;
28        uint64_t recordcount;
29} capstream_t;
30
31typedef struct perthread {
32        capstream_t *capstreams;
33        uint16_t streamcount;
34        uint64_t dropped_upstream;
35        uint64_t missing_records;
36        uint64_t received_packets;
37
38        libtrace_packet_t *dpdkpkt;
39        char *ndagheader;
40        char *nextrec;
41        uint32_t ndagsize;
42
43        pthread_mutex_t ndag_lock;
44        dpdk_per_stream_t *dpdkstreamdata;
45        int burstsize;
46        int burstoffset;
47        struct rte_mbuf* burstspace[40];
48
49} perthread_t;
50
51
52typedef struct dpdkndag_format_data {
53        libtrace_t *dpdkrecv;
54
55        struct addrinfo *multicastgroup;
56        char *localiface;
57
58        perthread_t *threaddatas;
59
60} dpdkndag_format_data_t;
61
62#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
63
64static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
65
66        /* Calculate seq_a - seq_b, taking wraparound into account */
67        if (seq_a == seq_b) return 0;
68
69        if (seq_a > seq_b) {
70                return (int) (seq_a - seq_b);
71        }
72
73        /* -1 for the wrap and another -1 because we don't use zero */
74        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
75}
76
77
78static int dpdkndag_init_input(libtrace_t *libtrace) {
79
80        char *scan = NULL;
81        char *next = NULL;
82        char dpdkuri[1280];
83        struct addrinfo hints, *result;
84
85        libtrace->format_data = (dpdkndag_format_data_t *)malloc(
86                        sizeof(dpdkndag_format_data_t));
87
88        if (!libtrace->format_data) {
89                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
90                        "format data inside dpdkndag_init_input()");
91                return -1;
92        }
93
94        FORMAT_DATA->localiface = NULL;
95        FORMAT_DATA->threaddatas = NULL;
96        FORMAT_DATA->dpdkrecv = NULL;
97
98        scan = strchr(libtrace->uridata, ',');
99        if (scan == NULL) {
100                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
101                        "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>");
102                return -1;
103        }
104        FORMAT_DATA->localiface = strndup(libtrace->uridata,
105                        (size_t)(scan - libtrace->uridata));
106        next = scan + 1;
107
108        memset(&hints, 0, sizeof(struct addrinfo));
109        hints.ai_family = AF_UNSPEC;
110        hints.ai_socktype = SOCK_DGRAM;
111        hints.ai_flags = AI_PASSIVE;
112        hints.ai_protocol = 0;
113
114        if (getaddrinfo(next, NULL, &hints, &result) != 0) {
115                perror("getaddrinfo");
116                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
117                        "Invalid multicast address: %s", next);
118                return -1;
119        }
120
121        FORMAT_DATA->multicastgroup = result;
122
123        snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface);
124        FORMAT_DATA->dpdkrecv = trace_create(dpdkuri);
125
126        if (trace_is_err(FORMAT_DATA->dpdkrecv)) {
127                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
128                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
129                free(libtrace->format_data);
130                libtrace->format_data = NULL;
131                return -1;
132        }
133
134        return 0;
135}
136
137static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option,
138                void *data) {
139
140        return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data);
141}
142
143static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) {
144
145        uint32_t i;
146        if (FORMAT_DATA->threaddatas == NULL) {
147                FORMAT_DATA->threaddatas = (perthread_t *)malloc(
148                                sizeof(perthread_t) * maxthreads);
149        }
150
151        for (i = 0; i < maxthreads; i++) {
152                FORMAT_DATA->threaddatas[i].capstreams = NULL;
153                FORMAT_DATA->threaddatas[i].streamcount = 0;
154                FORMAT_DATA->threaddatas[i].dropped_upstream = 0;
155                FORMAT_DATA->threaddatas[i].received_packets = 0;
156                FORMAT_DATA->threaddatas[i].missing_records = 0;
157                FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL;
158                FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet();
159                FORMAT_DATA->threaddatas[i].ndagheader = NULL;
160                FORMAT_DATA->threaddatas[i].nextrec = NULL;
161                FORMAT_DATA->threaddatas[i].burstsize = 0;
162                FORMAT_DATA->threaddatas[i].burstoffset = 0;
163                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
164                                sizeof(struct rte_mbuf *) * 40);
165                pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock),
166                                NULL);
167        }
168        return maxthreads;
169}
170
171static int dpdkndag_start_input(libtrace_t *libtrace) {
172        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
173        int snaplen = 9000;
174
175        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
176                                &hash) == -1) {
177                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
178                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
179                return -1;
180        }
181
182        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
183                                &snaplen) == -1) {
184                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
185                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
186                return -1;
187        }
188
189        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
190                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
191                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
192                return -1;
193        }
194
195        dpdkndag_init_threads(libtrace, 1);
196
197        return 0;
198}
199
200static int dpdkndag_pstart_input(libtrace_t *libtrace) {
201
202        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
203        int snaplen = 9000;
204        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
205        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
206                                &hash) == -1) {
207                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
208                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
209                return -1;
210        }
211
212        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
213                                &snaplen) == -1) {
214                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
215                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
216                return -1;
217        }
218        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
219                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
220                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
221                return -1;
222        }
223        dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count);
224        return 0;
225}
226
227static void clear_threaddata(perthread_t *pt) {
228
229        int i;
230
231        if (pt->dpdkpkt) {
232                trace_destroy_packet(pt->dpdkpkt);
233        }
234        pt->dpdkpkt = NULL;
235
236        if (pt->capstreams) {
237                free(pt->capstreams);
238        }
239
240        for (i = 0; i < 40; i++) {
241                if (pt->burstspace[i]) {
242                        rte_pktmbuf_free(pt->burstspace[i]);
243                }
244        }
245        pthread_mutex_destroy(&(pt->ndag_lock));
246}
247
248static int dpdkndag_pause_input(libtrace_t *libtrace) {
249
250        int i;
251        /* Pause DPDK receive */
252        dpdk_pause_input(FORMAT_DATA->dpdkrecv);
253
254        /* Clear the threaddatas */
255        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
256                clear_threaddata(&(FORMAT_DATA->threaddatas[i]));
257        }
258        return 0;
259}
260
261static int dpdkndag_fin_input(libtrace_t *libtrace) {
262
263        if (FORMAT_DATA->dpdkrecv) {
264                trace_destroy(FORMAT_DATA->dpdkrecv);
265        }
266
267        if (FORMAT_DATA->threaddatas) {
268                free(FORMAT_DATA->threaddatas);
269        }
270
271        if (FORMAT_DATA->localiface) {
272                free(FORMAT_DATA->localiface);
273        }
274
275        if (FORMAT_DATA->multicastgroup) {
276                freeaddrinfo(FORMAT_DATA->multicastgroup);
277        }
278
279        free(FORMAT_DATA);
280        return 0;
281}
282
283static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
284                bool reader) {
285
286        perthread_t *pt;
287
288        if (!reader || t->type != THREAD_PERPKT) {
289                return 0;
290        }
291
292        if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) {
293                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
294                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
295                return -1;
296        }
297
298        /* t->format_data now contains our dpdk stream data */
299        pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]);
300        pt->dpdkstreamdata = t->format_data;
301        t->format_data = pt;
302
303        return 0;
304}
305
306static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
307
308        dpdk_punregister_thread(libtrace, t);
309}
310
311static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
312                libtrace_stat_t *stat) {
313
314        perthread_t *pt = (perthread_t *)t->format_data;
315
316        if (libtrace == NULL) {
317                return;
318        }
319
320                /* TODO Is this thread safe */
321        stat->dropped_valid = 1;
322        stat->dropped = pt->dropped_upstream;
323
324        stat->received_valid = 1;
325        stat->received = pt->received_packets;
326
327        stat->missing_valid = 1;
328        stat->missing = pt->missing_records;
329}
330
331static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
332        int i;
333
334        libtrace_stat_t *dpdkstat;
335
336        stat->dropped_valid = 1;
337        stat->dropped = 0;
338        stat->received_valid = 1;
339        stat->received = 0;
340        stat->missing_valid = 1;
341        stat->missing = 0;
342
343        dpdkstat = trace_create_statistics();
344        dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat);
345
346        if (dpdkstat->dropped_valid) {
347                stat->errors_valid = 1;
348                stat->errors = dpdkstat->dropped;
349        }
350
351        /* TODO Is this thread safe? */
352        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
353                pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
354                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
355                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
356                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
357                pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
358        }
359        free(dpdkstat);
360}
361
362static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) {
363
364        void *trans = NULL;
365        uint32_t rem = 0;
366        uint8_t proto;
367        char *payload;
368
369        trans = trace_get_transport(packet, &proto, &rem);
370        if (trans == NULL) {
371                return 0;
372        }
373
374        if (proto != TRACE_IPPROTO_UDP) {
375                return 0;
376        }
377
378        payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans,
379                        &rem);
380
381        if (payload == NULL) {
382                return 0;
383        }
384
385        if (rem < 4) {
386                return 0;
387        }
388
389        if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A'
390                        && payload[3] == 'G') {
391                pt->ndagsize = rem;
392                pt->ndagheader = payload;
393                return 1;
394        }
395
396        return 0;
397
398}
399
400static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {
401
402        if (a->sa_family != b->sa_family) {
403                return 0;
404        }
405
406        if (a->sa_family == AF_INET) {
407                struct sockaddr_in *ain = (struct sockaddr_in *)a;
408                struct sockaddr_in *bin = (struct sockaddr_in *)b;
409
410                if (ain->sin_addr.s_addr != bin->sin_addr.s_addr) {
411                        return 0;
412                }
413                return 1;
414        } else if (a->sa_family == AF_INET6) {
415                struct sockaddr_in6 *ain6 = (struct sockaddr_in6 *)a;
416                struct sockaddr_in6 *bin6 = (struct sockaddr_in6 *)b;
417
418                if (memcmp(ain6->sin6_addr.s6_addr, bin6->sin6_addr.s6_addr,
419                                sizeof(ain6->sin6_addr.s6_addr)) != 0) {
420                        return 0;
421                }
422                return 1;
423        }
424        return 0;
425}
426
427static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) {
428
429        ndag_common_t *header = (ndag_common_t *)pt->ndagheader;
430        ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader +
431                        sizeof(ndag_common_t));
432        uint16_t targetport;
433        struct sockaddr_storage targetaddr;
434        struct sockaddr *p;
435        capstream_t *cap = NULL;
436        int i;
437
438        memset((&targetaddr), 0, sizeof(targetaddr));
439        if (header->type != NDAG_PKT_ENCAPERF) {
440                pt->nextrec = NULL;
441                pt->ndagsize = 0;
442                pt->ndagheader = NULL;
443                return 1;
444        }
445
446        if ((p = trace_get_destination_address(pt->dpdkpkt,
447                        (struct sockaddr *)(&targetaddr))) == NULL) {
448                pt->nextrec = NULL;
449                pt->ndagsize = 0;
450                pt->ndagheader = NULL;
451                return 1;
452        }
453
454        if (!(sockaddr_same(p, expectedaddr->ai_addr))) {
455                pt->nextrec = NULL;
456                pt->ndagsize = 0;
457                pt->ndagheader = NULL;
458                return 1;
459        }
460
461        targetport = trace_get_destination_port(pt->dpdkpkt);
462        if (pt->streamcount == 0) {
463                pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t));
464                pt->streamcount = 1;
465                pt->capstreams[0].port = targetport;
466                pt->capstreams[0].expectedseq = 0;
467                pt->capstreams[0].recordcount = 0;
468                cap = pt->capstreams;
469
470        } else {
471                for (i = 0; i < pt->streamcount; i++) {
472                        if (pt->capstreams[i].port == targetport) {
473                                cap = (&pt->capstreams[i]);
474                                break;
475                        }
476                }
477
478                if (cap == NULL) {
479                        uint16_t next = pt->streamcount;
480                        pt->capstreams = (capstream_t *)realloc(pt->capstreams,
481                                    sizeof(capstream_t) * (pt->streamcount + 1));
482                        pt->streamcount += 1;
483                        pt->capstreams[next].port = targetport;
484                        pt->capstreams[next].expectedseq = 0;
485                        pt->capstreams[next].recordcount = 0;
486                        cap = &(pt->capstreams[next]);
487                }
488        }
489        if (cap->expectedseq != 0) {
490                pthread_mutex_lock(&pt->ndag_lock);
491                pt->missing_records += seq_cmp(
492                                ntohl(encaphdr->seqno), cap->expectedseq);
493                pthread_mutex_unlock(&pt->ndag_lock);
494        }
495        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
496        if (cap->expectedseq == 0) {
497                cap->expectedseq ++;
498        }
499        cap->recordcount ++;
500
501        pt->nextrec = ((char *)header) + sizeof(ndag_common_t) +
502                        sizeof(ndag_encap_t);
503
504        return 1;
505}
506
507static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt,
508                libtrace_packet_t *packet) {
509
510        /* This is mostly borrowed from ndag_prepare_packet_stream, minus
511         * the ndag socket-specific stuff */
512
513        dag_record_t *erfptr;
514        ndag_encap_t *encaphdr;
515
516        if (pt->nextrec == NULL) {
517                return -1;
518        }
519
520        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
521                return -1;
522        }
523
524        packet->buf_control = TRACE_CTRL_EXTERNAL;
525
526        packet->trace = libtrace;
527        packet->buffer = pt->nextrec;
528        packet->header = pt->nextrec;
529        packet->type = TRACE_RT_DATA_ERF;
530
531        erfptr = (dag_record_t *)packet->header;
532
533        if (erfptr->flags.rxerror == 1) {
534                packet->payload = NULL;
535                erfptr->rlen = htons(erf_get_framing_length(packet));
536        } else {
537                packet->payload = (char *)packet->buffer +
538                                erf_get_framing_length(packet);
539        }
540
541        /* Update upstream drops using lctr */
542
543        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
544                /* TODO */
545        } else {
546                pthread_mutex_lock(&(pt->ndag_lock));
547                if (pt->received_packets > 0) {
548                        pt->dropped_upstream += ntohs(erfptr->lctr);
549                }
550                pthread_mutex_unlock(&(pt->ndag_lock));
551        }
552
553        pthread_mutex_lock(&(pt->ndag_lock));
554        pt->received_packets ++;
555        pthread_mutex_unlock(&(pt->ndag_lock));
556        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
557
558        if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) {
559                /* Record was truncated */
560                erfptr->rlen = htons(pt->ndagsize - (pt->nextrec -
561                                pt->ndagheader));
562        }
563
564        pt->nextrec += ntohs(erfptr->rlen);
565
566        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
567                pt->ndagheader = NULL;
568                pt->nextrec = NULL;
569                pt->ndagsize = 0;
570        }
571
572        packet->order = erf_get_erf_timestamp(packet);
573        packet->error = packet->payload ? ntohs(erfptr->rlen) :
574                        erf_get_framing_length(packet);
575        return ntohs(erfptr->rlen);
576}
577
578static int dpdkndag_pread_packets(libtrace_t *libtrace,
579                                    libtrace_thread_t *t,
580                                    libtrace_packet_t **packets,
581                                    size_t nb_packets) {
582
583        perthread_t *pt = (perthread_t *)t->format_data;
584        size_t read_packets = 0;
585        int ret;
586
587        while (pt->nextrec == NULL) {
588                trace_fin_packet(pt->dpdkpkt);
589
590                if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) {
591                        pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset];
592                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
593                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
594                                        pt->dpdkpkt->buffer,
595                                        TRACE_RT_DATA_DPDK, 0);
596                        pt->burstoffset ++;
597                } else {
598                        ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv,
599                                        pt->dpdkstreamdata,
600                                        &t->messages,
601                                        pt->burstspace,
602                                        40);
603                        if (ret <= 0) {
604                                return ret;
605                        }
606
607                        pt->dpdkpkt->buffer = pt->burstspace[0];
608                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
609                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
610                                        pt->dpdkpkt->buffer,
611                                        TRACE_RT_DATA_DPDK, 0);
612                        pt->burstsize = ret;
613                        pt->burstoffset = 1;
614                }
615
616                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
617                        continue;
618                }
619
620                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
621                if (ret <= 0) {
622                        return ret;
623                }
624        }
625
626        while (pt->nextrec != NULL) {
627                if (read_packets == nb_packets) {
628                        break;
629                }
630
631                if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) {
632                        free(packets[read_packets]->buffer);
633                        packets[read_packets]->buffer = NULL;
634                }
635                ret = ndagrec_to_libtrace_packet(libtrace, pt,
636                                packets[read_packets]);
637                if (ret < 0) {
638                        return ret;
639                }
640                read_packets ++;
641
642        }
643
644        return read_packets;
645}
646
647static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
648
649        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
650        int ret;
651
652        if (packet->buf_control == TRACE_CTRL_PACKET) {
653                free(packet->buffer);
654                packet->buffer = NULL;
655        }
656
657        while (pt->nextrec == NULL) {
658                trace_fin_packet(pt->dpdkpkt);
659
660                ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt);
661                if (ret <= 0) {
662                        return ret;
663                }
664
665                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
666                        continue;
667                }
668
669                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
670                if (ret <= 0) {
671                        return ret;
672                }
673        }
674
675        return ndagrec_to_libtrace_packet(libtrace, pt, packet);
676}
677
678static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace,
679                libtrace_packet_t *packet) {
680
681
682        libtrace_eventobj_t event;
683        int ret;
684        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
685
686        if (packet->buf_control == TRACE_CTRL_PACKET) {
687                free(packet->buffer);
688                packet->buffer = NULL;
689        }
690
691        while (pt->nextrec == NULL) {
692
693                event = dpdk_trace_event(libtrace, pt->dpdkpkt);
694
695                if (event.type != TRACE_EVENT_PACKET) {
696                        return event;
697                }
698
699                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
700                        continue;
701                }
702
703                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
704                if (ret <= 0) {
705                        event.type = TRACE_EVENT_TERMINATE;
706                        return event;
707                }
708        }
709
710        ret = ndagrec_to_libtrace_packet(libtrace, pt, packet);
711        if (ret < 0) {
712                event.type = TRACE_EVENT_TERMINATE;
713        } else {
714                event.type = TRACE_EVENT_PACKET;
715                event.size = 1;
716        }
717        return event;
718}
719
720static struct libtrace_format_t dpdkndag = {
721
722        "dpdkndag",
723        "",
724        TRACE_FORMAT_DPDK_NDAG,
725        NULL,                   /* probe filename */
726        NULL,                   /* probe magic */
727        dpdkndag_init_input,        /* init_input */
728        dpdkndag_config_input,      /* config_input */
729        dpdkndag_start_input,       /* start_input */
730        dpdkndag_pause_input,       /* pause_input */
731        NULL,                   /* init_output */
732        NULL,                   /* config_output */
733        NULL,                   /* start_output */
734        dpdkndag_fin_input,         /* fin_input */
735        NULL,                   /* fin_output */
736        dpdkndag_read_packet,   /* read_packet */
737        NULL,                   /* prepare_packet */
738        NULL,                   /* fin_packet */
739        NULL,                   /* write_packet */
740        NULL,                   /* flush_output */
741        erf_get_link_type,      /* get_link_type */
742        erf_get_direction,      /* get_direction */
743        erf_set_direction,      /* set_direction */
744        erf_get_erf_timestamp,  /* get_erf_timestamp */
745        NULL,                   /* get_timeval */
746        NULL,                   /* get_seconds */
747        NULL,                   /* get_timespec */
748        NULL,                   /* seek_erf */
749        NULL,                   /* seek_timeval */
750        NULL,                   /* seek_seconds */
751        NULL,                   /* get_meta_section */
752        erf_get_capture_length, /* get_capture_length */
753        erf_get_wire_length,    /* get_wire_length */
754        erf_get_framing_length, /* get_framing_length */
755        erf_set_capture_length, /* set_capture_length */
756        NULL,                   /* get_received_packets */
757        NULL,                   /* get_filtered_packets */
758        NULL,                   /* get_dropped_packets */
759        dpdkndag_get_statistics,    /* get_statistics */
760        NULL,                   /* get_fd */
761        trace_event_dpdkndag,       /* trace_event */
762        NULL,                   /* help */
763        NULL,                   /* next pointer */
764        {true, 0},              /* live packet capture */
765        dpdkndag_pstart_input,      /* parallel start */
766        dpdkndag_pread_packets,     /* parallel read */
767        dpdkndag_pause_input,       /* parallel pause */
768        NULL,
769        dpdkndag_pregister_thread,  /* register thread */
770        dpdkndag_punregister_thread,
771        dpdkndag_get_thread_stats   /* per-thread stats */
772};
773
774void dpdkndag_constructor(void) {
775        register_format(&dpdkndag);
776}
Note: See TracBrowser for help on using the repository browser.