source: lib/format_dpdk.c @ 1407294

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 1407294 was 1407294, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Remove HASHER_HARDWARE and doc/code tidies

We don't want to expose this option to the user as it was only used internally.
As it happens we can completely remove it if needed.

Remove error handling from start thread and fix gcc warning in start thread.

  • Property mode set to 100644
File size: 74.7 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
/* We can deal with any minor differences by checking the RTE VERSION.
 * Typically DPDK backports some fixes (usually for building against
 * newer kernels) to the older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * DPDK v1.7.1 is recommended.
 * However 1.5 to 1.8 are likely supported.
 */
82#include <rte_eal.h>
83#include <rte_version.h>
84#ifndef RTE_VERSION_NUM
85#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
86#endif
87#ifndef RTE_VER_PATCH_RELEASE
88#       define RTE_VER_PATCH_RELEASE 0
89#endif
90#ifndef RTE_VERSION
91#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
92        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
93#endif
94
95/* 1.6.0r2 :
96 *      rte_eal_pci_set_blacklist() is removed
97 *      device_list is renamed to pci_device_list
98 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
99 *      as such we do apply the whitelist before rte_eal_init.
100 *      This also works correctly with DPDK 1.6.0r2.
101 *
102 * Replaced by:
103 *      rte_devargs (we can simply whitelist)
104 */
105#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
106#       define DPDK_USE_BLACKLIST 1
107#else
108#       define DPDK_USE_BLACKLIST 0
109#endif
110
111/*
112 * 1.7.0 :
113 *      rte_pmd_init_all is removed
114 *
115 * Replaced by:
116 *      Nothing, no longer needed
117 */
118#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
119#       define DPDK_USE_PMD_INIT 1
120#else
121#       define DPDK_USE_PMD_INIT 0
122#endif
123
124/* 1.7.0-rc3 :
125 *
126 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
127 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
128 * it twice.
129 */
130#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
131#       define DPDK_USE_PCI_PROBE 1
132#else
133#       define DPDK_USE_PCI_PROBE 0
134#endif
135
136/* 1.8.0-rc1 :
137 * LOG LEVEL is a command line option which overrides what
138 * we previously set it to.
139 */
140#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
141#       define DPDK_USE_LOG_LEVEL 1
142#else
143#       define DPDK_USE_LOG_LEVEL 0
144#endif
145
146#include <rte_per_lcore.h>
147#include <rte_debug.h>
148#include <rte_errno.h>
149#include <rte_common.h>
150#include <rte_log.h>
151#include <rte_memcpy.h>
152#include <rte_prefetch.h>
153#include <rte_branch_prediction.h>
154#include <rte_pci.h>
155#include <rte_ether.h>
156#include <rte_ethdev.h>
157#include <rte_ring.h>
158#include <rte_mempool.h>
159#include <rte_mbuf.h>
160#include <rte_launch.h>
161#include <rte_lcore.h>
162#include <rte_per_lcore.h>
163#include <rte_cycles.h>
164#include <pthread.h>
165
166/* The default size of memory buffers to use - This is the max size of standard
167 * ethernet packet less the size of the MAC CHECKSUM */
168#define RX_MBUF_SIZE 1514
169
170/* The minimum number of memory buffers per queue tx or rx. Search for
171 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
172 */
173#define MIN_NB_BUF 64
174
175/* Number of receive memory buffers to use
176 * By default this is limited by driver to 4k and must be a multiple of 128.
177 * A modification can be made to the driver to remove this limit.
178 * This can be increased in the driver and here.
179 * Should be at least MIN_NB_BUF.
180 */
181#define NB_RX_MBUF 4096
182
/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
186#define NB_TX_MBUF 1024
187
188/* The size of the PCI blacklist needs to be big enough to contain
189 * every PCI device address (listed by lspci every bus:device.function tuple).
190 */
191#define BLACK_LIST_SIZE 50
192
193/* The maximum number of characters the mempool name can be */
194#define MEMPOOL_NAME_LEN 20
195
196/* For single threaded libtrace we read packets as a batch/burst
197 * this is the maximum size of said burst */
198#define BURST_SIZE 50
199
/* Helper macros.
 *
 * Every macro argument is parenthesised so that callers may pass arbitrary
 * expressions (e.g. TS_TO_NS(*ptr) or FORMAT(traces + 1)) without operator
 * precedence changing the meaning of the expansion.
 */

/* View an opaque pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Access the format specific data attached to a trace */
#define FORMAT(x) ((struct dpdk_format_data_t *) ((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t *) ((x)->format_data))

/* Head node of the per-stream list and the stream stored in that node */
#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *) FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec (passed by value) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec * 1000000000ull + \
                        (uint64_t) (tv).tv_usec * 1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec * 1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
213
214#if RTE_PKTMBUF_HEADROOM != 128
215#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
216         "any libtrace instance processing these packet must be have the" \
217         "same RTE_PKTMBUF_HEADROOM set"
218#endif
219
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
229
230/* Print verbose messages to stderr */
231#define DEBUG 0
232
/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. gettimeofday() should also be a
 * vsyscall; if it's not, you should seriously consider updating your
 * kernel.
 */
239#ifdef HAVE_CLOCK_GETTIME
240/* You can turn this on (set to 1) to prefer clock_gettime */
241#define USE_CLOCK_GETTIME 1
242#else
243/* DON'T CHANGE THIS !!! */
244#define USE_CLOCK_GETTIME 0
245#endif
246
247/* This is fairly safe to turn on - currently there appears to be a 'bug'
248 * in DPDK that will remove the checksum by making the packet appear 4bytes
249 * smaller than what it really is. Most formats don't include the checksum
250 * hence writing out a port such as int: ring: and dpdk: assumes there
251 * is no checksum and will attempt to write the checksum as part of the
252 * packet
253 */
254#define GET_MAC_CRC_CHECKSUM 0
255
256/* This requires a modification of the pmd drivers (inside Intel DPDK)
257 * TODO this requires updating (packet sizes are wrong TS most likely also)
258 */
259#define HAS_HW_TIMESTAMPS_82580 0
260
261#if HAS_HW_TIMESTAMPS_82580
262# define TS_NBITS_82580     40
263/* The maximum on the +ve or -ve side that we can be, make it half way */
264# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
265#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
266#endif
267
268static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
269/* Memory pools Per NUMA node */
270static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
271
/*
 * Hardware timestamp record, laid out as per the Intel 82580 specification.
 *
 * Note the mismatch with the 82580 datasheet: it states that the timestamp
 * is stored Big Endian, however it is actually Little Endian.
 */
struct hw_timestamp_82580 {
        uint64_t reserved;
        /* Little Endian; only the lower 40 bits are valid */
        uint64_t timestamp;
};
278
/* Values stored in dpdk_format_data_t.paused */
enum paused_state {
        DPDK_NEVER_STARTED = 0,
        DPDK_RUNNING       = 1,
        DPDK_PAUSED        = 2,
};
284
typedef struct dpdk_per_stream_t dpdk_per_stream_t;

/* State kept for each stream; one of these is stored per node of
 * dpdk_format_data_t.per_stream. */
struct dpdk_per_stream_t
{
        uint16_t queue_id;
        /* System timestamp of our most recent packet in nanoseconds */
        uint64_t ts_last_sys;
        struct rte_mempool *mempool;
        int lcore;
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping is only relevant to RX */
        /* System timestamp of the first packet in nanoseconds */
        uint64_t ts_first_sys;
        /* Number of times the NIC clock has wrapped around completely */
        uint32_t wrap_count;
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);

/* Initialiser for a stream that has not been set up yet */
#if HAS_HW_TIMESTAMPS_82580
#define DPDK_EMPTY_STREAM { .queue_id = -1, .ts_last_sys = 0, .mempool = NULL, \
                            .lcore = -1, .ts_first_sys = 0, .wrap_count = 0 }
#else
#define DPDK_EMPTY_STREAM { .queue_id = -1, .ts_last_sys = 0, .mempool = NULL, \
                            .lcore = -1 }
