source: lib/format_dpdkndag.c @ 2725318

develop
Last change on this file since 2725318 was 2725318, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Cleanup some of the assertions

  • Property mode set to 100644
File size: 23.4 KB
Line 
1
2#include "config.h"
3#include "common.h"
4#include "libtrace.h"
5#include "libtrace_int.h"
6#include "format_helper.h"
7#include "format_erf.h"
8
9#include <assert.h>
10#include <errno.h>
11#include <fcntl.h>
12#include <stdio.h>
13#include <string.h>
14#include <unistd.h>
15#include <stdlib.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18#include <netdb.h>
19
20#include "format_dpdk.h"
21#include "format_ndag.h"
22
23static struct libtrace_format_t dpdkndag;
24
25typedef struct capstream {
26
27        uint16_t port;
28        uint32_t expectedseq;
29        uint64_t recordcount;
30} capstream_t;
31
32typedef struct perthread {
33        capstream_t *capstreams;
34        uint16_t streamcount;
35        uint64_t dropped_upstream;
36        uint64_t missing_records;
37        uint64_t received_packets;
38
39        libtrace_packet_t *dpdkpkt;
40        char *ndagheader;
41        char *nextrec;
42        uint32_t ndagsize;
43
44        pthread_mutex_t ndag_lock;
45        dpdk_per_stream_t *dpdkstreamdata;
46        int burstsize;
47        int burstoffset;
48        struct rte_mbuf* burstspace[40];
49
50} perthread_t;
51
52
53typedef struct dpdkndag_format_data {
54        libtrace_t *dpdkrecv;
55
56        struct addrinfo *multicastgroup;
57        char *localiface;
58
59        perthread_t *threaddatas;
60
61} dpdkndag_format_data_t;
62
63#define FORMAT_DATA ((dpdkndag_format_data_t *)libtrace->format_data)
64
65static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
66
67        /* Calculate seq_a - seq_b, taking wraparound into account */
68        if (seq_a == seq_b) return 0;
69
70        if (seq_a > seq_b) {
71                return (int) (seq_a - seq_b);
72        }
73
74        /* -1 for the wrap and another -1 because we don't use zero */
75        return (int) (0xffffffff - ((seq_b - seq_a) - 2));
76}
77
78
79static int dpdkndag_init_input(libtrace_t *libtrace) {
80
81        char *scan = NULL;
82        char *next = NULL;
83        char dpdkuri[1280];
84        struct addrinfo hints, *result;
85
86        libtrace->format_data = (dpdkndag_format_data_t *)malloc(
87                        sizeof(dpdkndag_format_data_t));
88
89        if (!libtrace->format_data) {
90                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory dpdknday_init_input()");
91                return 1;
92        }
93
94        FORMAT_DATA->localiface = NULL;
95        FORMAT_DATA->threaddatas = NULL;
96        FORMAT_DATA->dpdkrecv = NULL;
97
98        scan = strchr(libtrace->uridata, ',');
99        if (scan == NULL) {
100                free(libtrace->format_data);
101                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
102                        "Bad dpdkndag URI. Should be dpdkndag:<interface>,<multicast group>");
103                return -1;
104        }
105        FORMAT_DATA->localiface = strndup(libtrace->uridata,
106                        (size_t)(scan - libtrace->uridata));
107        next = scan + 1;
108
109        memset(&hints, 0, sizeof(struct addrinfo));
110        hints.ai_family = AF_UNSPEC;
111        hints.ai_socktype = SOCK_DGRAM;
112        hints.ai_flags = AI_PASSIVE;
113        hints.ai_protocol = 0;
114
115        if (getaddrinfo(next, NULL, &hints, &result) != 0) {
116                perror("getaddrinfo");
117                free(libtrace->format_data);
118                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
119                        "Invalid multicast address: %s", next);
120                return -1;
121        }
122
123        FORMAT_DATA->multicastgroup = result;
124
125        snprintf(dpdkuri, 1279, "dpdk:%s", FORMAT_DATA->localiface);
126        FORMAT_DATA->dpdkrecv = trace_create(dpdkuri);
127
128        if (trace_is_err(FORMAT_DATA->dpdkrecv)) {
129                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
130                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
131                free(libtrace->format_data);
132                libtrace->format_data = NULL;
133                return -1;
134        }
135
136        return 0;
137}
138
139static int dpdkndag_config_input (libtrace_t *libtrace, trace_option_t option,
140                void *data) {
141
142        return dpdk_config_input(FORMAT_DATA->dpdkrecv, option, data);
143}
144
145static int dpdkndag_init_threads(libtrace_t *libtrace, uint32_t maxthreads) {
146
147        uint32_t i;
148        if (FORMAT_DATA->threaddatas == NULL) {
149                FORMAT_DATA->threaddatas = (perthread_t *)malloc(
150                                sizeof(perthread_t) * maxthreads);
151        }
152
153        for (i = 0; i < maxthreads; i++) {
154                FORMAT_DATA->threaddatas[i].capstreams = NULL;
155                FORMAT_DATA->threaddatas[i].streamcount = 0;
156                FORMAT_DATA->threaddatas[i].dropped_upstream = 0;
157                FORMAT_DATA->threaddatas[i].received_packets = 0;
158                FORMAT_DATA->threaddatas[i].missing_records = 0;
159                FORMAT_DATA->threaddatas[i].dpdkstreamdata = NULL;
160                FORMAT_DATA->threaddatas[i].dpdkpkt = trace_create_packet();
161                FORMAT_DATA->threaddatas[i].ndagheader = NULL;
162                FORMAT_DATA->threaddatas[i].nextrec = NULL;
163                FORMAT_DATA->threaddatas[i].burstsize = 0;
164                FORMAT_DATA->threaddatas[i].burstoffset = 0;
165                memset(FORMAT_DATA->threaddatas[i].burstspace, 0,
166                                sizeof(struct rte_mbuf *) * 40);
167                pthread_mutex_init(&(FORMAT_DATA->threaddatas[i].ndag_lock),
168                                NULL);
169        }
170        return maxthreads;
171}
172
173static int dpdkndag_start_input(libtrace_t *libtrace) {
174        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
175        int snaplen = 9000;
176
177        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
178                                &hash) == -1) {
179                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
180                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
181                return -1;
182        }
183
184        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
185                                &snaplen) == -1) {
186                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
187                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
188                return -1;
189        }
190
191        if (dpdk_start_input(FORMAT_DATA->dpdkrecv) == -1) {
192                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
193                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
194                return -1;
195        }
196
197        dpdkndag_init_threads(libtrace, 1);
198
199        return 0;
200}
201
202static int dpdkndag_pstart_input(libtrace_t *libtrace) {
203
204        enum hasher_types hash = HASHER_UNIDIRECTIONAL;
205        int snaplen = 9000;
206        FORMAT_DATA->dpdkrecv->perpkt_thread_count = libtrace->perpkt_thread_count;
207        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_HASHER,
208                                &hash) == -1) {
209                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
210                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
211                return -1;
212        }
213
214        if (dpdk_config_input(FORMAT_DATA->dpdkrecv, TRACE_OPTION_SNAPLEN,
215                                &snaplen) == -1) {
216                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
217                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
218                return -1;
219        }
220        if (dpdk_pstart_input(FORMAT_DATA->dpdkrecv) == -1) {
221                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
222                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
223                return -1;
224        }
225        dpdkndag_init_threads(libtrace, libtrace->perpkt_thread_count);
226        return 0;
227}
228
229static void clear_threaddata(perthread_t *pt) {
230
231        int i;
232
233        if (pt->dpdkpkt) {
234                trace_destroy_packet(pt->dpdkpkt);
235        }
236        pt->dpdkpkt = NULL;
237
238        if (pt->capstreams) {
239                free(pt->capstreams);
240        }
241
242        for (i = 0; i < 40; i++) {
243                if (pt->burstspace[i]) {
244                        rte_pktmbuf_free(pt->burstspace[i]);
245                }
246        }
247        pthread_mutex_destroy(&(pt->ndag_lock));
248}
249
250static int dpdkndag_pause_input(libtrace_t *libtrace) {
251
252        int i;
253        /* Pause DPDK receive */
254        dpdk_pause_input(FORMAT_DATA->dpdkrecv);
255
256        /* Clear the threaddatas */
257        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
258                clear_threaddata(&(FORMAT_DATA->threaddatas[i]));
259        }
260        return 0;
261}
262
263static int dpdkndag_fin_input(libtrace_t *libtrace) {
264
265        if (FORMAT_DATA->dpdkrecv) {
266                trace_destroy(FORMAT_DATA->dpdkrecv);
267        }
268
269        if (FORMAT_DATA->threaddatas) {
270                free(FORMAT_DATA->threaddatas);
271        }
272
273        if (FORMAT_DATA->localiface) {
274                free(FORMAT_DATA->localiface);
275        }
276
277        if (FORMAT_DATA->multicastgroup) {
278                freeaddrinfo(FORMAT_DATA->multicastgroup);
279        }
280
281        free(FORMAT_DATA);
282        return 0;
283}
284
285static int dpdkndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
286                bool reader) {
287
288        perthread_t *pt;
289
290        if (!reader || t->type != THREAD_PERPKT) {
291                return 0;
292        }
293
294        if (dpdk_pregister_thread(FORMAT_DATA->dpdkrecv, t, reader) == -1) {
295                libtrace_err_t err = trace_get_err(FORMAT_DATA->dpdkrecv);
296                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err.problem);
297                return -1;
298        }
299
300        /* t->format_data now contains our dpdk stream data */
301        pt = &(FORMAT_DATA->threaddatas[t->perpkt_num]);
302        pt->dpdkstreamdata = t->format_data;
303        t->format_data = pt;
304
305        return 0;
306}
307
308static void dpdkndag_punregister_thread(libtrace_t *libtrace, libtrace_thread_t *t) {
309
310        dpdk_punregister_thread(libtrace, t);
311}
312
313static void dpdkndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
314                libtrace_stat_t *stat) {
315
316        perthread_t *pt = (perthread_t *)t->format_data;
317
318        if (libtrace == NULL) {
319                return;
320        }
321
322                /* TODO Is this thread safe */
323        stat->dropped_valid = 1;
324        stat->dropped = pt->dropped_upstream;
325
326        stat->received_valid = 1;
327        stat->received = pt->received_packets;
328
329        stat->missing_valid = 1;
330        stat->missing = pt->missing_records;
331}
332
333static void dpdkndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
334        int i;
335
336        libtrace_stat_t *dpdkstat;
337
338        stat->dropped_valid = 1;
339        stat->dropped = 0;
340        stat->received_valid = 1;
341        stat->received = 0;
342        stat->missing_valid = 1;
343        stat->missing = 0;
344
345        dpdkstat = trace_create_statistics();
346        dpdk_get_stats(FORMAT_DATA->dpdkrecv, dpdkstat);
347
348        if (dpdkstat->dropped_valid) {
349                stat->errors_valid = 1;
350                stat->errors = dpdkstat->dropped;
351        }
352
353        /* TODO Is this thread safe? */
354        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
355                pthread_mutex_lock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
356                stat->dropped += FORMAT_DATA->threaddatas[i].dropped_upstream;
357                stat->received += FORMAT_DATA->threaddatas[i].received_packets;
358                stat->missing += FORMAT_DATA->threaddatas[i].missing_records;
359                pthread_mutex_unlock(&(FORMAT_DATA->threaddatas[i].ndag_lock));
360        }
361        free(dpdkstat);
362}
363
364static int is_ndag_packet(libtrace_packet_t *packet, perthread_t *pt) {
365
366        void *trans = NULL;
367        uint32_t rem = 0;
368        uint8_t proto;
369        char *payload;
370
371        trans = trace_get_transport(packet, &proto, &rem);
372        if (trans == NULL) {
373                return 0;
374        }
375
376        if (proto != TRACE_IPPROTO_UDP) {
377                return 0;
378        }
379
380        payload = (char *)trace_get_payload_from_udp((libtrace_udp_t *)trans,
381                        &rem);
382
383        if (payload == NULL) {
384                return 0;
385        }
386
387        if (rem < 4) {
388                return 0;
389        }
390
391        if (payload[0] == 'N' && payload[1] == 'D' && payload[2] == 'A'
392                        && payload[3] == 'G') {
393                pt->ndagsize = rem;
394                pt->ndagheader = payload;
395                return 1;
396        }
397
398        return 0;
399
400}
401
402static int sockaddr_same(struct sockaddr *a, struct sockaddr *b) {
403
404        if (a->sa_family != b->sa_family) {
405                return 0;
406        }
407
408        if (a->sa_family == AF_INET) {
409                struct sockaddr_in *ain = (struct sockaddr_in *)a;
410                struct sockaddr_in *bin = (struct sockaddr_in *)b;
411
412                if (ain->sin_addr.s_addr != bin->sin_addr.s_addr) {
413                        return 0;
414                }
415                return 1;
416        } else if (a->sa_family == AF_INET6) {
417                struct sockaddr_in6 *ain6 = (struct sockaddr_in6 *)a;
418                struct sockaddr_in6 *bin6 = (struct sockaddr_in6 *)b;
419
420                if (memcmp(ain6->sin6_addr.s6_addr, bin6->sin6_addr.s6_addr,
421                                sizeof(ain6->sin6_addr.s6_addr)) != 0) {
422                        return 0;
423                }
424                return 1;
425        }
426        return 0;
427}
428
429static int process_fresh_packet(perthread_t *pt, struct addrinfo *expectedaddr) {
430
431        ndag_common_t *header = (ndag_common_t *)pt->ndagheader;
432        ndag_encap_t *encaphdr = (ndag_encap_t *)(pt->ndagheader +
433                        sizeof(ndag_common_t));
434        uint16_t targetport;
435        struct sockaddr_storage targetaddr;
436        struct sockaddr *p;
437        capstream_t *cap = NULL;
438        int i;
439
440        memset((&targetaddr), 0, sizeof(targetaddr));
441        if (header->type != NDAG_PKT_ENCAPERF) {
442                pt->nextrec = NULL;
443                pt->ndagsize = 0;
444                pt->ndagheader = NULL;
445                return 1;
446        }
447
448        if ((p = trace_get_destination_address(pt->dpdkpkt,
449                        (struct sockaddr *)(&targetaddr))) == NULL) {
450                pt->nextrec = NULL;
451                pt->ndagsize = 0;
452                pt->ndagheader = NULL;
453                return 1;
454        }
455
456        if (!(sockaddr_same(p, expectedaddr->ai_addr))) {
457                pt->nextrec = NULL;
458                pt->ndagsize = 0;
459                pt->ndagheader = NULL;
460                return 1;
461        }
462
463        targetport = trace_get_destination_port(pt->dpdkpkt);
464        if (pt->streamcount == 0) {
465                pt->capstreams = (capstream_t *)malloc(sizeof(capstream_t));
466                pt->streamcount = 1;
467                pt->capstreams[0].port = targetport;
468                pt->capstreams[0].expectedseq = 0;
469                pt->capstreams[0].recordcount = 0;
470                cap = pt->capstreams;
471
472        } else {
473                for (i = 0; i < pt->streamcount; i++) {
474                        if (pt->capstreams[i].port == targetport) {
475                                cap = (&pt->capstreams[i]);
476                                break;
477                        }
478                }
479
480                if (cap == NULL) {
481                        uint16_t next = pt->streamcount;
482                        pt->capstreams = (capstream_t *)realloc(pt->capstreams,
483                                    sizeof(capstream_t) * (pt->streamcount + 1));
484                        pt->streamcount += 1;
485                        pt->capstreams[next].port = targetport;
486                        pt->capstreams[next].expectedseq = 0;
487                        pt->capstreams[next].recordcount = 0;
488                        cap = &(pt->capstreams[next]);
489                }
490        }
491        if (cap->expectedseq != 0) {
492                pthread_mutex_lock(&pt->ndag_lock);
493                pt->missing_records += seq_cmp(
494                                ntohl(encaphdr->seqno), cap->expectedseq);
495                pthread_mutex_unlock(&pt->ndag_lock);
496        }
497        cap->expectedseq = ntohl(encaphdr->seqno) + 1;
498        if (cap->expectedseq == 0) {
499                cap->expectedseq ++;
500        }
501        cap->recordcount ++;
502
503        pt->nextrec = ((char *)header) + sizeof(ndag_common_t) +
504                        sizeof(ndag_encap_t);
505
506        return 1;
507}
508
509static int ndagrec_to_libtrace_packet(libtrace_t *libtrace, perthread_t *pt,
510                libtrace_packet_t *packet) {
511
512        /* This is mostly borrowed from ndag_prepare_packet_stream, minus
513         * the ndag socket-specific stuff */
514
515        dag_record_t *erfptr;
516        ndag_encap_t *encaphdr;
517
518        if (pt->nextrec == NULL) {
519                return -1;
520        }
521
522        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
523                return -1;
524        }
525
526        packet->buf_control = TRACE_CTRL_EXTERNAL;
527
528        packet->trace = libtrace;
529        packet->buffer = pt->nextrec;
530        packet->header = pt->nextrec;
531        packet->type = TRACE_RT_DATA_ERF;
532
533        erfptr = (dag_record_t *)packet->header;
534
535        if (erfptr->flags.rxerror == 1) {
536                packet->payload = NULL;
537                erfptr->rlen = htons(erf_get_framing_length(packet));
538        } else {
539                packet->payload = (char *)packet->buffer +
540                                erf_get_framing_length(packet);
541        }
542
543        /* Update upstream drops using lctr */
544
545        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
546                /* TODO */
547        } else {
548                pthread_mutex_lock(&(pt->ndag_lock));
549                if (pt->received_packets > 0) {
550                        pt->dropped_upstream += ntohs(erfptr->lctr);
551                }
552                pthread_mutex_unlock(&(pt->ndag_lock));
553        }
554
555        pthread_mutex_lock(&(pt->ndag_lock));
556        pt->received_packets ++;
557        pthread_mutex_unlock(&(pt->ndag_lock));
558        encaphdr = (ndag_encap_t *)(pt->ndagheader + sizeof(ndag_common_t));
559
560        if ((ntohs(encaphdr->recordcount) & 0x8000) != 0) {
561                /* Record was truncated */
562                erfptr->rlen = htons(pt->ndagsize - (pt->nextrec -
563                                pt->ndagheader));
564        }
565
566        pt->nextrec += ntohs(erfptr->rlen);
567
568        if (pt->nextrec - pt->ndagheader >= pt->ndagsize) {
569                pt->ndagheader = NULL;
570                pt->nextrec = NULL;
571                pt->ndagsize = 0;
572        }
573
574        packet->order = erf_get_erf_timestamp(packet);
575        packet->error = packet->payload ? ntohs(erfptr->rlen) :
576                        erf_get_framing_length(packet);
577        return ntohs(erfptr->rlen);
578}
579
580static int dpdkndag_pread_packets(libtrace_t *libtrace,
581                                    libtrace_thread_t *t,
582                                    libtrace_packet_t **packets,
583                                    size_t nb_packets) {
584
585        perthread_t *pt = (perthread_t *)t->format_data;
586        size_t read_packets = 0;
587        int ret;
588
589        while (pt->nextrec == NULL) {
590                trace_fin_packet(pt->dpdkpkt);
591
592                if (pt->burstsize > 0 && pt->burstsize != pt->burstoffset) {
593                        pt->dpdkpkt->buffer = pt->burstspace[pt->burstoffset];
594                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
595                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
596                                        pt->dpdkpkt->buffer,
597                                        TRACE_RT_DATA_DPDK, 0);
598                        pt->burstoffset ++;
599                } else {
600                        ret = dpdk_read_packet_stream(FORMAT_DATA->dpdkrecv,
601                                        pt->dpdkstreamdata,
602                                        &t->messages,
603                                        pt->burstspace,
604                                        40);
605                        if (ret <= 0) {
606                                return ret;
607                        }
608
609                        pt->dpdkpkt->buffer = pt->burstspace[0];
610                        pt->dpdkpkt->trace = FORMAT_DATA->dpdkrecv;
611                        dpdk_prepare_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt,
612                                        pt->dpdkpkt->buffer,
613                                        TRACE_RT_DATA_DPDK, 0);
614                        pt->burstsize = ret;
615                        pt->burstoffset = 1;
616                }
617
618                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
619                        continue;
620                }
621
622                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
623                if (ret <= 0) {
624                        return ret;
625                }
626        }
627
628        while (pt->nextrec != NULL) {
629                if (read_packets == nb_packets) {
630                        break;
631                }
632
633                if (packets[read_packets]->buf_control == TRACE_CTRL_PACKET) {
634                        free(packets[read_packets]->buffer);
635                        packets[read_packets]->buffer = NULL;
636                }
637                ret = ndagrec_to_libtrace_packet(libtrace, pt,
638                                packets[read_packets]);
639                if (ret < 0) {
640                        return ret;
641                }
642                read_packets ++;
643
644        }
645
646        return read_packets;
647}
648
649static int dpdkndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
650
651        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
652        int ret;
653
654        if (packet->buf_control == TRACE_CTRL_PACKET) {
655                free(packet->buffer);
656                packet->buffer = NULL;
657        }
658
659        while (pt->nextrec == NULL) {
660                trace_fin_packet(pt->dpdkpkt);
661
662                ret = dpdk_read_packet(FORMAT_DATA->dpdkrecv, pt->dpdkpkt);
663                if (ret <= 0) {
664                        return ret;
665                }
666
667                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
668                        continue;
669                }
670
671                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
672                if (ret <= 0) {
673                        return ret;
674                }
675        }
676
677        return ndagrec_to_libtrace_packet(libtrace, pt, packet);
678}
679
680static libtrace_eventobj_t trace_event_dpdkndag(libtrace_t *libtrace,
681                libtrace_packet_t *packet) {
682
683
684        libtrace_eventobj_t event;
685        int ret;
686        perthread_t *pt = &(FORMAT_DATA->threaddatas[0]);
687
688        if (packet->buf_control == TRACE_CTRL_PACKET) {
689                free(packet->buffer);
690                packet->buffer = NULL;
691        }
692
693        while (pt->nextrec == NULL) {
694
695                event = dpdk_trace_event(libtrace, pt->dpdkpkt);
696
697                if (event.type != TRACE_EVENT_PACKET) {
698                        return event;
699                }
700
701                if (!is_ndag_packet(pt->dpdkpkt, pt)) {
702                        continue;
703                }
704
705                ret = process_fresh_packet(pt, FORMAT_DATA->multicastgroup);
706                if (ret <= 0) {
707                        event.type = TRACE_EVENT_TERMINATE;
708                        return event;
709                }
710        }
711
712        ret = ndagrec_to_libtrace_packet(libtrace, pt, packet);
713        if (ret < 0) {
714                event.type = TRACE_EVENT_TERMINATE;
715        } else {
716                event.type = TRACE_EVENT_PACKET;
717                event.size = 1;
718        }
719        return event;
720}
721
722static struct libtrace_format_t dpdkndag = {
723
724        "dpdkndag",
725        "",
726        TRACE_FORMAT_DPDK_NDAG,
727        NULL,                   /* probe filename */
728        NULL,                   /* probe magic */
729        dpdkndag_init_input,        /* init_input */
730        dpdkndag_config_input,      /* config_input */
731        dpdkndag_start_input,       /* start_input */
732        dpdkndag_pause_input,       /* pause_input */
733        NULL,                   /* init_output */
734        NULL,                   /* config_output */
735        NULL,                   /* start_output */
736        dpdkndag_fin_input,         /* fin_input */
737        NULL,                   /* fin_output */
738        dpdkndag_read_packet,   /* read_packet */
739        NULL,                   /* prepare_packet */
740        NULL,                   /* fin_packet */
741        NULL,                   /* write_packet */
742        NULL,                   /* flush_output */
743        erf_get_link_type,      /* get_link_type */
744        erf_get_direction,      /* get_direction */
745        erf_set_direction,      /* set_direction */
746        erf_get_erf_timestamp,  /* get_erf_timestamp */
747        NULL,                   /* get_timeval */
748        NULL,                   /* get_seconds */
749        NULL,                   /* get_timespec */
750        NULL,                   /* seek_erf */
751        NULL,                   /* seek_timeval */
752        NULL,                   /* seek_seconds */
753        erf_get_capture_length, /* get_capture_length */
754        erf_get_wire_length,    /* get_wire_length */
755        erf_get_framing_length, /* get_framing_length */
756        erf_set_capture_length, /* set_capture_length */
757        NULL,                   /* get_received_packets */
758        NULL,                   /* get_filtered_packets */
759        NULL,                   /* get_dropped_packets */
760        dpdkndag_get_statistics,    /* get_statistics */
761        NULL,                   /* get_fd */
762        trace_event_dpdkndag,       /* trace_event */
763        NULL,                   /* help */
764        NULL,                   /* next pointer */
765        {true, 0},              /* live packet capture */
766        dpdkndag_pstart_input,      /* parallel start */
767        dpdkndag_pread_packets,     /* parallel read */
768        dpdkndag_pause_input,       /* parallel pause */
769        NULL,
770        dpdkndag_pregister_thread,  /* register thread */
771        dpdkndag_punregister_thread,
772        dpdkndag_get_thread_stats   /* per-thread stats */
773};
774
775void dpdkndag_constructor(void) {
776        register_format(&dpdkndag);
777}
Note: See TracBrowser for help on using the repository browser.