#endif
305
306/* Used by both input and output however some fields are not used
307 * for output */
308struct dpdk_format_data_t {
309        int8_t promisc; /* promiscuous mode - RX only */
310        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
311        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
312        uint8_t paused; /* See paused_state */
313        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
314        int snaplen; /* The snap length for the capture - RX only */
315        /* We always have to setup both rx and tx queues even if we don't want them */
316        int nb_rx_buf; /* The number of packet buffers in the rx ring */
317        int nb_tx_buf; /* The number of packet buffers in the tx ring */
318        int nic_numa_node; /* The NUMA node that the NIC is attached to */
319        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
320#if DPDK_USE_BLACKLIST
321        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
322        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
323#endif
324        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
325        uint8_t rss_key[40]; // This is the RSS KEY
326        /* To improve single-threaded performance we always batch reading
327         * packets, in a burst, otherwise the parallel library does this for us */
328        struct rte_mbuf* burst_pkts[BURST_SIZE];
329        int burst_size; /* The total number read in the burst */
330        int burst_offset; /* The offset we are into the burst */
331
332        /* Our parallel streams */
333        libtrace_list_t *per_stream;
334};
335
/* Bit flags stored in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM     = 1 << 0,
        /* Used with 82580 driver */
        INCLUDES_HW_TIMESTAMP = 1 << 1,
};
340
/**
 * Additional per-packet information which we store in a structure placed
 * in front of the packet data.
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
        uint64_t timestamp;
        /* See dpdk_addt_hdr_flags */
        uint8_t flags;
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        /* The size to say the capture is */
        uint32_t cap_len;
};
364
365/**
366 * We want to blacklist all devices except those on the whitelist
367 * (I say list, but yes it is only the one).
368 *
369 * The default behaviour of rte_pci_probe() will map every possible device
370 * to its DPDK driver. The DPDK driver will take the ethernet device
371 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
372 *
373 * So blacklist all devices except the one that we wish to use so that
374 * the others can still be used as standard ethernet ports.
375 *
376 * @return 0 if successful, otherwise -1 on error.
377 */
378#if DPDK_USE_BLACKLIST
379static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
380{
381        struct rte_pci_device *dev = NULL;
382        format_data->nb_blacklist = 0;
383
384        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
385
386        TAILQ_FOREACH(dev, &device_list, next) {
387        if (whitelist != NULL && whitelist->domain == dev->addr.domain
388            && whitelist->bus == dev->addr.bus
389            && whitelist->devid == dev->addr.devid
390            && whitelist->function == dev->addr.function)
391            continue;
392                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
393                                / sizeof (format_data->blacklist[0])) {
394                        fprintf(stderr, "Warning: too many devices to blacklist consider"
395                                        " increasing BLACK_LIST_SIZE");
396                        break;
397                }
398                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
399                ++format_data->nb_blacklist;
400        }
401
402        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
403        return 0;
404}
405#else /* DPDK_USE_BLACKLIST */
406#include <rte_devargs.h>
407static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
408{
409        char pci_str[20] = {0};
410        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
411                 whitelist->domain,
412                 whitelist->bus,
413                 whitelist->devid,
414                 whitelist->function);
415        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
416                return -1;
417        }
418        return 0;
419}
420#endif
421
422/**
423 * Parse the URI format as a pci address
424 * Fills in addr, note core is optional and is unchanged if
425 * a value for it is not provided.
426 *
427 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
428 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
429 */
430static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
431        int matches;
432        assert(str);
433        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
434                         &addr->domain, &addr->bus, &addr->devid,
435                         &addr->function, core);
436        if (matches >= 4) {
437                return 0;
438        } else {
439                return -1;
440        }
441}
442
443/**
444 * Convert a pci address to the numa node it is
445 * connected to.
446 *
447 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
448 * so we can call it before DPDK
449 *
450 * @return -1 if unknown otherwise a number 0 or higher of the numa node
451 */
452static int pci_to_numa(struct rte_pci_addr * dev_addr) {
453        char path[50] = {0};
454        FILE *file;
455
456        /* Read from the system */
457        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
458                 dev_addr->domain,
459                 dev_addr->bus,
460                 dev_addr->devid,
461                 dev_addr->function);
462
463        if((file = fopen(path, "r")) != NULL) {
464                int numa_node = -1;
465                fscanf(file, "%d", &numa_node);
466                fclose(file);
467                return numa_node;
468        }
469        return -1;
470}
471
472#if DEBUG
473/* For debugging */
474static inline void dump_configuration()
475{
476        struct rte_config * global_config;
477        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
478
479        if (nb_cpu <= 0) {
480                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
481                       " Falling back to the first core.");
482                nb_cpu = 1; /* fallback to just 1 core */
483        }
484        if (nb_cpu > RTE_MAX_LCORE)
485                nb_cpu = RTE_MAX_LCORE;
486
487        global_config = rte_eal_get_configuration();
488
489        if (global_config != NULL) {
490                int i;
491                fprintf(stderr, "Intel DPDK setup\n"
492                        "---Version      : %s\n"
493                        "---Master LCore : %"PRIu32"\n"
494                        "---LCore Count  : %"PRIu32"\n",
495                        rte_version(),
496                        global_config->master_lcore, global_config->lcore_count);
497
498                for (i = 0 ; i < nb_cpu; i++) {
499                        fprintf(stderr, "   ---Core %d : %s\n", i,
500                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
501                }
502
503                const char * proc_type;
504                switch (global_config->process_type) {
505                case RTE_PROC_AUTO:
506                        proc_type = "auto";
507                        break;
508                case RTE_PROC_PRIMARY:
509                        proc_type = "primary";
510                        break;
511                case RTE_PROC_SECONDARY:
512                        proc_type = "secondary";
513                        break;
514                case RTE_PROC_INVALID:
515                        proc_type = "invalid";
516                        break;
517                default:
518                        proc_type = "something worse than invalid!!";
519                }
520                fprintf(stderr, "---Process Type : %s\n", proc_type);
521        }
522
523}
524#endif
525
526/**
527 * Expects to be called from the master lcore and moves it to the given dpdk id
528 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
529 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
530 *               and not already in use.
531 * @return 0 is successful otherwise -1 on error.
532 */
533static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
534        struct rte_config *cfg = rte_eal_get_configuration();
535        cpu_set_t cpuset;
536        int i;
537
538        assert (core < RTE_MAX_LCORE);
539        assert (rte_get_master_lcore() == rte_lcore_id());
540
541        if (core == rte_lcore_id())
542                return 0;
543
544        /* Make sure we are not overwriting someone else */
545        assert(!rte_lcore_is_enabled(core));
546
547        /* Move the core */
548        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
549        cfg->lcore_role[core] = ROLE_RTE;
550        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
551        rte_eal_get_configuration()->master_lcore = core;
552        RTE_PER_LCORE(_lcore_id) = core;
553
554        /* Now change the affinity, either mapped to a single core or all accepted */
555        CPU_ZERO(&cpuset);
556
557        if (lcore_config[core].detected) {
558                CPU_SET(core, &cpuset);
559        } else {
560                for (i = 0; i < RTE_MAX_LCORE; ++i) {
561                        if (lcore_config[i].detected)
562                                CPU_SET(i, &cpuset);
563                }
564        }
565
566        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
567        if (i != 0) {
568                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
569                return -1;
570        }
571        return 0;
572}
573
/**
 * XXX This is very bad XXX
 * We need some way of nesting getopt usage, so the global getopt state is
 * copied aside and put back afterwards.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this will continue
 * to work.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };

        *opts = snapshot;
}

/* Write the values held in *opts back to the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optopt = snapshot.optopt;
        opterr = snapshot.opterr;
        optind = snapshot.optind;
        optarg = snapshot.optarg;
}
601
602static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
603                                        char * err, int errlen) {
604        int ret; /* Returned error codes */
605        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
606        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
607        char mem_map[20] = {0}; /* The memory name */
608        long nb_cpu; /* The number of CPUs in the system */
609        long my_cpu; /* The CPU number we want to bind to */
610        int i;
611        struct rte_config *cfg = rte_eal_get_configuration();
612        struct saved_getopts save_opts;
613
614        /* This initialises the Environment Abstraction Layer (EAL)
615         * If we had slave workers these are put into WAITING state
616         *
617         * Basically binds this thread to a fixed core, which we choose as
618         * the last core on the machine (assuming fewer interrupts mapped here).
619         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
620         * "-n" the number of memory channels into the CPU (hardware specific)
621         *      - Most likely to be half the number of ram slots in your machine.
622         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
623         * Controls where in memory packets are stored such that they are spread
624         * across the channels. We just use 1 to be safe.
625         *
626         * Using unique file prefixes mean separate memory is used, unlinking
627         * the two processes. However be careful we still cannot access a
628         * port that already in use.
629         */
630        char* argv[] = {"libtrace",
631                        "-c", cpu_number,
632                        "-n", "1",
633                        "--proc-type", "auto",
634                        "--file-prefix", mem_map,
635                        "-m", "512",
636#if DPDK_USE_LOG_LEVEL
637#       if DEBUG
638                        "--log-level", "8", /* RTE_LOG_DEBUG */
639#       else
640                        "--log-level", "5", /* RTE_LOG_WARNING */
641#       endif
642#endif
643                        NULL};
644        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
645
646#if DEBUG
647        rte_set_log_level(RTE_LOG_DEBUG);
648#else
649        rte_set_log_level(RTE_LOG_WARNING);
650#endif
651
652        /* Get the number of cpu cores in the system and use the last core
653         * on the correct numa node */
654        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
655        if (nb_cpu <= 0) {
656                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
657                       " Falling back to the first core.");
658                nb_cpu = 1; /* fallback to the first core */
659        }
660        if (nb_cpu > RTE_MAX_LCORE)
661                nb_cpu = RTE_MAX_LCORE;
662
663        my_cpu = -1;
664        /* This allows the user to specify the core - we would try to do this
665         * automatically but it's hard to tell that this is secondary
666         * before running rte_eal_init(...). Currently we are limited to 1
667         * instance per core due to the way memory is allocated. */
668        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
669                snprintf(err, errlen, "Failed to parse URI");
670                return -1;
671        }
672
673#if HAVE_LIBNUMA
674        format_data->nic_numa_node = pci_to_numa(&use_addr);
675        if (my_cpu < 0) {
676                /* If we can assign to a core on the same numa node */
677                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
678                if(format_data->nic_numa_node >= 0) {
679                        int max_node_cpu = -1;
680                        struct bitmask *mask = numa_allocate_cpumask();
681                        assert(mask);
682                        numa_node_to_cpus(format_data->nic_numa_node, mask);
683                        for (i = 0 ; i < nb_cpu; ++i) {
684                                if (numa_bitmask_isbitset(mask,i))
685                                        max_node_cpu = i+1;
686                        }
687                        my_cpu = max_node_cpu;
688                }
689        }
690#endif
691        if (my_cpu < 0) {
692                my_cpu = nb_cpu;
693        }
694
695
696        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
697                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
698
699        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
700                snprintf(err, errlen,
701                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
702                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
703                return -1;
704        }
705
706        /* Make our mask with all cores turned on this is so that DPDK to
707         * gets CPU info older versions */
708        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
709        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
710
711#if !DPDK_USE_BLACKLIST
712        /* Black list all ports besides the one that we want to use */
713        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
714                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
715                         " are you sure the address is correct?: %s", strerror(-ret));
716                return -1;
717        }
718#endif
719
720        /* Give the memory map a unique name */
721        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
722        /* rte_eal_init it makes a call to getopt so we need to reset the
723         * global optind variable of getopt otherwise this fails */
724        save_getopts(&save_opts);
725        optind = 1;
726        if ((ret = rte_eal_init(argc, argv)) < 0) {
727                snprintf(err, errlen,
728                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
729                return -1;
730        }
731        restore_getopts(&save_opts);
732        // These are still running but will never do anything with DPDK v1.7 we
733        // should remove this XXX in the future
734        for(i = 0; i < RTE_MAX_LCORE; ++i) {
735                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
736                        cfg->lcore_role[i] = ROLE_OFF;
737                        cfg->lcore_count--;
738                }
739        }
740        // Only the master should be running
741        assert(cfg->lcore_count == 1);
742
743        // TODO XXX TODO
744        dpdk_move_master_lcore(NULL, my_cpu-1);
745
746#if DEBUG
747        dump_configuration();
748#endif
749
750#if DPDK_USE_PMD_INIT
751        /* This registers all available NICs with Intel DPDK
752         * These are not loaded until rte_eal_pci_probe() is called.
753         */
754        if ((ret = rte_pmd_init_all()) < 0) {
755                snprintf(err, errlen,
756                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
757                return -1;
758        }
759#endif
760
761#if DPDK_USE_BLACKLIST
762        /* Blacklist all ports besides the one that we want to use */
763        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
764                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
765                         " are you sure the address is correct?: %s", strerror(-ret));
766                return -1;
767        }
768#endif
769
770#if DPDK_USE_PCI_PROBE
771        /* This loads DPDK drivers against all ports that are not blacklisted */
772        if ((ret = rte_eal_pci_probe()) < 0) {
773                snprintf(err, errlen,
774                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
775                return -1;
776        }
777#endif
778
779        format_data->nb_ports = rte_eth_dev_count();
780
781        if (format_data->nb_ports != 1) {
782                snprintf(err, errlen,
783                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
784                         format_data->nb_ports);
785                return -1;
786        }
787
788        struct rte_eth_dev_info dev_info;
789        rte_eth_dev_info_get(0, &dev_info);
790        fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
791                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
792
793        return 0;
794}
795
796static int dpdk_init_input (libtrace_t *libtrace) {
797        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
798        char err[500];
799        err[0] = 0;
800
801        libtrace->format_data = (struct dpdk_format_data_t *)
802                                malloc(sizeof(struct dpdk_format_data_t));
803        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
804        FORMAT(libtrace)->nb_ports = 0;
805        FORMAT(libtrace)->snaplen = 0; /* Use default */
806        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
807        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
808        FORMAT(libtrace)->nic_numa_node = -1;
809        FORMAT(libtrace)->promisc = -1;
810        FORMAT(libtrace)->pktmbuf_pool = NULL;
811#if DPDK_USE_BLACKLIST
812        FORMAT(libtrace)->nb_blacklist = 0;
813#endif
814        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
815        FORMAT(libtrace)->mempool_name[0] = 0;
816        memset(FORMAT(libtrace)->burst_pkts, 0,
817               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
818        FORMAT(libtrace)->burst_size = 0;
819        FORMAT(libtrace)->burst_offset = 0;
820
821        /* Make our first stream */
822        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
823        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
824
825        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
826                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
827                free(libtrace->format_data);
828                libtrace->format_data = NULL;
829                return -1;
830        }
831        return 0;
832}
833
834static int dpdk_init_output(libtrace_out_t *libtrace)
835{
836        char err[500];
837        err[0] = 0;
838
839        libtrace->format_data = (struct dpdk_format_data_t *)
840                                malloc(sizeof(struct dpdk_format_data_t));
841        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
842        FORMAT(libtrace)->nb_ports = 0;
843        FORMAT(libtrace)->snaplen = 0; /* Use default */
844        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
845        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
846        FORMAT(libtrace)->nic_numa_node = -1;
847        FORMAT(libtrace)->promisc = -1;
848        FORMAT(libtrace)->pktmbuf_pool = NULL;
849#if DPDK_USE_BLACKLIST
850        FORMAT(libtrace)->nb_blacklist = 0;
851#endif
852        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
853        FORMAT(libtrace)->mempool_name[0] = 0;
854        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
855        FORMAT(libtrace)->burst_size = 0;
856        FORMAT(libtrace)->burst_offset = 0;
857
858        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
859                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
860                free(libtrace->format_data);
861                libtrace->format_data = NULL;
862                return -1;
863        }
864        return 0;
865}
866
867static int dpdk_pconfig_input (libtrace_t *libtrace,
868                               trace_parallel_option_t option,
869                               void *data) {
870        switch (option) {
871        case TRACE_OPTION_SET_HASHER:
872                switch (*((enum hasher_types *) data))
873                {
874                case HASHER_BALANCE:
875                case HASHER_UNIDIRECTIONAL:
876                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
877                        return 0;
878                case HASHER_BIDIRECTIONAL:
879                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
880                        return 0;
881                case HASHER_CUSTOM:
882                        // We don't support these
883                        return -1;
884                }
885                break;
886        }
887        return -1;
888}
889
890/**
891 * Note here snaplen excludes the MAC checksum. Packets over
892 * the requested snaplen will be dropped. (Excluding MAC checksum)
893 *
894 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
895 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
896 * is set the maximum size of the returned packet would be 1518 otherwise
897 * 1514 would be the largest size possibly returned.
898 *
899 */
900static int dpdk_config_input (libtrace_t *libtrace,
901                              trace_option_t option,
902                              void *data) {
903        switch (option) {
904        case TRACE_OPTION_SNAPLEN:
905                /* Only support changing snaplen before a call to start is
906                 * made */
907                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
908                        FORMAT(libtrace)->snaplen=*(int*)data;
909                else
910                        return -1;
911                return 0;
912        case TRACE_OPTION_PROMISC:
913                FORMAT(libtrace)->promisc=*(int*)data;
914                return 0;
915        case TRACE_OPTION_FILTER:
916                /* TODO filtering */
917                break;
918        case TRACE_OPTION_META_FREQ:
919                break;
920        case TRACE_OPTION_EVENT_REALTIME:
921                break;
922        /* Avoid default: so that future options will cause a warning
923         * here to remind us to implement it, or flag it as
924         * unimplementable
925         */
926        }
927
928        /* Don't set an error - trace_config will try to deal with the
929         * option and will set an error if it fails */
930        return -1;
931}
932
933/* Can set jumbo frames/ or limit the size of a frame by setting both
934 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
935 *
936 */
937static struct rte_eth_conf port_conf = {
938        .rxmode = {
939                .mq_mode = ETH_RSS,
940                .split_hdr_size = 0,
941                .header_split   = 0, /**< Header Split disabled */
942                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
943                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
944                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
945                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
946#if GET_MAC_CRC_CHECKSUM
947/* So it appears that if hw_strip_crc is turned off the driver will still
948 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
949 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
950 * So lets just add it back on when we receive the packet.
951 */
952                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
953#else
954/* By default strip the MAC checksum because it's a bit of a hack to
955 * actually read these. And don't want to rely on disabling this to actualy
956 * always cut off the checksum in the future
957 */
958                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
959#endif
960        },
961        .txmode = {
962                .mq_mode = ETH_DCB_NONE,
963        },
964        .rx_adv_conf = {
965                .rss_conf = {
966                        // .rss_key = &rss_key, // We set this per format
967                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
968                },
969        },
970        .intr_conf = {
971                .lsc = 1
972        }
973};
974
975static const struct rte_eth_rxconf rx_conf = {
976        .rx_thresh = {
977                .pthresh = 8,/* RX_PTHRESH prefetch */
978                .hthresh = 8,/* RX_HTHRESH host */
979                .wthresh = 4,/* RX_WTHRESH writeback */
980        },
981        .rx_free_thresh = 0,
982        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
983};
984
985static const struct rte_eth_txconf tx_conf = {
986        .tx_thresh = {
987                /*
988                 * TX_PTHRESH prefetch
989                 * Set on the NIC, if the number of unprocessed descriptors to queued on
990                 * the card fall below this try grab at least hthresh more unprocessed
991                 * descriptors.
992                 */
993                .pthresh = 36,
994
995                /* TX_HTHRESH host
996                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
997                 */
998                .hthresh = 0,
999
1000                /* TX_WTHRESH writeback
1001                 * Set on the NIC, the number of sent descriptors before writing back
1002                 * status to confirm the transmission. This is done more efficiently as
1003                 * a bulk DMA-transfer rather than writing one at a time.
1004                 * Similar to tx_free_thresh however this is applied to the NIC, where
1005                 * as tx_free_thresh is when DPDK will check these. This is extended
1006                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1007                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1008                 */
1009                .wthresh = 4,
1010        },
1011
1012        /* Used internally by DPDK rather than passed to the NIC. The number of
1013         * packet descriptors to send before checking for any responses written
1014         * back (to confirm the transmission). Default = 32 if set to 0)
1015         */
1016        .tx_free_thresh = 0,
1017
1018        /* This is the Report Status threshold, used by 10Gbit cards,
1019         * This signals the card to only write back status (such as
1020         * transmission successful) after this minimum number of transmit
1021         * descriptors are seen. The default is 32 (if set to 0) however if set
1022         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1023         * a replacement. See the dpdk programmers guide for more restrictions.
1024         */
1025        .tx_rs_thresh = 1,
1026};
1027
1028/**
1029 * A callback for a link state change (LSC).
1030 *
1031 * Packets may be received before this notification. In fact the DPDK IGXBE
1032 * driver likes to put a delay upto 5sec before sending this.
1033 *
1034 * We use this to ensure the link speed is correct for our timestamp
1035 * calculations. Because packets might be received before the link up we still
1036 * update this when the packet is received.
1037 *
1038 * @param port The DPDK port
1039 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1040 * @param cb_arg The dpdk_format_data_t structure associated with the format
1041 */
1042static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1043                              void *cb_arg) {
1044        struct dpdk_format_data_t * format_data = cb_arg;
1045        struct rte_eth_link link_info;
1046        assert(event == RTE_ETH_EVENT_INTR_LSC);
1047        assert(port == format_data->port);
1048
1049        rte_eth_link_get_nowait(port, &link_info);
1050
1051        if (link_info.link_status)
1052                format_data->link_speed = link_info.link_speed;
1053        else
1054                format_data->link_speed = 0;
1055
1056#if DEBUG
1057        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1058                link_info.link_status ? "up" : "down",
1059                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1060                                          "full-duplex" : "half-duplex",
1061                (int) link_info.link_speed);
1062#endif
1063
1064        /* Turns out DPDK drivers might not come back up if the link speed
1065         * changes. So we reset the autoneg procedure. This is very unsafe
1066         * we have have threads reading packets and we stop the port. */
1067#if 0
1068        if (!link_info.link_status) {
1069                int ret;
1070                rte_eth_dev_stop(port);
1071                ret = rte_eth_dev_start(port);
1072                if (ret < 0) {
1073                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1074                                strerror(-ret));
1075                }
1076        }
1077#endif
1078}
1079
1080/** Reserve a DPDK lcore ID for a thread globally.
1081 *
1082 * @param real If true allocate a real lcore, otherwise allocate a core which
1083 * does not exist on the local machine.
1084 * @param socket the prefered NUMA socket - only used if a real core is requested
1085 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1086 * -1 if have run out of cores.
1087 *
1088 * If any thread is reading or freeing packets we need to register it here
1089 * due to TLS caches in the memory pools.
1090 */
1091static int dpdk_reserve_lcore(bool real, int socket) {
1092        int new_id = -1;
1093        int i;
1094        struct rte_config *cfg = rte_eal_get_configuration();
1095
1096        pthread_mutex_lock(&dpdk_lock);
1097        /* If 'reading packets' fill in cores from 0 up and bind affinity
1098         * otherwise start from the MAX core (which is also the master) and work backwards
1099         * in this case physical cores on the system will not exist so we don't bind
1100         * these to any particular physical core */
1101        if (real) {
1102#if HAVE_LIBNUMA
1103                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1104                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1105                                new_id = i;
1106                                if (!lcore_config[i].detected)
1107                                        new_id = -1;
1108                                break;
1109                        }
1110                }
1111#endif
1112                /* Retry without the the numa restriction */
1113                if (new_id == -1) {
1114                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1115                                if (!rte_lcore_is_enabled(i)) {
1116                                        new_id = i;
1117                                        if (!lcore_config[i].detected)
1118                                                fprintf(stderr, "Warning the"
1119                                                        " number of 'reading' "
1120                                                        "threads exceed cores\n");
1121                                        break;
1122                                }
1123                        }
1124                }
1125        } else {
1126                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1127                        if (!rte_lcore_is_enabled(i)) {
1128                                new_id = i;
1129                                break;
1130                        }
1131                }
1132        }
1133
1134        if (new_id != -1) {
1135                /* Enable the core in global DPDK structs */
1136                cfg->lcore_role[new_id] = ROLE_RTE;
1137                cfg->lcore_count++;
1138        }
1139
1140        pthread_mutex_unlock(&dpdk_lock);
1141        return new_id;
1142}
1143
1144/** Register a thread as a lcore
1145 * @param libtrace any error is set against libtrace on exit
1146 * @param real If this is a true lcore we will bind its affinty to the
1147 * requested core.
1148 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1149 * @return 0, if successful otherwise -1 if an error occured (details are stored
1150 * in libtrace)
1151 *
1152 * @note This must be called from the thread being registered.
1153 */
1154static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1155        int ret;
1156        RTE_PER_LCORE(_lcore_id) = lcore;
1157
1158        /* Set affinity bind to corresponding core */
1159        if (real) {
1160                cpu_set_t cpuset;
1161                CPU_ZERO(&cpuset);
1162                CPU_SET(rte_lcore_id(), &cpuset);
1163                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1164                if (ret != 0) {
1165                        trace_set_err(libtrace, errno, "Warning "
1166                                      "pthread_setaffinity_np failed");
1167                        return -1;
1168                }
1169        }
1170
1171        return 0;
1172}
1173
1174/** Allocates a new dpdk packet buffer memory pool.
1175 *
1176 * @param n The number of threads
1177 * @param pkt_size The packet size we need ot store
1178 * @param socket_id The NUMA socket id
1179 * @param A new mempool, if NULL query the DPDK library for the error code
1180 * see rte_mempool_create() documentation.
1181 *
1182 * This allocates a new pool or recycles an existing memory pool.
1183 * Call dpdk_free_memory() to free the memory.
1184 * We cannot delete memory so instead we store the pools, allowing them to be
1185 * re-used.
1186 */
1187static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1188                                             unsigned pkt_size,
1189                                             int socket_id) {
1190        struct rte_mempool *ret;
1191        size_t j,k;
1192        char name[MEMPOOL_NAME_LEN];
1193
1194        /* Add on packet size overheads */
1195        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1196
1197        pthread_mutex_lock(&dpdk_lock);
1198
1199        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1200                /* Best guess go for zero */
1201                socket_id = 0;
1202        }
1203
1204        /* Find a valid pool */
1205        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1206                if (mem_pools[socket_id][j]->size >= n &&
1207                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1208                        break;
1209                }
1210        }
1211
1212        /* Find the end (+1) of the list */
1213        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1214
1215        if (mem_pools[socket_id][j]) {
1216                ret = mem_pools[socket_id][j];
1217                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1218                mem_pools[socket_id][k-1] = NULL;
1219                mem_pools[socket_id][j] = NULL;
1220        } else {
1221                static uint32_t test = 10;
1222                test++;
1223                snprintf(name, MEMPOOL_NAME_LEN,
1224                         "libtrace_pool_%"PRIu32, test);
1225
1226                ret = rte_mempool_create(name, n, pkt_size,
1227                                         128, sizeof(struct rte_pktmbuf_pool_private),
1228                                         rte_pktmbuf_pool_init, NULL,
1229                                         rte_pktmbuf_init, NULL,
1230                                         socket_id, 0);
1231        }
1232
1233        pthread_mutex_unlock(&dpdk_lock);
1234        return ret;
1235}
1236
1237/** Stores the memory against the DPDK library.
1238 *
1239 * @param mempool The mempool to free
1240 * @param socket_id The NUMA socket this mempool was allocated upon.
1241 *
1242 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1243 * store the memory shared globally against the format.
1244 */
1245static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1246        size_t i;
1247        pthread_mutex_lock(&dpdk_lock);
1248
1249        /* We should have all entries back in the mempool */
1250        rte_mempool_audit(mempool);
1251        if (!rte_mempool_full(mempool)) {
1252                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1253                        "free all packets before finishing a trace\n",
1254                        rte_mempool_count(mempool), mempool->size);
1255        }
1256
1257        /* Find the end (+1) of the list */
1258        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1259
1260        if (i >= RTE_MAX_LCORE) {
1261                fprintf(stderr, "Too many memory pools, dropping this one\n");
1262        } else {
1263                mem_pools[socket_id][i] = mempool;
1264        }
1265
1266        pthread_mutex_unlock(&dpdk_lock);
1267}
1268
1269/* Attach memory to the port and start (or restart) the port/s.
1270 */
1271static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1272                              char *err, int errlen, uint16_t rx_queues) {
1273        int ret, i;
1274        struct rte_eth_link link_info; /* Wait for link */
1275        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1276
1277        /* Already started */
1278        if (format_data->paused == DPDK_RUNNING)
1279                return 0;
1280
1281        /* First time started we need to alloc our memory, doing this here
1282         * rather than in environment setup because we don't have snaplen then */
1283        if (format_data->paused == DPDK_NEVER_STARTED) {
1284                if (format_data->snaplen == 0) {
1285                        format_data->snaplen = RX_MBUF_SIZE;
1286                        port_conf.rxmode.jumbo_frame = 0;
1287                        port_conf.rxmode.max_rx_pkt_len = 0;
1288                } else {
1289                        /* Use jumbo frames */
1290                        port_conf.rxmode.jumbo_frame = 1;
1291                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1292                }
1293
1294#if GET_MAC_CRC_CHECKSUM
1295                /* This is additional overhead so make sure we allow space for this */
1296                format_data->snaplen += ETHER_CRC_LEN;
1297#endif
1298#if HAS_HW_TIMESTAMPS_82580
1299                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1300#endif
1301
1302                /* Create the mbuf pool, which is the place packets are allocated
1303                 * from - There is no free function (I cannot see one).
1304                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1305                 * allocate however that extra 1 packet is not used.
1306                 * (I assume <= vs < error some where in DPDK code)
1307                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1308                 * so that will fill the new buffer and wait until slots in the
1309                 * ring become available.
1310                 */
1311#if DEBUG
1312                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1313#endif
1314                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1315                                                              format_data->snaplen,
1316                                                              format_data->nic_numa_node);
1317
1318                if (format_data->pktmbuf_pool == NULL) {
1319                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1320                                 "pool failed: %s", strerror(rte_errno));
1321                        return -1;
1322                }
1323        }
1324
1325        /* ----------- Now do the setup for the port mapping ------------ */
1326        /* Order of calls must be
1327         * rte_eth_dev_configure()
1328         * rte_eth_tx_queue_setup()
1329         * rte_eth_rx_queue_setup()
1330         * rte_eth_dev_start()
1331         * other rte_eth calls
1332         */
1333
1334        /* This must be called first before another *eth* function
1335         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1336        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1337        if (ret < 0) {
1338                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1339                         " %"PRIu8" : %s", format_data->port,
1340                         strerror(-ret));
1341                return -1;
1342        }
1343#if DEBUG
1344        fprintf(stderr, "Doing dev configure\n");
1345#endif
1346        /* Initialise the TX queue a minimum value if using this port for
1347         * receiving. Otherwise a larger size if writing packets.
1348         */
1349        ret = rte_eth_tx_queue_setup(format_data->port,
1350                                     0 /* queue XXX */,
1351                                     format_data->nb_tx_buf,
1352                                     SOCKET_ID_ANY,
1353                                     &tx_conf);
1354        if (ret < 0) {
1355                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1356                         " on port %"PRIu8" : %s", format_data->port,
1357                         strerror(-ret));
1358                return -1;
1359        }
1360
1361        /* Attach memory to our RX queues */
1362        for (i=0; i < rx_queues; i++) {
1363                dpdk_per_stream_t *stream;
1364#if DEBUG
1365                fprintf(stderr, "Configuring queue %d\n", i);
1366#endif
1367
1368                /* Add storage for the stream */
1369                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1370                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1371                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1372                stream->queue_id = i;
1373
1374                if (stream->lcore == -1)
1375                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1376
1377                if (stream->lcore == -1) {
1378                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1379                                 ". Too many threads?");
1380                        return -1;
1381                }
1382
1383                if (stream->mempool == NULL) {
1384                        stream->mempool = dpdk_alloc_memory(
1385                                                  format_data->nb_rx_buf*2,
1386                                                  format_data->snaplen,
1387                                                  rte_lcore_to_socket_id(stream->lcore));
1388
1389                        if (stream->mempool == NULL) {
1390                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1391                                         "pool failed: %s", strerror(rte_errno));
1392                                return -1;
1393                        }
1394                }
1395
1396                /* Initialise the RX queue with some packets from memory */
1397                ret = rte_eth_rx_queue_setup(format_data->port,
1398                                             stream->queue_id,
1399                                             format_data->nb_rx_buf,
1400                                             format_data->nic_numa_node,
1401                                             &rx_conf,
1402                                             stream->mempool);
1403                if (ret < 0) {
1404                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1405                                 " RX queue on port %"PRIu8" : %s",
1406                                 format_data->port,
1407                                 strerror(-ret));
1408                        return -1;
1409                }
1410        }
1411
1412#if DEBUG
1413        fprintf(stderr, "Doing start device\n");
1414#endif
1415        rte_eth_stats_reset(format_data->port);
1416        /* Start device */
1417        ret = rte_eth_dev_start(format_data->port);
1418        if (ret < 0) {
1419                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1420                         strerror(-ret));
1421                return -1;
1422        }
1423
1424        /* Default promiscuous to on */
1425        if (format_data->promisc == -1)
1426                format_data->promisc = 1;
1427
1428        if (format_data->promisc == 1)
1429                rte_eth_promiscuous_enable(format_data->port);
1430        else
1431                rte_eth_promiscuous_disable(format_data->port);
1432
1433        /* We have now successfully started/unpased */
1434        format_data->paused = DPDK_RUNNING;
1435
1436
1437        /* Register a callback for link state changes */
1438        ret = rte_eth_dev_callback_register(format_data->port,
1439                                            RTE_ETH_EVENT_INTR_LSC,
1440                                            dpdk_lsc_callback,
1441                                            format_data);
1442#if DEBUG
1443        if (ret)
1444                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1445                        ret, strerror(-ret));
1446#endif
1447
1448        /* Get the current link status */
1449        rte_eth_link_get_nowait(format_data->port, &link_info);
1450        format_data->link_speed = link_info.link_speed;
1451#if DEBUG
1452        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1453                (int) link_info.link_duplex, (int) link_info.link_speed);
1454#endif
1455
1456        return 0;
1457}
1458
1459static int dpdk_start_input (libtrace_t *libtrace) {
1460        char err[500];
1461        err[0] = 0;
1462
1463        /* Make sure we don't reserve an extra thread for this */
1464        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1465
1466        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1467                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1468                free(libtrace->format_data);
1469                libtrace->format_data = NULL;
1470                return -1;
1471        }
1472        return 0;
1473}
1474
1475static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1476        struct rte_eth_dev_info dev_info;
1477        rte_eth_dev_info_get(port_id, &dev_info);
1478        return dev_info.max_rx_queues;
1479}
1480
/**
 * Get the number of processors currently online.
 *
 * @return The count reported by sysconf(_SC_NPROCESSORS_ONLN), or 1 if that
 * query fails or reports a non-positive value
 */
static inline size_t dpdk_processor_count (void) {
	const long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

	/* Always claim at least one processor so callers never see zero */
	return (nb_cpu > 0) ? (size_t) nb_cpu : 1;
}
1488
1489static int dpdk_pstart_input (libtrace_t *libtrace) {
1490        char err[500];
1491        int i=0, phys_cores=0;
1492        int tot = libtrace->perpkt_thread_count;
1493        libtrace_list_node_t *n;
1494        err[0] = 0;
1495
1496        if (rte_lcore_id() != rte_get_master_lcore())
1497                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1498                        " from the master DPDK thread!\n");
1499
1500        /* If the master is not on the last thread we move it there */
1501        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1502                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1503                        return -1;
1504        }
1505
1506        /* Don't exceed the number of cores in the system/detected by dpdk
1507         * We don't have to force this but performance wont be good if we don't */
1508        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1509                if (lcore_config[i].detected) {
1510                        if (rte_lcore_is_enabled(i)) {
1511#if DEBUG
1512                                fprintf(stderr, "Found core %d already in use!\n", i);
1513#endif
1514                        } else {
1515                                phys_cores++;
1516                        }
1517                }
1518        }
1519        /* If we are restarting we have already allocated some threads as such
1520         * we add these back to the count for this calculation */
1521        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1522                dpdk_per_stream_t * stream = n->data;
1523                if (stream->lcore != -1)
1524                        phys_cores++;
1525        }
1526
1527        tot = MIN(libtrace->perpkt_thread_count,
1528                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1529        tot = MIN(tot, phys_cores);
1530
1531#if DEBUG
1532        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1533                libtrace->perpkt_thread_count, phys_cores);
1534#endif
1535
1536        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1537                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1538                free(libtrace->format_data);
1539                libtrace->format_data = NULL;
1540                return -1;
1541        }
1542
1543        /* Make sure we only start the number that we should */
1544        libtrace->perpkt_thread_count = tot;
1545        return 0;
1546}
1547
1548/**
1549 * Register a thread with the DPDK system,
1550 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1551 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1552 * gives it.
1553 *
1554 * We then allow a mapper thread to be started on every real core as DPDK would,
1555 * we also bind these to the corresponding CPU cores.
1556 *
1557 * @param libtrace A pointer to the trace
1558 * @param reading True if the thread will be used to read packets, i.e. will
1559 *                call pread_packet(), false if thread used to process packet
1560 *                in any other manner including statistics functions.
1561 */
1562static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1563{
1564#if DEBUG
1565        char name[99];
1566        pthread_getname_np(pthread_self(),
1567                           name, sizeof(name));
1568#endif
1569        if (reading) {
1570                dpdk_per_stream_t *stream;
1571                /* Attach our thread */
1572                if(t->type == THREAD_PERPKT) {
1573                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1574                        if (t->format_data == NULL) {
1575                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1576                                              "Too many threads registered");
1577                                return -1;
1578                        }
1579                } else {
1580                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1581                }
1582                stream = t->format_data;
1583#if DEBUG
1584                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1585#endif
1586                return dpdk_register_lcore(libtrace, true, stream->lcore);
1587        } else {
1588                int lcore = dpdk_reserve_lcore(reading, 0);
1589                if (lcore == -1) {
1590                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1591                                      " for DPDK");
1592                        return -1;
1593                }
1594#if DEBUG
1595                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1596#endif
1597                return dpdk_register_lcore(libtrace, false, lcore);
1598        }
1599
1600        return 0;
1601}
1602
1603/**
1604 * Unregister a thread with the DPDK system.
1605 *
1606 * Only previously registered threads should be calling this just before
1607 * they are destroyed.
1608 */
1609static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1610{
1611        struct rte_config *cfg = rte_eal_get_configuration();
1612
1613        assert(rte_lcore_id() < RTE_MAX_LCORE);
1614        pthread_mutex_lock(&dpdk_lock);
1615        /* Skip if master */
1616        if (rte_lcore_id() == rte_get_master_lcore()) {
1617                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1618                pthread_mutex_unlock(&dpdk_lock);
1619                return;
1620        }
1621
1622        /* Disable this core in global DPDK structs */
1623        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1624        cfg->lcore_count--;
1625        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1626        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1627        pthread_mutex_unlock(&dpdk_lock);
1628        return;
1629}
1630
1631static int dpdk_start_output(libtrace_out_t *libtrace)
1632{
1633        char err[500];
1634        err[0] = 0;
1635
1636        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1637                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1638                free(libtrace->format_data);
1639                libtrace->format_data = NULL;
1640                return -1;
1641        }
1642        return 0;
1643}
1644
1645static int dpdk_pause_input(libtrace_t * libtrace) {
1646        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1647        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1648        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1649#if DEBUG
1650                fprintf(stderr, "Pausing DPDK port\n");
1651#endif
1652                rte_eth_dev_stop(FORMAT(libtrace)->port);
1653                FORMAT(libtrace)->paused = DPDK_PAUSED;
1654                /* Empty the queue of packets */
1655                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1656                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1657                }
1658                FORMAT(libtrace)->burst_offset = 0;
1659                FORMAT(libtrace)->burst_size = 0;
1660
1661                for (; tmp != NULL; tmp = tmp->next) {
1662                        dpdk_per_stream_t *stream = tmp->data;
1663                        stream->ts_last_sys = 0;
1664#if HAS_HW_TIMESTAMPS_82580
1665                        stream->ts_first_sys = 0;
1666#endif
1667                }
1668
1669        }
1670        return 0;
1671}
1672
1673static int dpdk_write_packet(libtrace_out_t *trace,
1674                             libtrace_packet_t *packet){
1675        struct rte_mbuf* m_buff[1];
1676
1677        int wirelen = trace_get_wire_length(packet);
1678        int caplen = trace_get_capture_length(packet);
1679
1680        /* Check for a checksum and remove it */
1681        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1682            wirelen == caplen)
1683                caplen -= ETHER_CRC_LEN;
1684
1685        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1686        if (m_buff[0] == NULL) {
1687                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1688                return -1;
1689        } else {
1690                int ret;
1691                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1692                do {
1693                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1694                } while (ret != 1);
1695        }
1696
1697        return 0;
1698}
1699
1700static int dpdk_fin_input(libtrace_t * libtrace) {
1701        libtrace_list_node_t * n;
1702        /* Free our memory structures */
1703        if (libtrace->format_data != NULL) {
1704
1705                if (FORMAT(libtrace)->port != 0xFF)
1706                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1707                                                        RTE_ETH_EVENT_INTR_LSC,
1708                                                        dpdk_lsc_callback,
1709                                                        FORMAT(libtrace));
1710                /* Close the device completely, device cannot be restarted */
1711                rte_eth_dev_close(FORMAT(libtrace)->port);
1712
1713                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1714                                 FORMAT(libtrace)->nic_numa_node);
1715
1716                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1717                        dpdk_per_stream_t * stream = n->data;
1718                        if (stream->mempool)
1719                                dpdk_free_memory(stream->mempool,
1720                                                 rte_lcore_to_socket_id(stream->lcore));
1721                }
1722
1723                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1724                /* filter here if we used it */
1725                free(libtrace->format_data);
1726        }
1727
1728        return 0;
1729}
1730
1731
1732static int dpdk_fin_output(libtrace_out_t * libtrace) {
1733        /* Free our memory structures */
1734        if (libtrace->format_data != NULL) {
1735                /* Close the device completely, device cannot be restarted */
1736                if (FORMAT(libtrace)->port != 0xFF)
1737                        rte_eth_dev_close(FORMAT(libtrace)->port);
1738                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1739                /* filter here if we used it */
1740                free(libtrace->format_data);
1741        }
1742
1743        return 0;
1744}
1745
1746/**
1747 * Get the start of the additional header that we added to a packet.
1748 */
1749static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1750        assert(packet);
1751        assert(packet->buffer);
1752        /* Our header sits straight after the mbuf header */
1753        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1754}
1755
1756static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1757        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1758        return hdr->cap_len;
1759}
1760
1761static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1762        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1763        if (size > hdr->cap_len) {
1764                /* Cannot make a packet bigger */
1765                return trace_get_capture_length(packet);
1766        }
1767
1768        /* Reset the cached capture length first*/
1769        packet->capture_length = -1;
1770        hdr->cap_len = (uint32_t) size;
1771        return trace_get_capture_length(packet);
1772}
1773
1774static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1775        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1776        int org_cap_size; /* The original capture size */
1777        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1778                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1779                               sizeof(struct hw_timestamp_82580);
1780        } else {
1781                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1782        }
1783        if (hdr->flags & INCLUDES_CHECKSUM) {
1784                return org_cap_size;
1785        } else {
1786                /* DPDK packets are always TRACE_TYPE_ETH packets */
1787                return org_cap_size + ETHER_CRC_LEN;
1788        }
1789}
1790
1791static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1792        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1793        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1794                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1795                                sizeof(struct hw_timestamp_82580);
1796        else
1797                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1798}
1799
1800static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1801                               libtrace_packet_t *packet, void *buffer,
1802                               libtrace_rt_types_t rt_type, uint32_t flags) {
1803        assert(packet);
1804        if (packet->buffer != buffer &&
1805            packet->buf_control == TRACE_CTRL_PACKET) {
1806                free(packet->buffer);
1807        }
1808
1809        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1810                packet->buf_control = TRACE_CTRL_PACKET;
1811        else
1812                packet->buf_control = TRACE_CTRL_EXTERNAL;
1813
1814        packet->buffer = buffer;
1815        packet->header = buffer;
1816
1817        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1818        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1819        packet->type = rt_type;
1820        return 0;
1821}
1822
1823/**
1824 * Given a packet size and a link speed, computes the
1825 * time to transmit in nanoseconds.
1826 *
1827 * @param format_data The dpdk format data from which we get the link speed
1828 *        and if unset updates it in a thread safe manner
1829 * @param pkt_size The size of the packet in bytes
1830 * @return The wire time in nanoseconds
1831 */
1832static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1833        uint32_t wire_time;
1834        /* 20 extra bytes of interframe gap and preamble */
1835# if GET_MAC_CRC_CHECKSUM
1836        wire_time = ((pkt_size + 20) * 8000);
1837# else
1838        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1839# endif
1840
1841        /* Division is really slow and introduces a pipeline stall
1842         * The compiler will optimise this into magical multiplication and shifting
1843         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1844         */
1845retry_calc_wiretime:
1846        switch (format_data->link_speed) {
1847        case ETH_LINK_SPEED_40G:
1848                wire_time /=  ETH_LINK_SPEED_40G;
1849                break;
1850        case ETH_LINK_SPEED_20G:
1851                wire_time /= ETH_LINK_SPEED_20G;
1852                break;
1853        case ETH_LINK_SPEED_10G:
1854                wire_time /= ETH_LINK_SPEED_10G;
1855                break;
1856        case ETH_LINK_SPEED_1000:
1857                wire_time /= ETH_LINK_SPEED_1000;
1858                break;
1859        case 0:
1860                {
1861                /* Maybe the link was down originally, but now it should be up */
1862                struct rte_eth_link link = {0};
1863                rte_eth_link_get_nowait(format_data->port, &link);
1864                if (link.link_status && link.link_speed) {
1865                        format_data->link_speed = link.link_speed;
1866#ifdef DEBUG
1867                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1868#endif
1869                        goto retry_calc_wiretime;
1870                }
1871                /* We don't know the link speed, make sure numbers are counting up */
1872                wire_time = 1;
1873                break;
1874                }
1875        default:
1876                wire_time /= format_data->link_speed;
1877        }
1878        return wire_time;
1879}
1880
1881/**
1882 * Does any extra preperation to all captured packets
1883 * This includes adding our extra header to it with the timestamp,
1884 * and any snapping
1885 *
1886 * @param format_data The DPDK format data
1887 * @param plc The DPDK per lcore format data
1888 * @param pkts An array of size nb_pkts of DPDK packets
1889 */
1890static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1891                                   struct dpdk_per_stream_t *plc,
1892                                   struct rte_mbuf **pkts,
1893                                   size_t nb_pkts) {
1894        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1895        struct dpdk_addt_hdr *hdr;
1896        size_t i;
1897        uint64_t cur_sys_time_ns;
1898#if HAS_HW_TIMESTAMPS_82580
1899        struct hw_timestamp_82580 *hw_ts;
1900        uint64_t estimated_wraps;
1901#else
1902
1903#endif
1904
1905#if USE_CLOCK_GETTIME
1906        struct timespec cur_sys_time = {0};
1907        /* This looks terrible and I feel bad doing it. But it's OK
1908         * on new kernels, because this is a fast vsyscall */
1909        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1910        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1911#else
1912        struct timeval cur_sys_time = {0};
1913        /* Also a fast vsyscall */
1914        gettimeofday(&cur_sys_time, NULL);
1915        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1916#endif
1917
1918        /* The system clock is not perfect so when running
1919         * at linerate we could timestamp a packet in the past.
1920         * To avoid this we munge the timestamp to appear 1ns
1921         * after the previous packet. We should eventually catch up
1922         * to system time since a 64byte packet on a 10G link takes 67ns.
1923         *
1924         * Note with parallel readers timestamping packets
1925         * with duplicate stamps or out of order is unavoidable without
1926         * hardware timestamping from the NIC.
1927         */
1928#if !HAS_HW_TIMESTAMPS_82580
1929        if (plc->ts_last_sys >= cur_sys_time_ns) {
1930                cur_sys_time_ns = plc->ts_last_sys + 1;
1931        }
1932#endif
1933
1934        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1935        for (i = 0 ; i < nb_pkts ; ++i) {
1936
1937                /* We put our header straight after the dpdk header */
1938                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1939                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1940
1941#if GET_MAC_CRC_CHECKSUM
1942                /* Add back in the CRC sum */
1943                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1944                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1945                hdr->flags |= INCLUDES_CHECKSUM;
1946#endif
1947
1948                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1949
1950#if HAS_HW_TIMESTAMPS_82580
1951                /* The timestamp is sitting before our packet and is included in pkt_len */
1952                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1953                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1954                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1955
1956                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1957                 *
1958                 *        +----------+---+   +--------------+
1959                 *  82580 |    24    | 8 |   |      32      |
1960                 *        +----------+---+   +--------------+
1961                 *          reserved  \______ 40 bits _____/
1962                 *
1963                 * The 40 bit 82580 SYSTIM overflows every
1964                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1965                 *
1966                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1967                 * Endian (for the full 64 bits) i.e. picture is mirrored
1968                 */
1969
1970                /* Despite what the documentation says this is in Little
1971                 * Endian byteorder. Mask the reserved section out.
1972                 */
1973                hdr->timestamp = le64toh(hw_ts->timestamp) &
1974                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1975
1976                if (unlikely(plc->ts_first_sys == 0)) {
1977                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1978                        plc->ts_last_sys = plc->ts_first_sys;
1979                }
1980
1981                /* This will have serious problems if packets aren't read quickly
1982                 * that is within a couple of seconds because our clock cycles every
1983                 * 18 seconds */
1984                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1985                                  / (1ull<<TS_NBITS_82580);
1986
1987                /* Estimated_wraps gives the number of times the counter should have
1988                 * wrapped (however depending on value last time it could have wrapped
1989                 * twice more (if hw clock is close to its max value) or once less (allowing
1990                 * for a bit of variance between hw and sys clock). But if the clock
1991                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1992                if (unlikely(estimated_wraps >= 2)) {
1993                        /* 2 or more wrap arounds add all but the very last wrap */
1994                        plc->wrap_count += estimated_wraps - 1;
1995                }
1996
1997                /* Set the timestamp to the lowest possible value we're considering */
1998                hdr->timestamp += plc->ts_first_sys +
1999                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2000
2001                /* In most runs only the first if() will need evaluating - i.e our
2002                 * estimate is correct. */
2003                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2004                                              hdr->timestamp, MAXSKEW_82580))) {
2005                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2006                        plc->wrap_count++;
2007                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2008                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2009                                             hdr->timestamp, MAXSKEW_82580)) {
2010                                /* Failed to match estimated_wraps */
2011                                plc->wrap_count++;
2012                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2013                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2014                                                     hdr->timestamp, MAXSKEW_82580)) {
2015                                        if (estimated_wraps == 0) {
2016                                                /* 0 case Failed to match estimated_wraps+2 */
2017                                                printf("WARNING - Hardware Timestamp failed to"
2018                                                       " match using systemtime!\n");
2019                                                hdr->timestamp = cur_sys_time_ns;
2020                                        } else {
2021                                                /* Failed to match estimated_wraps+1 */
2022                                                plc->wrap_count++;
2023                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2024                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2025                                                                     hdr->timestamp, MAXSKEW_82580)) {
2026                                                        /* Failed to match estimated_wraps+2 */
2027                                                        printf("WARNING - Hardware Timestamp failed to"
2028                                                               " match using systemtime!!\n");
2029                                                }
2030                                        }
2031                                }
2032                        }
2033                }
2034#else
2035
2036                hdr->timestamp = cur_sys_time_ns;
2037                /* Offset the next packet by the wire time of previous */
2038                calculate_wire_time(format_data, hdr->cap_len);
2039
2040#endif
2041        }
2042
2043        plc->ts_last_sys = cur_sys_time_ns;
2044        return;
2045}
2046
2047
2048static void dpdk_fin_packet(libtrace_packet_t *packet)
2049{
2050        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2051                rte_pktmbuf_free(packet->buffer);
2052                packet->buffer = NULL;
2053        }
2054}
2055
2056/** Reads at least one packet or returns an error
2057 */
2058static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2059                                           dpdk_per_stream_t *stream,
2060                                           libtrace_message_queue_t *mesg,
2061                                           struct rte_mbuf* pkts_burst[],
2062                                           size_t nb_packets) {
2063        size_t nb_rx; /* Number of rx packets we've recevied */
2064        while (1) {
2065                /* Poll for a batch of packets */
2066                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2067                                         stream->queue_id, pkts_burst, nb_packets);
2068                if (nb_rx > 0) {
2069                        /* Got some packets - otherwise we keep spining */
2070                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2071                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2072                        return nb_rx;
2073                }
2074                /* Check the message queue this could be less than 0 */
2075                if (mesg && libtrace_message_queue_count(mesg) > 0)
2076                        return READ_MESSAGE;
2077                if (libtrace_halt)
2078                        return READ_EOF;
2079                /* Wait a while, polling on memory degrades performance
2080                 * This relieves the pressure on memory allowing the NIC to DMA */
2081                rte_delay_us(10);
2082        }
2083
2084        /* We'll never get here - but if we did it would be bad */
2085        return READ_ERROR;
2086}
2087
2088static int dpdk_pread_packets (libtrace_t *libtrace,
2089                                    libtrace_thread_t *t,
2090                                    libtrace_packet_t **packets,
2091                                    size_t nb_packets) {
2092        int nb_rx; /* Number of rx packets we've recevied */
2093        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2094        int i;
2095        dpdk_per_stream_t *stream = t->format_data;
2096
2097        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2098                                         pkts_burst, nb_packets);
2099
2100        if (nb_rx > 0) {
2101                for (i = 0; i < nb_rx; ++i) {
2102                        if (packets[i]->buffer != NULL) {
2103                                /* The packet should always be finished */
2104                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2105                                free(packets[i]->buffer);
2106                        }
2107                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2108                        packets[i]->type = TRACE_RT_DATA_DPDK;
2109                        packets[i]->buffer = pkts_burst[i];
2110                        packets[i]->trace = libtrace;
2111                        packets[i]->error = 1;
2112                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2113                }
2114        }
2115
2116        return nb_rx;
2117}
2118
2119static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2120        int nb_rx; /* Number of rx packets we've received */
2121        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2122
2123        /* Free the last packet buffer */
2124        if (packet->buffer != NULL) {
2125                /* The packet should always be finished */
2126                assert(packet->buf_control == TRACE_CTRL_PACKET);
2127                free(packet->buffer);
2128                packet->buffer = NULL;
2129        }
2130
2131        packet->buf_control = TRACE_CTRL_EXTERNAL;
2132        packet->type = TRACE_RT_DATA_DPDK;
2133
2134        /* Check if we already have some packets buffered */
2135        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2136                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2137                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2138                return 1; // TODO should be bytes read, which essentially useless anyway
2139        }
2140
2141        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2142                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2143
2144        if (nb_rx > 0) {
2145                FORMAT(libtrace)->burst_size = nb_rx;
2146                FORMAT(libtrace)->burst_offset = 1;
2147                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2148                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2149                return 1;
2150        }
2151        return nb_rx;
2152}
2153
2154static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2155        struct timeval tv;
2156        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2157
2158        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2159        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2160        return tv;
2161}
2162
2163static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2164        struct timespec ts;
2165        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2166
2167        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2168        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2169        return ts;
2170}
2171
2172static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2173        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2174}
2175
2176static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2177        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2178        return (libtrace_direction_t) hdr->direction;
2179}
2180
2181static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2182        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2183        hdr->direction = (uint8_t) direction;
2184        return (libtrace_direction_t) hdr->direction;
2185}
2186
2187static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2188        struct rte_eth_stats dev_stats = {0};
2189
2190        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2191                return;
2192
2193        /* Grab the current stats */
2194        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2195
2196        stats->captured_valid = true;
2197        stats->captured = dev_stats.ipackets;
2198
2199        /* Not that we support adding filters but if we did this
2200         * would work */
2201        stats->filtered += dev_stats.fdirmiss;
2202
2203        stats->dropped_valid = true;
2204        stats->dropped = dev_stats.imissed;
2205
2206        /* DPDK errors includes drops */
2207        stats->errors_valid = true;
2208        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2209
2210        stats->received_valid = true;
2211        stats->received = dev_stats.ipackets + dev_stats.imissed;
2212
2213}
2214
2215/* Attempts to read a packet in a non-blocking fashion. If one is not
2216 * available a SLEEP event is returned. We do not have the ability to
2217 * create a select()able file descriptor in DPDK.
2218 */
2219static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2220                                            libtrace_packet_t *packet) {
2221        libtrace_eventobj_t event = {0,0,0.0,0};
2222        int nb_rx; /* Number of receive packets we've read */
2223        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2224
2225        do {
2226
2227                /* See if we already have a packet waiting */
2228                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2229                                         FORMAT_DATA_FIRST(trace)->queue_id,
2230                                         pkts_burst, 1);
2231
2232                if (nb_rx > 0) {
2233                        /* Free the last packet buffer */
2234                        if (packet->buffer != NULL) {
2235                                /* The packet should always be finished */
2236                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2237                                free(packet->buffer);
2238                                packet->buffer = NULL;
2239                        }
2240
2241                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2242                        packet->type = TRACE_RT_DATA_DPDK;
2243                        event.type = TRACE_EVENT_PACKET;
2244                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
2245                        packet->buffer = FORMAT(trace)->burst_pkts[0];
2246                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2247                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2248
2249                        /* XXX - Check this passes the filter trace_read_packet normally
2250                         * does this for us but this wont */
2251                        if (trace->filter) {
2252                                if (!trace_apply_filter(trace->filter, packet)) {
2253                                        /* Failed the filter so we loop for another packet */
2254                                        trace->filtered_packets ++;
2255                                        continue;
2256                                }
2257                        }
2258                        trace->accepted_packets ++;
2259                } else {
2260                        /* We only want to sleep for a very short time - we are non-blocking */
2261                        event.type = TRACE_EVENT_SLEEP;
2262                        event.seconds = 0.0001;
2263                        event.size = 0;
2264                }
2265
2266                /* If we get here we have our event */
2267                break;
2268        } while (1);
2269
2270        return event;
2271}
2272
/* Writes the usage text of the dpdk format module to stdout: the accepted
 * input and output URI forms and the notes on CPU core selection.
 */
static void dpdk_help(void) {
        /* One entry per output line, emitted in order */
        static const char *const help_text[] = {
                "dpdk format module: $Revision: 1752 $\n",
                "Supported input URIs:\n",
                "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
                "\tThe -<coreid> is optional \n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
                "\t By default the last CPU core is used if not otherwise specified.\n",
                "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
                "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
                "\n",
                "Supported output URIs:\n",
                "\tSame format as the input URI.\n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
                "\n"
        };
        const size_t line_count = sizeof(help_text) / sizeof(help_text[0]);
        size_t idx;

        for (idx = 0; idx < line_count; ++idx) {
                fputs(help_text[idx], stdout);
        }
}
2290
/* Format descriptor for the DPDK module; dpdk_constructor() hands it to
 * register_format(). The initialiser is positional, so the order of the
 * entries must not change. The first three entries identify the format
 * (name string, version keyword string, format type constant); the rest
 * are the handlers for each operation, labelled by the trailing comments.
 * A NULL entry means this module supplies no handler for that slot.
 * Note that the parallel pause/fin slots reuse the single-threaded
 * dpdk_pause_input and dpdk_fin_input handlers.
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id$",
        TRACE_FORMAT_DPDK,
        NULL,                               /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,                    /* init_input */
        dpdk_config_input,                  /* config_input */
        dpdk_start_input,                   /* start_input */
        dpdk_pause_input,                   /* pause_input */
        dpdk_init_output,                   /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,                  /* start_output */
        dpdk_fin_input,                     /* fin_input */
        dpdk_fin_output,                    /* fin_output */
        dpdk_read_packet,                   /* read_packet */
        dpdk_prepare_packet,                /* prepare_packet */
        dpdk_fin_packet,                    /* fin_packet */
        dpdk_write_packet,                  /* write_packet */
        dpdk_get_link_type,                 /* get_link_type */
        dpdk_get_direction,                 /* get_direction */
        dpdk_set_direction,                 /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,                   /* get_timeval */
        dpdk_get_timespec,                  /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,            /* get_capture_length */
        dpdk_get_wire_length,               /* get_wire_length */
        dpdk_get_framing_length,            /* get_framing_length */
        dpdk_set_capture_length,            /* set_capture_length */
        NULL,                               /* get_received_packets */
        NULL,                               /* get_filtered_packets */
        NULL,                               /* get_dropped_packets */
        dpdk_get_stats,                     /* get_statistics */
        NULL,                               /* get_fd */
        dpdk_trace_event,                   /* trace_event */
        dpdk_help,                          /* help */
        NULL,                               /* next pointer */
        {true, 8},                          /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,                  /* pstart_input */
        dpdk_pread_packets,                 /* pread_packets */
        dpdk_pause_input,                   /* ppause */
        dpdk_fin_input,                     /* p_fin */
        dpdk_pconfig_input,                 /* pconfig_input */
        dpdk_pregister_thread,              /* pregister_thread */
        dpdk_punregister_thread,            /* punregister_thread */
        NULL                                /* get thread stats */
};
2342
2343void dpdk_constructor(void) {
2344        register_format(&dpdk);
2345}
Note: See TracBrowser for help on using the repository browser.