source: lib/format_dpdk.c @ a984307

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since a984307 was a984307, checked in by Shane Alcock <salcock@…>, 5 years ago

Merge remote-tracking branch 'origin/develop' into libtrace4

Conflicts:

INSTALL
README
lib/format_dpdk.c
lib/trace.c
tools/tracesplit/tracesplit.c

  • Property mode set to 100644
File size: 74.6 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
69/* We can deal with any minor differences by checking the RTE VERSION
70 * Typically DPDK backports some fixes (typically for building against
71 * newer kernels) to the older version of DPDK.
72 *
73 * These get released with the rX suffix. The following macros were added
74 * in these new releases.
75 *
76 * Below this is a log of version that required changes to the libtrace
77 * code (that we still attempt to support).
78 *
79 * DPDK v1.7.1 is recommended.
80 * However 1.5 to 1.8 are likely supported.
81 */
82#include <rte_eal.h>
83#include <rte_version.h>
84#ifndef RTE_VERSION_NUM
85#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
86#endif
87#ifndef RTE_VER_PATCH_RELEASE
88#       define RTE_VER_PATCH_RELEASE 0
89#endif
90#ifndef RTE_VERSION
91#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
92        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
93#endif
94
95/* 1.6.0r2 :
96 *      rte_eal_pci_set_blacklist() is removed
97 *      device_list is renamed to pci_device_list
98 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
99 *      as such we do apply the whitelist before rte_eal_init.
100 *      This also works correctly with DPDK 1.6.0r2.
101 *
102 * Replaced by:
103 *      rte_devargs (we can simply whitelist)
104 */
105#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
106#       define DPDK_USE_BLACKLIST 1
107#else
108#       define DPDK_USE_BLACKLIST 0
109#endif
110
111/*
112 * 1.7.0 :
113 *      rte_pmd_init_all is removed
114 *
115 * Replaced by:
116 *      Nothing, no longer needed
117 */
118#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
119#       define DPDK_USE_PMD_INIT 1
120#else
121#       define DPDK_USE_PMD_INIT 0
122#endif
123
124/* 1.7.0-rc3 :
125 *
126 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
127 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
128 * it twice.
129 */
130#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
131#       define DPDK_USE_PCI_PROBE 1
132#else
133#       define DPDK_USE_PCI_PROBE 0
134#endif
135
136/* 1.8.0-rc1 :
137 * LOG LEVEL is a command line option which overrides what
138 * we previously set it to.
139 */
140#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
141#       define DPDK_USE_LOG_LEVEL 1
142#else
143#       define DPDK_USE_LOG_LEVEL 0
144#endif
145
146/* 1.8.0-rc2
147 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
148 * this uses the default values, which are better tuned per device
149 * See issue #26
150 */
151#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
152#       define DPDK_USE_NULL_QUEUE_CONFIG 1
153#else
154#       define DPDK_USE_NULL_QUEUE_CONFIG 0
155#endif
156
157#include <rte_per_lcore.h>
158#include <rte_debug.h>
159#include <rte_errno.h>
160#include <rte_common.h>
161#include <rte_log.h>
162#include <rte_memcpy.h>
163#include <rte_prefetch.h>
164#include <rte_branch_prediction.h>
165#include <rte_pci.h>
166#include <rte_ether.h>
167#include <rte_ethdev.h>
168#include <rte_ring.h>
169#include <rte_mempool.h>
170#include <rte_mbuf.h>
171#include <rte_launch.h>
172#include <rte_lcore.h>
173#include <rte_per_lcore.h>
174#include <rte_cycles.h>
175#include <pthread.h>
176#ifdef __FreeBSD__
177#include <pthread_np.h>
178#endif
179
180
181/* The default size of memory buffers to use - This is the max size of standard
182 * ethernet packet less the size of the MAC CHECKSUM */
183#define RX_MBUF_SIZE 1514
184
185/* The minimum number of memory buffers per queue tx or rx. Search for
186 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
187 */
188#define MIN_NB_BUF 64
189
190/* Number of receive memory buffers to use
191 * By default this is limited by driver to 4k and must be a multiple of 128.
192 * A modification can be made to the driver to remove this limit.
193 * This can be increased in the driver and here.
194 * Should be at least MIN_NB_BUF.
195 */
196#define NB_RX_MBUF 4096
197
198/* Number of send memory buffers to use.
199 * The same limits apply as those for NB_RX_MBUF.
200 */
201#define NB_TX_MBUF 1024
202
203/* The size of the PCI blacklist needs to be big enough to contain
204 * every PCI device address (listed by lspci every bus:device.function tuple).
205 */
206#define BLACK_LIST_SIZE 50
207
208/* The maximum number of characters the mempool name can be */
209#define MEMPOOL_NAME_LEN 20
210
211/* For single threaded libtrace we read packets as a batch/burst
212 * this is the maximum size of said burst */
213#define BURST_SIZE 50
214
/* Cast helpers. Every macro argument is parenthesized so that expressions
 * such as *p or (a + 1) can be passed safely. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Head node of the per-stream list, and the stream data stored in it */
#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec (passed by value) into
 * nanoseconds as a uint64_t */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
228
229#if RTE_PKTMBUF_HEADROOM != 128
230#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
231         "any libtrace instance processing these packet must be have the" \
232         "same RTE_PKTMBUF_HEADROOM set"
233#endif
234
235/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
236 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
237 *
238 * Make sure you understand what these are doing before enabling them.
239 * They might make traces incompatible with other builds etc.
240 *
241 * These are also included to show how to do some things which aren't
242 * obvious in the DPDK documentation.
243 */
244
245/* Print verbose messages to stderr */
246#define DEBUG 0
247
248/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
249 * Only turn this on if you know clock_gettime is a vsyscall on your system,
250 * otherwise it could add a large overhead. gettimeofday() should also be a
251 * vsyscall; if it is not, you should seriously consider updating your
252 * kernel.
253 */
254#ifdef HAVE_CLOCK_GETTIME
255/* You can turn this on (set to 1) to prefer clock_gettime */
256#define USE_CLOCK_GETTIME 1
257#else
258/* DON'T CHANGE THIS !!! */
259#define USE_CLOCK_GETTIME 0
260#endif
261
262/* This is fairly safe to turn on - currently there appears to be a 'bug'
263 * in DPDK that will remove the checksum by making the packet appear 4bytes
264 * smaller than what it really is. Most formats don't include the checksum
265 * hence writing out a port such as int: ring: and dpdk: assumes there
266 * is no checksum and will attempt to write the checksum as part of the
267 * packet
268 */
269#define GET_MAC_CRC_CHECKSUM 0
270
271/* This requires a modification of the pmd drivers (inside Intel DPDK)
272 * TODO this requires updating (packet sizes are wrong TS most likely also)
273 */
274#define HAS_HW_TIMESTAMPS_82580 0
275
276#if HAS_HW_TIMESTAMPS_82580
277# define TS_NBITS_82580     40
278/* The maximum on the +ve or -ve side that we can be, make it half way */
279# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
280#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
281#endif
282
283static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
284/* Memory pools Per NUMA node */
285static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
286
/* Hardware timestamp record laid out as per the Intel 82580 specification.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
        uint64_t reserved;
        /* Little Endian; only the lower 40 bits are valid */
        uint64_t timestamp;
};
293
/* Values stored in dpdk_format_data_t.paused */
enum paused_state {
        DPDK_NEVER_STARTED = 0,
        DPDK_RUNNING = 1,
        DPDK_PAUSED = 2,
};
299
/* State kept for each stream; instances are stored in the per_stream list
 * of dpdk_format_data_t. */
struct dpdk_per_stream_t
{
        uint16_t queue_id;
        /* System timestamp of our most recent packet in nanoseconds */
        uint64_t ts_last_sys;
        struct rte_mempool *mempool;
        int lcore;
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping is only relevant to RX */
        /* System timestamp of the first packet in nanoseconds */
        uint64_t ts_first_sys;
        /* Number of times the NIC clock has wrapped around completely */
        uint32_t wrap_count;
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);

/* Initialiser for a stream that has not been set up yet. Members that are
 * not named here (the optional 82580 timestamp fields) are zeroed. */
#define DPDK_EMPTY_STREAM { .queue_id = -1, .ts_last_sys = 0, \
                            .mempool = NULL, .lcore = -1 }

typedef struct dpdk_per_stream_t dpdk_per_stream_t;
320
321/* Used by both input and output however some fields are not used
322 * for output */
323struct dpdk_format_data_t {
324        int8_t promisc; /* promiscuous mode - RX only */
325        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
326        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
327        uint8_t paused; /* See paused_state */
328        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
329        int snaplen; /* The snap length for the capture - RX only */
330        /* We always have to setup both rx and tx queues even if we don't want them */
331        int nb_rx_buf; /* The number of packet buffers in the rx ring */
332        int nb_tx_buf; /* The number of packet buffers in the tx ring */
333        int nic_numa_node; /* The NUMA node that the NIC is attached to */
334        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
335#if DPDK_USE_BLACKLIST
336        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
337        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
338#endif
339        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
340        uint8_t rss_key[40]; // This is the RSS KEY
341        /* To improve single-threaded performance we always batch reading
342         * packets, in a burst, otherwise the parallel library does this for us */
343        struct rte_mbuf* burst_pkts[BURST_SIZE];
344        int burst_size; /* The total number read in the burst */
345        int burst_offset; /* The offset we are into the burst */
346
347        /* Our parallel streams */
348        libtrace_list_t *per_stream;
349};
350
/* Bit flags for the flags field of struct dpdk_addt_hdr */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM     = 1 << 0,
        /* Used with the 82580 driver */
        INCLUDES_HW_TIMESTAMP = 1 << 1,
};
355
/**
 * Additional per-packet header that is placed in front of the packet,
 * where we can store extra information about the given packet.
 *
 * Buffer layout:
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
        uint64_t timestamp;
        uint8_t flags;
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        /* The size to say the capture is */
        uint32_t cap_len;
};
379
380/**
381 * We want to blacklist all devices except those on the whitelist
382 * (I say list, but yes it is only the one).
383 *
384 * The default behaviour of rte_pci_probe() will map every possible device
385 * to its DPDK driver. The DPDK driver will take the ethernet device
386 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
387 *
388 * So blacklist all devices except the one that we wish to use so that
389 * the others can still be used as standard ethernet ports.
390 *
391 * @return 0 if successful, otherwise -1 on error.
392 */
393#if DPDK_USE_BLACKLIST
394static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
395{
396        struct rte_pci_device *dev = NULL;
397        format_data->nb_blacklist = 0;
398
399        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
400
401        TAILQ_FOREACH(dev, &device_list, next) {
402        if (whitelist != NULL && whitelist->domain == dev->addr.domain
403            && whitelist->bus == dev->addr.bus
404            && whitelist->devid == dev->addr.devid
405            && whitelist->function == dev->addr.function)
406            continue;
407                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
408                                / sizeof (format_data->blacklist[0])) {
409                        fprintf(stderr, "Warning: too many devices to blacklist consider"
410                                        " increasing BLACK_LIST_SIZE");
411                        break;
412                }
413                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
414                ++format_data->nb_blacklist;
415        }
416
417        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
418        return 0;
419}
420#else /* DPDK_USE_BLACKLIST */
421#include <rte_devargs.h>
422static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
423{
424        char pci_str[20] = {0};
425        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
426                 whitelist->domain,
427                 whitelist->bus,
428                 whitelist->devid,
429                 whitelist->function);
430        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
431                return -1;
432        }
433        return 0;
434}
435#endif
436
437/**
438 * Parse the URI format as a pci address
439 * Fills in addr, note core is optional and is unchanged if
440 * a value for it is not provided.
441 *
442 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
443 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
444 */
445static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
446        int matches;
447        assert(str);
448        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
449                         &addr->domain, &addr->bus, &addr->devid,
450                         &addr->function, core);
451        if (matches >= 4) {
452                return 0;
453        } else {
454                return -1;
455        }
456}
457
458/**
459 * Convert a pci address to the numa node it is
460 * connected to.
461 *
462 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
463 * so we can call it before DPDK
464 *
465 * @return -1 if unknown otherwise a number 0 or higher of the numa node
466 */
467static int pci_to_numa(struct rte_pci_addr * dev_addr) {
468        char path[50] = {0};
469        FILE *file;
470
471        /* Read from the system */
472        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
473                 dev_addr->domain,
474                 dev_addr->bus,
475                 dev_addr->devid,
476                 dev_addr->function);
477
478        if((file = fopen(path, "r")) != NULL) {
479                int numa_node = -1;
480                fscanf(file, "%d", &numa_node);
481                fclose(file);
482                return numa_node;
483        }
484        return -1;
485}
486
487#if DEBUG
488/* For debugging */
489static inline void dump_configuration()
490{
491        struct rte_config * global_config;
492        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
493
494        if (nb_cpu <= 0) {
495                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
496                       " Falling back to the first core.");
497                nb_cpu = 1; /* fallback to just 1 core */
498        }
499        if (nb_cpu > RTE_MAX_LCORE)
500                nb_cpu = RTE_MAX_LCORE;
501
502        global_config = rte_eal_get_configuration();
503
504        if (global_config != NULL) {
505                int i;
506                fprintf(stderr, "Intel DPDK setup\n"
507                        "---Version      : %s\n"
508                        "---Master LCore : %"PRIu32"\n"
509                        "---LCore Count  : %"PRIu32"\n",
510                        rte_version(),
511                        global_config->master_lcore, global_config->lcore_count);
512
513                for (i = 0 ; i < nb_cpu; i++) {
514                        fprintf(stderr, "   ---Core %d : %s\n", i,
515                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
516                }
517
518                const char * proc_type;
519                switch (global_config->process_type) {
520                case RTE_PROC_AUTO:
521                        proc_type = "auto";
522                        break;
523                case RTE_PROC_PRIMARY:
524                        proc_type = "primary";
525                        break;
526                case RTE_PROC_SECONDARY:
527                        proc_type = "secondary";
528                        break;
529                case RTE_PROC_INVALID:
530                        proc_type = "invalid";
531                        break;
532                default:
533                        proc_type = "something worse than invalid!!";
534                }
535                fprintf(stderr, "---Process Type : %s\n", proc_type);
536        }
537
538}
539#endif
540
541/**
542 * Expects to be called from the master lcore and moves it to the given dpdk id
543 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
544 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
545 *               and not already in use.
546 * @return 0 is successful otherwise -1 on error.
547 */
548static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
549        struct rte_config *cfg = rte_eal_get_configuration();
550        cpu_set_t cpuset;
551        int i;
552
553        assert (core < RTE_MAX_LCORE);
554        assert (rte_get_master_lcore() == rte_lcore_id());
555
556        if (core == rte_lcore_id())
557                return 0;
558
559        /* Make sure we are not overwriting someone else */
560        assert(!rte_lcore_is_enabled(core));
561
562        /* Move the core */
563        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
564        cfg->lcore_role[core] = ROLE_RTE;
565        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
566        rte_eal_get_configuration()->master_lcore = core;
567        RTE_PER_LCORE(_lcore_id) = core;
568
569        /* Now change the affinity, either mapped to a single core or all accepted */
570        CPU_ZERO(&cpuset);
571
572        if (lcore_config[core].detected) {
573                CPU_SET(core, &cpuset);
574        } else {
575                for (i = 0; i < RTE_MAX_LCORE; ++i) {
576                        if (lcore_config[i].detected)
577                                CPU_SET(i, &cpuset);
578                }
579        }
580
581        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
582        if (i != 0) {
583                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
584                return -1;
585        }
586        return 0;
587}
588
/**
 * XXX This is very bad XXX
 * But we have to do something to allow nested use of getopt.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 *
 * Snapshot of the global getopt state.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current global getopt state into opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write the state held in opts back to the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optarg = snapshot.optarg;
        optind = snapshot.optind;
        opterr = snapshot.opterr;
        optopt = snapshot.optopt;
}
616
617static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
618                                        char * err, int errlen) {
619        int ret; /* Returned error codes */
620        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
621        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
622        char mem_map[20] = {0}; /* The memory name */
623        long nb_cpu; /* The number of CPUs in the system */
624        long my_cpu; /* The CPU number we want to bind to */
625        int i;
626        struct rte_config *cfg = rte_eal_get_configuration();
627        struct saved_getopts save_opts;
628
629        /* This initialises the Environment Abstraction Layer (EAL)
630         * If we had slave workers these are put into WAITING state
631         *
632         * Basically binds this thread to a fixed core, which we choose as
633         * the last core on the machine (assuming fewer interrupts mapped here).
634         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
635         * "-n" the number of memory channels into the CPU (hardware specific)
636         *      - Most likely to be half the number of ram slots in your machine.
637         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
638         * Controls where in memory packets are stored such that they are spread
639         * across the channels. We just use 1 to be safe.
640         *
641         * Using unique file prefixes mean separate memory is used, unlinking
642         * the two processes. However be careful we still cannot access a
643         * port that already in use.
644         */
645        char* argv[] = {"libtrace",
646                        "-c", cpu_number,
647                        "-n", "1",
648                        "--proc-type", "auto",
649                        "--file-prefix", mem_map,
650                        "-m", "512",
651#if DPDK_USE_LOG_LEVEL
652#       if DEBUG
653                        "--log-level", "8", /* RTE_LOG_DEBUG */
654#       else
655                        "--log-level", "5", /* RTE_LOG_WARNING */
656#       endif
657#endif
658                        NULL};
659        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
660
661#if DEBUG
662        rte_set_log_level(RTE_LOG_DEBUG);
663#else
664        rte_set_log_level(RTE_LOG_WARNING);
665#endif
666
667        /* Get the number of cpu cores in the system and use the last core
668         * on the correct numa node */
669        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
670        if (nb_cpu <= 0) {
671                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
672                       " Falling back to the first core.");
673                nb_cpu = 1; /* fallback to the first core */
674        }
675        if (nb_cpu > RTE_MAX_LCORE)
676                nb_cpu = RTE_MAX_LCORE;
677
678        my_cpu = -1;
679        /* This allows the user to specify the core - we would try to do this
680         * automatically but it's hard to tell that this is secondary
681         * before running rte_eal_init(...). Currently we are limited to 1
682         * instance per core due to the way memory is allocated. */
683        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
684                snprintf(err, errlen, "Failed to parse URI");
685                return -1;
686        }
687
688#if HAVE_LIBNUMA
689        format_data->nic_numa_node = pci_to_numa(&use_addr);
690        if (my_cpu < 0) {
691#if DEBUG
692                /* If we can assign to a core on the same numa node */
693                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
694#endif
695                if(format_data->nic_numa_node >= 0) {
696                        int max_node_cpu = -1;
697                        struct bitmask *mask = numa_allocate_cpumask();
698                        assert(mask);
699                        numa_node_to_cpus(format_data->nic_numa_node, mask);
700                        for (i = 0 ; i < nb_cpu; ++i) {
701                                if (numa_bitmask_isbitset(mask,i))
702                                        max_node_cpu = i+1;
703                        }
704                        my_cpu = max_node_cpu;
705                }
706        }
707#endif
708        if (my_cpu < 0) {
709                my_cpu = nb_cpu;
710        }
711
712
713        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
714                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
715
716        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
717                snprintf(err, errlen,
718                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
719                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
720                return -1;
721        }
722
723        /* Make our mask with all cores turned on this is so that DPDK
724         * gets all CPU info in older versions */
725        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
726        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
727
728#if !DPDK_USE_BLACKLIST
729        /* Black list all ports besides the one that we want to use */
730        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
731                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
732                         " are you sure the address is correct?: %s", strerror(-ret));
733                return -1;
734        }
735#endif
736
737        /* Give the memory map a unique name */
738        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
739        /* rte_eal_init it makes a call to getopt so we need to reset the
740         * global optind variable of getopt otherwise this fails */
741        save_getopts(&save_opts);
742        optind = 1;
743        if ((ret = rte_eal_init(argc, argv)) < 0) {
744                snprintf(err, errlen,
745                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
746                return -1;
747        }
748        restore_getopts(&save_opts);
749        // These are still running but will never do anything with DPDK v1.7 we
750        // should remove this XXX in the future
751        for(i = 0; i < RTE_MAX_LCORE; ++i) {
752                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
753                        cfg->lcore_role[i] = ROLE_OFF;
754                        cfg->lcore_count--;
755                }
756        }
757        // Only the master should be running
758        assert(cfg->lcore_count == 1);
759
760        // TODO XXX TODO
761        dpdk_move_master_lcore(NULL, my_cpu-1);
762
763#if DEBUG
764        dump_configuration();
765#endif
766
767#if DPDK_USE_PMD_INIT
768        /* This registers all available NICs with Intel DPDK
769         * These are not loaded until rte_eal_pci_probe() is called.
770         */
771        if ((ret = rte_pmd_init_all()) < 0) {
772                snprintf(err, errlen,
773                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
774                return -1;
775        }
776#endif
777
778#if DPDK_USE_BLACKLIST
779        /* Blacklist all ports besides the one that we want to use */
780        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
781                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
782                         " are you sure the address is correct?: %s", strerror(-ret));
783                return -1;
784        }
785#endif
786
787#if DPDK_USE_PCI_PROBE
788        /* This loads DPDK drivers against all ports that are not blacklisted */
789        if ((ret = rte_eal_pci_probe()) < 0) {
790                snprintf(err, errlen,
791                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
792                return -1;
793        }
794#endif
795
796        format_data->nb_ports = rte_eth_dev_count();
797
798        if (format_data->nb_ports != 1) {
799                snprintf(err, errlen,
800                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
801                         format_data->nb_ports);
802                return -1;
803        }
804
805        return 0;
806}
807
808static int dpdk_init_input (libtrace_t *libtrace) {
809        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
810        char err[500];
811        err[0] = 0;
812
813        libtrace->format_data = (struct dpdk_format_data_t *)
814                                malloc(sizeof(struct dpdk_format_data_t));
815        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
816        FORMAT(libtrace)->nb_ports = 0;
817        FORMAT(libtrace)->snaplen = 0; /* Use default */
818        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
819        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
820        FORMAT(libtrace)->nic_numa_node = -1;
821        FORMAT(libtrace)->promisc = -1;
822        FORMAT(libtrace)->pktmbuf_pool = NULL;
823#if DPDK_USE_BLACKLIST
824        FORMAT(libtrace)->nb_blacklist = 0;
825#endif
826        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
827        FORMAT(libtrace)->mempool_name[0] = 0;
828        memset(FORMAT(libtrace)->burst_pkts, 0,
829               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
830        FORMAT(libtrace)->burst_size = 0;
831        FORMAT(libtrace)->burst_offset = 0;
832
833        /* Make our first stream */
834        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
835        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
836
837        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
838                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
839                free(libtrace->format_data);
840                libtrace->format_data = NULL;
841                return -1;
842        }
843        return 0;
844}
845
846static int dpdk_init_output(libtrace_out_t *libtrace)
847{
848        char err[500];
849        err[0] = 0;
850
851        libtrace->format_data = (struct dpdk_format_data_t *)
852                                malloc(sizeof(struct dpdk_format_data_t));
853        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
854        FORMAT(libtrace)->nb_ports = 0;
855        FORMAT(libtrace)->snaplen = 0; /* Use default */
856        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
857        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
858        FORMAT(libtrace)->nic_numa_node = -1;
859        FORMAT(libtrace)->promisc = -1;
860        FORMAT(libtrace)->pktmbuf_pool = NULL;
861#if DPDK_USE_BLACKLIST
862        FORMAT(libtrace)->nb_blacklist = 0;
863#endif
864        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
865        FORMAT(libtrace)->mempool_name[0] = 0;
866        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
867        FORMAT(libtrace)->burst_size = 0;
868        FORMAT(libtrace)->burst_offset = 0;
869
870        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
871                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
872                free(libtrace->format_data);
873                libtrace->format_data = NULL;
874                return -1;
875        }
876        return 0;
877}
878
879/**
880 * Note here snaplen excludes the MAC checksum. Packets over
881 * the requested snaplen will be dropped. (Excluding MAC checksum)
882 *
883 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
884 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
885 * is set the maximum size of the returned packet would be 1518 otherwise
886 * 1514 would be the largest size possibly returned.
887 *
888 */
889static int dpdk_config_input (libtrace_t *libtrace,
890                              trace_option_t option,
891                              void *data) {
892        switch (option) {
893        case TRACE_OPTION_SNAPLEN:
894                /* Only support changing snaplen before a call to start is
895                 * made */
896                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
897                        FORMAT(libtrace)->snaplen=*(int*)data;
898                else
899                        return -1;
900                return 0;
901        case TRACE_OPTION_PROMISC:
902                FORMAT(libtrace)->promisc=*(int*)data;
903                return 0;
904        case TRACE_OPTION_HASHER:
905                switch (*((enum hasher_types *) data))
906                {
907                case HASHER_BALANCE:
908                case HASHER_UNIDIRECTIONAL:
909                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
910                        return 0;
911                case HASHER_BIDIRECTIONAL:
912                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
913                        return 0;
914                case HASHER_CUSTOM:
915                        // We don't support these
916                        return -1;
917                }
918                break;
919        case TRACE_OPTION_FILTER:
920                /* TODO filtering */
921        case TRACE_OPTION_META_FREQ:
922        case TRACE_OPTION_EVENT_REALTIME:
923                break;
924        /* Avoid default: so that future options will cause a warning
925         * here to remind us to implement it, or flag it as
926         * unimplementable
927         */
928        }
929
930        /* Don't set an error - trace_config will try to deal with the
931         * option and will set an error if it fails */
932        return -1;
933}
934
935/* Can set jumbo frames/ or limit the size of a frame by setting both
936 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
937 *
938 */
939static struct rte_eth_conf port_conf = {
940        .rxmode = {
941                .mq_mode = ETH_RSS,
942                .split_hdr_size = 0,
943                .header_split   = 0, /**< Header Split disabled */
944                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
945                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
946                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
947                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
948#if GET_MAC_CRC_CHECKSUM
949/* So it appears that if hw_strip_crc is turned off the driver will still
950 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
951 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
952 * So lets just add it back on when we receive the packet.
953 */
954                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
955#else
956/* By default strip the MAC checksum because it's a bit of a hack to
957 * actually read these. And don't want to rely on disabling this to actualy
958 * always cut off the checksum in the future
959 */
960                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
961#endif
962        },
963        .txmode = {
964                .mq_mode = ETH_DCB_NONE,
965        },
966        .rx_adv_conf = {
967                .rss_conf = {
968                        // .rss_key = &rss_key, // We set this per format
969                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
970                },
971        },
972        .intr_conf = {
973                .lsc = 1
974        }
975};
976
977static const struct rte_eth_rxconf rx_conf = {
978        .rx_thresh = {
979                .pthresh = 8,/* RX_PTHRESH prefetch */
980                .hthresh = 8,/* RX_HTHRESH host */
981                .wthresh = 4,/* RX_WTHRESH writeback */
982        },
983        .rx_free_thresh = 0,
984        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
985};
986
987static const struct rte_eth_txconf tx_conf = {
988        .tx_thresh = {
989                /*
990                 * TX_PTHRESH prefetch
991                 * Set on the NIC, if the number of unprocessed descriptors to queued on
992                 * the card fall below this try grab at least hthresh more unprocessed
993                 * descriptors.
994                 */
995                .pthresh = 36,
996
997                /* TX_HTHRESH host
998                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
999                 */
1000                .hthresh = 0,
1001
1002                /* TX_WTHRESH writeback
1003                 * Set on the NIC, the number of sent descriptors before writing back
1004                 * status to confirm the transmission. This is done more efficiently as
1005                 * a bulk DMA-transfer rather than writing one at a time.
1006                 * Similar to tx_free_thresh however this is applied to the NIC, where
1007                 * as tx_free_thresh is when DPDK will check these. This is extended
1008                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1009                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1010                 */
1011                .wthresh = 4,
1012        },
1013
1014        /* Used internally by DPDK rather than passed to the NIC. The number of
1015         * packet descriptors to send before checking for any responses written
1016         * back (to confirm the transmission). Default = 32 if set to 0)
1017         */
1018        .tx_free_thresh = 0,
1019
1020        /* This is the Report Status threshold, used by 10Gbit cards,
1021         * This signals the card to only write back status (such as
1022         * transmission successful) after this minimum number of transmit
1023         * descriptors are seen. The default is 32 (if set to 0) however if set
1024         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1025         * a replacement. See the dpdk programmers guide for more restrictions.
1026         */
1027        .tx_rs_thresh = 1,
1028};
1029
1030/**
1031 * A callback for a link state change (LSC).
1032 *
1033 * Packets may be received before this notification. In fact the DPDK IGXBE
1034 * driver likes to put a delay upto 5sec before sending this.
1035 *
1036 * We use this to ensure the link speed is correct for our timestamp
1037 * calculations. Because packets might be received before the link up we still
1038 * update this when the packet is received.
1039 *
1040 * @param port The DPDK port
1041 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1042 * @param cb_arg The dpdk_format_data_t structure associated with the format
1043 */
1044static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1045                              void *cb_arg) {
1046        struct dpdk_format_data_t * format_data = cb_arg;
1047        struct rte_eth_link link_info;
1048        assert(event == RTE_ETH_EVENT_INTR_LSC);
1049        assert(port == format_data->port);
1050
1051        rte_eth_link_get_nowait(port, &link_info);
1052
1053        if (link_info.link_status)
1054                format_data->link_speed = link_info.link_speed;
1055        else
1056                format_data->link_speed = 0;
1057
1058#if DEBUG
1059        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1060                link_info.link_status ? "up" : "down",
1061                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1062                                          "full-duplex" : "half-duplex",
1063                (int) link_info.link_speed);
1064#endif
1065
1066        /* Turns out DPDK drivers might not come back up if the link speed
1067         * changes. So we reset the autoneg procedure. This is very unsafe
1068         * we have have threads reading packets and we stop the port. */
1069#if 0
1070        if (!link_info.link_status) {
1071                int ret;
1072                rte_eth_dev_stop(port);
1073                ret = rte_eth_dev_start(port);
1074                if (ret < 0) {
1075                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1076                                strerror(-ret));
1077                }
1078        }
1079#endif
1080}
1081
1082/** Reserve a DPDK lcore ID for a thread globally.
1083 *
1084 * @param real If true allocate a real lcore, otherwise allocate a core which
1085 * does not exist on the local machine.
1086 * @param socket the prefered NUMA socket - only used if a real core is requested
1087 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1088 * -1 if have run out of cores.
1089 *
1090 * If any thread is reading or freeing packets we need to register it here
1091 * due to TLS caches in the memory pools.
1092 */
1093static int dpdk_reserve_lcore(bool real, int socket) {
1094        int new_id = -1;
1095        int i;
1096        struct rte_config *cfg = rte_eal_get_configuration();
1097
1098        pthread_mutex_lock(&dpdk_lock);
1099        /* If 'reading packets' fill in cores from 0 up and bind affinity
1100         * otherwise start from the MAX core (which is also the master) and work backwards
1101         * in this case physical cores on the system will not exist so we don't bind
1102         * these to any particular physical core */
1103        if (real) {
1104#if HAVE_LIBNUMA
1105                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1106                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1107                                new_id = i;
1108                                if (!lcore_config[i].detected)
1109                                        new_id = -1;
1110                                break;
1111                        }
1112                }
1113#endif
1114                /* Retry without the the numa restriction */
1115                if (new_id == -1) {
1116                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1117                                if (!rte_lcore_is_enabled(i)) {
1118                                        new_id = i;
1119                                        if (!lcore_config[i].detected)
1120                                                fprintf(stderr, "Warning the"
1121                                                        " number of 'reading' "
1122                                                        "threads exceed cores\n");
1123                                        break;
1124                                }
1125                        }
1126                }
1127        } else {
1128                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1129                        if (!rte_lcore_is_enabled(i)) {
1130                                new_id = i;
1131                                break;
1132                        }
1133                }
1134        }
1135
1136        if (new_id != -1) {
1137                /* Enable the core in global DPDK structs */
1138                cfg->lcore_role[new_id] = ROLE_RTE;
1139                cfg->lcore_count++;
1140        }
1141
1142        pthread_mutex_unlock(&dpdk_lock);
1143        return new_id;
1144}
1145
1146/** Register a thread as a lcore
1147 * @param libtrace any error is set against libtrace on exit
1148 * @param real If this is a true lcore we will bind its affinty to the
1149 * requested core.
1150 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1151 * @return 0, if successful otherwise -1 if an error occured (details are stored
1152 * in libtrace)
1153 *
1154 * @note This must be called from the thread being registered.
1155 */
1156static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1157        int ret;
1158        RTE_PER_LCORE(_lcore_id) = lcore;
1159
1160        /* Set affinity bind to corresponding core */
1161        if (real) {
1162                cpu_set_t cpuset;
1163                CPU_ZERO(&cpuset);
1164                CPU_SET(rte_lcore_id(), &cpuset);
1165                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1166                if (ret != 0) {
1167                        trace_set_err(libtrace, errno, "Warning "
1168                                      "pthread_setaffinity_np failed");
1169                        return -1;
1170                }
1171        }
1172
1173        return 0;
1174}
1175
1176/** Allocates a new dpdk packet buffer memory pool.
1177 *
1178 * @param n The number of threads
1179 * @param pkt_size The packet size we need ot store
1180 * @param socket_id The NUMA socket id
1181 * @param A new mempool, if NULL query the DPDK library for the error code
1182 * see rte_mempool_create() documentation.
1183 *
1184 * This allocates a new pool or recycles an existing memory pool.
1185 * Call dpdk_free_memory() to free the memory.
1186 * We cannot delete memory so instead we store the pools, allowing them to be
1187 * re-used.
1188 */
1189static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1190                                             unsigned pkt_size,
1191                                             int socket_id) {
1192        struct rte_mempool *ret;
1193        size_t j,k;
1194        char name[MEMPOOL_NAME_LEN];
1195
1196        /* Add on packet size overheads */
1197        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1198
1199        pthread_mutex_lock(&dpdk_lock);
1200
1201        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1202                /* Best guess go for zero */
1203                socket_id = 0;
1204        }
1205
1206        /* Find a valid pool */
1207        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1208                if (mem_pools[socket_id][j]->size >= n &&
1209                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1210                        break;
1211                }
1212        }
1213
1214        /* Find the end (+1) of the list */
1215        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1216
1217        if (mem_pools[socket_id][j]) {
1218                ret = mem_pools[socket_id][j];
1219                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1220                mem_pools[socket_id][k-1] = NULL;
1221                mem_pools[socket_id][j] = NULL;
1222        } else {
1223                static uint32_t test = 10;
1224                test++;
1225                snprintf(name, MEMPOOL_NAME_LEN,
1226                         "libtrace_pool_%"PRIu32, test);
1227
1228                ret = rte_mempool_create(name, n, pkt_size,
1229                                         128, sizeof(struct rte_pktmbuf_pool_private),
1230                                         rte_pktmbuf_pool_init, NULL,
1231                                         rte_pktmbuf_init, NULL,
1232                                         socket_id, 0);
1233        }
1234
1235        pthread_mutex_unlock(&dpdk_lock);
1236        return ret;
1237}
1238
1239/** Stores the memory against the DPDK library.
1240 *
1241 * @param mempool The mempool to free
1242 * @param socket_id The NUMA socket this mempool was allocated upon.
1243 *
1244 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1245 * store the memory shared globally against the format.
1246 */
1247static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1248        size_t i;
1249        pthread_mutex_lock(&dpdk_lock);
1250
1251        /* We should have all entries back in the mempool */
1252        rte_mempool_audit(mempool);
1253        if (!rte_mempool_full(mempool)) {
1254                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1255                        "free all packets before finishing a trace\n",
1256                        rte_mempool_count(mempool), mempool->size);
1257        }
1258
1259        /* Find the end (+1) of the list */
1260        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1261
1262        if (i >= RTE_MAX_LCORE) {
1263                fprintf(stderr, "Too many memory pools, dropping this one\n");
1264        } else {
1265                mem_pools[socket_id][i] = mempool;
1266        }
1267
1268        pthread_mutex_unlock(&dpdk_lock);
1269}
1270
1271/* Attach memory to the port and start (or restart) the port/s.
1272 */
1273static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1274                              char *err, int errlen, uint16_t rx_queues) {
1275        int ret, i;
1276        struct rte_eth_link link_info; /* Wait for link */
1277        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1278
1279        /* Already started */
1280        if (format_data->paused == DPDK_RUNNING)
1281                return 0;
1282
1283        /* First time started we need to alloc our memory, doing this here
1284         * rather than in environment setup because we don't have snaplen then */
1285        if (format_data->paused == DPDK_NEVER_STARTED) {
1286                if (format_data->snaplen == 0) {
1287                        format_data->snaplen = RX_MBUF_SIZE;
1288                        port_conf.rxmode.jumbo_frame = 0;
1289                        port_conf.rxmode.max_rx_pkt_len = 0;
1290                } else {
1291                        /* Use jumbo frames */
1292                        port_conf.rxmode.jumbo_frame = 1;
1293                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1294                }
1295
1296#if GET_MAC_CRC_CHECKSUM
1297                /* This is additional overhead so make sure we allow space for this */
1298                format_data->snaplen += ETHER_CRC_LEN;
1299#endif
1300#if HAS_HW_TIMESTAMPS_82580
1301                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1302#endif
1303
1304                /* Create the mbuf pool, which is the place packets are allocated
1305                 * from - There is no free function (I cannot see one).
1306                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1307                 * allocate however that extra 1 packet is not used.
1308                 * (I assume <= vs < error some where in DPDK code)
1309                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1310                 * so that will fill the new buffer and wait until slots in the
1311                 * ring become available.
1312                 */
1313#if DEBUG
1314                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1315#endif
1316                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1317                                                              format_data->snaplen,
1318                                                              format_data->nic_numa_node);
1319
1320                if (format_data->pktmbuf_pool == NULL) {
1321                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1322                                 "pool failed: %s", strerror(rte_errno));
1323                        return -1;
1324                }
1325        }
1326
1327        /* ----------- Now do the setup for the port mapping ------------ */
1328        /* Order of calls must be
1329         * rte_eth_dev_configure()
1330         * rte_eth_tx_queue_setup()
1331         * rte_eth_rx_queue_setup()
1332         * rte_eth_dev_start()
1333         * other rte_eth calls
1334         */
1335
1336        /* This must be called first before another *eth* function
1337         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1338        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1339        if (ret < 0) {
1340                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1341                         " %"PRIu8" : %s", format_data->port,
1342                         strerror(-ret));
1343                return -1;
1344        }
1345#if DEBUG
1346        fprintf(stderr, "Doing dev configure\n");
1347#endif
1348        /* Initialise the TX queue a minimum value if using this port for
1349         * receiving. Otherwise a larger size if writing packets.
1350         */
1351        ret = rte_eth_tx_queue_setup(format_data->port,
1352                                     0 /* queue XXX */,
1353                                     format_data->nb_tx_buf,
1354                                     SOCKET_ID_ANY,
1355                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1356        if (ret < 0) {
1357                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1358                         " on port %"PRIu8" : %s", format_data->port,
1359                         strerror(-ret));
1360                return -1;
1361        }
1362
1363        /* Attach memory to our RX queues */
1364        for (i=0; i < rx_queues; i++) {
1365                dpdk_per_stream_t *stream;
1366#if DEBUG
1367                fprintf(stderr, "Configuring queue %d\n", i);
1368#endif
1369
1370                /* Add storage for the stream */
1371                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1372                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1373                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1374                stream->queue_id = i;
1375
1376                if (stream->lcore == -1)
1377                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1378
1379                if (stream->lcore == -1) {
1380                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1381                                 ". Too many threads?");
1382                        return -1;
1383                }
1384
1385                if (stream->mempool == NULL) {
1386                        stream->mempool = dpdk_alloc_memory(
1387                                                  format_data->nb_rx_buf*2,
1388                                                  format_data->snaplen,
1389                                                  rte_lcore_to_socket_id(stream->lcore));
1390
1391                        if (stream->mempool == NULL) {
1392                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1393                                         "pool failed: %s", strerror(rte_errno));
1394                                return -1;
1395                        }
1396                }
1397
1398                /* Initialise the RX queue with some packets from memory */
1399                ret = rte_eth_rx_queue_setup(format_data->port,
1400                                             stream->queue_id,
1401                                             format_data->nb_rx_buf,
1402                                             format_data->nic_numa_node,
1403                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1404                                             stream->mempool);
1405                if (ret < 0) {
1406                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1407                                 " RX queue on port %"PRIu8" : %s",
1408                                 format_data->port,
1409                                 strerror(-ret));
1410                        return -1;
1411                }
1412        }
1413
1414#if DEBUG
1415        fprintf(stderr, "Doing start device\n");
1416#endif
1417        rte_eth_stats_reset(format_data->port);
1418        /* Start device */
1419        ret = rte_eth_dev_start(format_data->port);
1420        if (ret < 0) {
1421                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1422                         strerror(-ret));
1423                return -1;
1424        }
1425
1426        /* Default promiscuous to on */
1427        if (format_data->promisc == -1)
1428                format_data->promisc = 1;
1429
1430        if (format_data->promisc == 1)
1431                rte_eth_promiscuous_enable(format_data->port);
1432        else
1433                rte_eth_promiscuous_disable(format_data->port);
1434
1435        /* We have now successfully started/unpased */
1436        format_data->paused = DPDK_RUNNING;
1437
1438
1439        /* Register a callback for link state changes */
1440        ret = rte_eth_dev_callback_register(format_data->port,
1441                                            RTE_ETH_EVENT_INTR_LSC,
1442                                            dpdk_lsc_callback,
1443                                            format_data);
1444#if DEBUG
1445        if (ret)
1446                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1447                        ret, strerror(-ret));
1448#endif
1449
1450        /* Get the current link status */
1451        rte_eth_link_get_nowait(format_data->port, &link_info);
1452        format_data->link_speed = link_info.link_speed;
1453#if DEBUG
1454        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1455                (int) link_info.link_duplex, (int) link_info.link_speed);
1456#endif
1457
1458        return 0;
1459}
1460
1461static int dpdk_start_input (libtrace_t *libtrace) {
1462        char err[500];
1463        err[0] = 0;
1464
1465        /* Make sure we don't reserve an extra thread for this */
1466        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1467
1468        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1469                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1470                free(libtrace->format_data);
1471                libtrace->format_data = NULL;
1472                return -1;
1473        }
1474        return 0;
1475}
1476
1477static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1478        struct rte_eth_dev_info dev_info;
1479        rte_eth_dev_info_get(port_id, &dev_info);
1480        return dev_info.max_rx_queues;
1481}
1482
/* Returns the number of processors currently online as reported by
 * sysconf(), or 1 if that query fails or reports a non-positive count. */
static inline size_t dpdk_processor_count () {
        const long online = sysconf(_SC_NPROCESSORS_ONLN);

        return (online > 0) ? (size_t) online : 1;
}
1490
1491static int dpdk_pstart_input (libtrace_t *libtrace) {
1492        char err[500];
1493        int i=0, phys_cores=0;
1494        int tot = libtrace->perpkt_thread_count;
1495        libtrace_list_node_t *n;
1496        err[0] = 0;
1497
1498        if (rte_lcore_id() != rte_get_master_lcore())
1499                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1500                        " from the master DPDK thread!\n");
1501
1502        /* If the master is not on the last thread we move it there */
1503        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1504                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1505                        return -1;
1506        }
1507
1508        /* Don't exceed the number of cores in the system/detected by dpdk
1509         * We don't have to force this but performance wont be good if we don't */
1510        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1511                if (lcore_config[i].detected) {
1512                        if (rte_lcore_is_enabled(i)) {
1513#if DEBUG
1514                                fprintf(stderr, "Found core %d already in use!\n", i);
1515#endif
1516                        } else {
1517                                phys_cores++;
1518                        }
1519                }
1520        }
1521        /* If we are restarting we have already allocated some threads as such
1522         * we add these back to the count for this calculation */
1523        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1524                dpdk_per_stream_t * stream = n->data;
1525                if (stream->lcore != -1)
1526                        phys_cores++;
1527        }
1528
1529        tot = MIN(libtrace->perpkt_thread_count,
1530                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1531        tot = MIN(tot, phys_cores);
1532
1533#if DEBUG
1534        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1535                libtrace->perpkt_thread_count, phys_cores);
1536#endif
1537
1538        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1539                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1540                free(libtrace->format_data);
1541                libtrace->format_data = NULL;
1542                return -1;
1543        }
1544
1545        /* Make sure we only start the number that we should */
1546        libtrace->perpkt_thread_count = tot;
1547        return 0;
1548}
1549
1550/**
1551 * Register a thread with the DPDK system,
1552 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1553 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1554 * gives it.
1555 *
1556 * We then allow a mapper thread to be started on every real core as DPDK would,
1557 * we also bind these to the corresponding CPU cores.
1558 *
1559 * @param libtrace A pointer to the trace
1560 * @param reading True if the thread will be used to read packets, i.e. will
1561 *                call pread_packet(), false if thread used to process packet
1562 *                in any other manner including statistics functions.
1563 */
1564static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1565{
1566#if DEBUG
1567        char name[99];
1568        name[0] = 0;
1569#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1570        pthread_getname_np(pthread_self(),
1571                           name, sizeof(name));
1572#endif
1573#endif
1574        if (reading) {
1575                dpdk_per_stream_t *stream;
1576                /* Attach our thread */
1577                if(t->type == THREAD_PERPKT) {
1578                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1579                        if (t->format_data == NULL) {
1580                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1581                                              "Too many threads registered");
1582                                return -1;
1583                        }
1584                } else {
1585                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1586                }
1587                stream = t->format_data;
1588#if DEBUG
1589                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1590#endif
1591                return dpdk_register_lcore(libtrace, true, stream->lcore);
1592        } else {
1593                int lcore = dpdk_reserve_lcore(reading, 0);
1594                if (lcore == -1) {
1595                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1596                                      " for DPDK");
1597                        return -1;
1598                }
1599#if DEBUG
1600                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1601#endif
1602                return dpdk_register_lcore(libtrace, false, lcore);
1603        }
1604
1605        return 0;
1606}
1607
1608/**
1609 * Unregister a thread with the DPDK system.
1610 *
1611 * Only previously registered threads should be calling this just before
1612 * they are destroyed.
1613 */
1614static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1615{
1616        struct rte_config *cfg = rte_eal_get_configuration();
1617
1618        assert(rte_lcore_id() < RTE_MAX_LCORE);
1619        pthread_mutex_lock(&dpdk_lock);
1620        /* Skip if master */
1621        if (rte_lcore_id() == rte_get_master_lcore()) {
1622                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1623                pthread_mutex_unlock(&dpdk_lock);
1624                return;
1625        }
1626
1627        /* Disable this core in global DPDK structs */
1628        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1629        cfg->lcore_count--;
1630        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1631        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1632        pthread_mutex_unlock(&dpdk_lock);
1633        return;
1634}
1635
1636static int dpdk_start_output(libtrace_out_t *libtrace)
1637{
1638        char err[500];
1639        err[0] = 0;
1640
1641        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1642                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1643                free(libtrace->format_data);
1644                libtrace->format_data = NULL;
1645                return -1;
1646        }
1647        return 0;
1648}
1649
1650static int dpdk_pause_input(libtrace_t * libtrace) {
1651        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1652        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1653        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1654#if DEBUG
1655                fprintf(stderr, "Pausing DPDK port\n");
1656#endif
1657                rte_eth_dev_stop(FORMAT(libtrace)->port);
1658                FORMAT(libtrace)->paused = DPDK_PAUSED;
1659                /* Empty the queue of packets */
1660                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1661                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1662                }
1663                FORMAT(libtrace)->burst_offset = 0;
1664                FORMAT(libtrace)->burst_size = 0;
1665
1666                for (; tmp != NULL; tmp = tmp->next) {
1667                        dpdk_per_stream_t *stream = tmp->data;
1668                        stream->ts_last_sys = 0;
1669#if HAS_HW_TIMESTAMPS_82580
1670                        stream->ts_first_sys = 0;
1671#endif
1672                }
1673
1674        }
1675        return 0;
1676}
1677
1678static int dpdk_write_packet(libtrace_out_t *trace,
1679                             libtrace_packet_t *packet){
1680        struct rte_mbuf* m_buff[1];
1681
1682        int wirelen = trace_get_wire_length(packet);
1683        int caplen = trace_get_capture_length(packet);
1684
1685        /* Check for a checksum and remove it */
1686        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1687            wirelen == caplen)
1688                caplen -= ETHER_CRC_LEN;
1689
1690        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1691        if (m_buff[0] == NULL) {
1692                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1693                return -1;
1694        } else {
1695                int ret;
1696                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1697                do {
1698                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1699                } while (ret != 1);
1700        }
1701
1702        return 0;
1703}
1704
1705static int dpdk_fin_input(libtrace_t * libtrace) {
1706        libtrace_list_node_t * n;
1707        /* Free our memory structures */
1708        if (libtrace->format_data != NULL) {
1709
1710                if (FORMAT(libtrace)->port != 0xFF)
1711                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1712                                                        RTE_ETH_EVENT_INTR_LSC,
1713                                                        dpdk_lsc_callback,
1714                                                        FORMAT(libtrace));
1715                /* Close the device completely, device cannot be restarted */
1716                rte_eth_dev_close(FORMAT(libtrace)->port);
1717
1718                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1719                                 FORMAT(libtrace)->nic_numa_node);
1720
1721                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1722                        dpdk_per_stream_t * stream = n->data;
1723                        if (stream->mempool)
1724                                dpdk_free_memory(stream->mempool,
1725                                                 rte_lcore_to_socket_id(stream->lcore));
1726                }
1727
1728                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1729                /* filter here if we used it */
1730                free(libtrace->format_data);
1731        }
1732
1733        return 0;
1734}
1735
1736
1737static int dpdk_fin_output(libtrace_out_t * libtrace) {
1738        /* Free our memory structures */
1739        if (libtrace->format_data != NULL) {
1740                /* Close the device completely, device cannot be restarted */
1741                if (FORMAT(libtrace)->port != 0xFF)
1742                        rte_eth_dev_close(FORMAT(libtrace)->port);
1743                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1744                /* filter here if we used it */
1745                free(libtrace->format_data);
1746        }
1747
1748        return 0;
1749}
1750
1751/**
1752 * Get the start of the additional header that we added to a packet.
1753 */
1754static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1755        assert(packet);
1756        assert(packet->buffer);
1757        /* Our header sits straight after the mbuf header */
1758        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1759}
1760
1761static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1762        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1763        return hdr->cap_len;
1764}
1765
1766static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1767        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1768        if (size > hdr->cap_len) {
1769                /* Cannot make a packet bigger */
1770                return trace_get_capture_length(packet);
1771        }
1772
1773        /* Reset the cached capture length first*/
1774        packet->capture_length = -1;
1775        hdr->cap_len = (uint32_t) size;
1776        return trace_get_capture_length(packet);
1777}
1778
1779static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1780        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1781        int org_cap_size; /* The original capture size */
1782        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1783                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1784                               sizeof(struct hw_timestamp_82580);
1785        } else {
1786                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1787        }
1788        if (hdr->flags & INCLUDES_CHECKSUM) {
1789                return org_cap_size;
1790        } else {
1791                /* DPDK packets are always TRACE_TYPE_ETH packets */
1792                return org_cap_size + ETHER_CRC_LEN;
1793        }
1794}
1795
1796static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1797        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1798        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1799                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1800                                sizeof(struct hw_timestamp_82580);
1801        else
1802                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1803}
1804
1805static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1806                               libtrace_packet_t *packet, void *buffer,
1807                               libtrace_rt_types_t rt_type, uint32_t flags) {
1808        assert(packet);
1809        if (packet->buffer != buffer &&
1810            packet->buf_control == TRACE_CTRL_PACKET) {
1811                free(packet->buffer);
1812        }
1813
1814        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1815                packet->buf_control = TRACE_CTRL_PACKET;
1816        else
1817                packet->buf_control = TRACE_CTRL_EXTERNAL;
1818
1819        packet->buffer = buffer;
1820        packet->header = buffer;
1821
1822        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1823        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1824        packet->type = rt_type;
1825        return 0;
1826}
1827
1828/**
1829 * Given a packet size and a link speed, computes the
1830 * time to transmit in nanoseconds.
1831 *
1832 * @param format_data The dpdk format data from which we get the link speed
1833 *        and if unset updates it in a thread safe manner
1834 * @param pkt_size The size of the packet in bytes
1835 * @return The wire time in nanoseconds
1836 */
1837static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1838        uint32_t wire_time;
1839        /* 20 extra bytes of interframe gap and preamble */
1840# if GET_MAC_CRC_CHECKSUM
1841        wire_time = ((pkt_size + 20) * 8000);
1842# else
1843        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1844# endif
1845
1846        /* Division is really slow and introduces a pipeline stall
1847         * The compiler will optimise this into magical multiplication and shifting
1848         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1849         */
1850retry_calc_wiretime:
1851        switch (format_data->link_speed) {
1852        case ETH_LINK_SPEED_40G:
1853                wire_time /=  ETH_LINK_SPEED_40G;
1854                break;
1855        case ETH_LINK_SPEED_20G:
1856                wire_time /= ETH_LINK_SPEED_20G;
1857                break;
1858        case ETH_LINK_SPEED_10G:
1859                wire_time /= ETH_LINK_SPEED_10G;
1860                break;
1861        case ETH_LINK_SPEED_1000:
1862                wire_time /= ETH_LINK_SPEED_1000;
1863                break;
1864        case 0:
1865                {
1866                /* Maybe the link was down originally, but now it should be up */
1867                struct rte_eth_link link = {0};
1868                rte_eth_link_get_nowait(format_data->port, &link);
1869                if (link.link_status && link.link_speed) {
1870                        format_data->link_speed = link.link_speed;
1871#ifdef DEBUG
1872                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1873#endif
1874                        goto retry_calc_wiretime;
1875                }
1876                /* We don't know the link speed, make sure numbers are counting up */
1877                wire_time = 1;
1878                break;
1879                }
1880        default:
1881                wire_time /= format_data->link_speed;
1882        }
1883        return wire_time;
1884}
1885
1886/**
1887 * Does any extra preperation to all captured packets
1888 * This includes adding our extra header to it with the timestamp,
1889 * and any snapping
1890 *
1891 * @param format_data The DPDK format data
1892 * @param plc The DPDK per lcore format data
1893 * @param pkts An array of size nb_pkts of DPDK packets
1894 */
1895static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1896                                   struct dpdk_per_stream_t *plc,
1897                                   struct rte_mbuf **pkts,
1898                                   size_t nb_pkts) {
1899        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1900        struct dpdk_addt_hdr *hdr;
1901        size_t i;
1902        uint64_t cur_sys_time_ns;
1903#if HAS_HW_TIMESTAMPS_82580
1904        struct hw_timestamp_82580 *hw_ts;
1905        uint64_t estimated_wraps;
1906#else
1907
1908#endif
1909
1910#if USE_CLOCK_GETTIME
1911        struct timespec cur_sys_time = {0};
1912        /* This looks terrible and I feel bad doing it. But it's OK
1913         * on new kernels, because this is a fast vsyscall */
1914        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1915        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1916#else
1917        struct timeval cur_sys_time = {0};
1918        /* Also a fast vsyscall */
1919        gettimeofday(&cur_sys_time, NULL);
1920        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1921#endif
1922
1923        /* The system clock is not perfect so when running
1924         * at linerate we could timestamp a packet in the past.
1925         * To avoid this we munge the timestamp to appear 1ns
1926         * after the previous packet. We should eventually catch up
1927         * to system time since a 64byte packet on a 10G link takes 67ns.
1928         *
1929         * Note with parallel readers timestamping packets
1930         * with duplicate stamps or out of order is unavoidable without
1931         * hardware timestamping from the NIC.
1932         */
1933#if !HAS_HW_TIMESTAMPS_82580
1934        if (plc->ts_last_sys >= cur_sys_time_ns) {
1935                cur_sys_time_ns = plc->ts_last_sys + 1;
1936        }
1937#endif
1938
1939        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1940        for (i = 0 ; i < nb_pkts ; ++i) {
1941
1942                /* We put our header straight after the dpdk header */
1943                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1944                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1945
1946#if GET_MAC_CRC_CHECKSUM
1947                /* Add back in the CRC sum */
1948                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1949                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1950                hdr->flags |= INCLUDES_CHECKSUM;
1951#endif
1952
1953                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1954
1955#if HAS_HW_TIMESTAMPS_82580
1956                /* The timestamp is sitting before our packet and is included in pkt_len */
1957                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1958                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1959                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1960
1961                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1962                 *
1963                 *        +----------+---+   +--------------+
1964                 *  82580 |    24    | 8 |   |      32      |
1965                 *        +----------+---+   +--------------+
1966                 *          reserved  \______ 40 bits _____/
1967                 *
1968                 * The 40 bit 82580 SYSTIM overflows every
1969                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1970                 *
1971                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1972                 * Endian (for the full 64 bits) i.e. picture is mirrored
1973                 */
1974
1975                /* Despite what the documentation says this is in Little
1976                 * Endian byteorder. Mask the reserved section out.
1977                 */
1978                hdr->timestamp = le64toh(hw_ts->timestamp) &
1979                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1980
1981                if (unlikely(plc->ts_first_sys == 0)) {
1982                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1983                        plc->ts_last_sys = plc->ts_first_sys;
1984                }
1985
1986                /* This will have serious problems if packets aren't read quickly
1987                 * that is within a couple of seconds because our clock cycles every
1988                 * 18 seconds */
1989                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1990                                  / (1ull<<TS_NBITS_82580);
1991
1992                /* Estimated_wraps gives the number of times the counter should have
1993                 * wrapped (however depending on value last time it could have wrapped
1994                 * twice more (if hw clock is close to its max value) or once less (allowing
1995                 * for a bit of variance between hw and sys clock). But if the clock
1996                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1997                if (unlikely(estimated_wraps >= 2)) {
1998                        /* 2 or more wrap arounds add all but the very last wrap */
1999                        plc->wrap_count += estimated_wraps - 1;
2000                }
2001
2002                /* Set the timestamp to the lowest possible value we're considering */
2003                hdr->timestamp += plc->ts_first_sys +
2004                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2005
2006                /* In most runs only the first if() will need evaluating - i.e our
2007                 * estimate is correct. */
2008                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2009                                              hdr->timestamp, MAXSKEW_82580))) {
2010                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2011                        plc->wrap_count++;
2012                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2013                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2014                                             hdr->timestamp, MAXSKEW_82580)) {
2015                                /* Failed to match estimated_wraps */
2016                                plc->wrap_count++;
2017                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2018                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2019                                                     hdr->timestamp, MAXSKEW_82580)) {
2020                                        if (estimated_wraps == 0) {
2021                                                /* 0 case Failed to match estimated_wraps+2 */
2022                                                printf("WARNING - Hardware Timestamp failed to"
2023                                                       " match using systemtime!\n");
2024                                                hdr->timestamp = cur_sys_time_ns;
2025                                        } else {
2026                                                /* Failed to match estimated_wraps+1 */
2027                                                plc->wrap_count++;
2028                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2029                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2030                                                                     hdr->timestamp, MAXSKEW_82580)) {
2031                                                        /* Failed to match estimated_wraps+2 */
2032                                                        printf("WARNING - Hardware Timestamp failed to"
2033                                                               " match using systemtime!!\n");
2034                                                }
2035                                        }
2036                                }
2037                        }
2038                }
2039#else
2040
2041                hdr->timestamp = cur_sys_time_ns;
2042                /* Offset the next packet by the wire time of previous */
2043                calculate_wire_time(format_data, hdr->cap_len);
2044
2045#endif
2046        }
2047
2048        plc->ts_last_sys = cur_sys_time_ns;
2049        return;
2050}
2051
2052
2053static void dpdk_fin_packet(libtrace_packet_t *packet)
2054{
2055        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2056                rte_pktmbuf_free(packet->buffer);
2057                packet->buffer = NULL;
2058        }
2059}
2060
2061/** Reads at least one packet or returns an error
2062 */
2063static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2064                                           dpdk_per_stream_t *stream,
2065                                           libtrace_message_queue_t *mesg,
2066                                           struct rte_mbuf* pkts_burst[],
2067                                           size_t nb_packets) {
2068        size_t nb_rx; /* Number of rx packets we've recevied */
2069        while (1) {
2070                /* Poll for a batch of packets */
2071                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2072                                         stream->queue_id, pkts_burst, nb_packets);
2073                if (nb_rx > 0) {
2074                        /* Got some packets - otherwise we keep spining */
2075                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2076                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2077                        return nb_rx;
2078                }
2079                /* Check the message queue this could be less than 0 */
2080                if (mesg && libtrace_message_queue_count(mesg) > 0)
2081                        return READ_MESSAGE;
2082                if (libtrace_halt)
2083                        return READ_EOF;
2084                /* Wait a while, polling on memory degrades performance
2085                 * This relieves the pressure on memory allowing the NIC to DMA */
2086                rte_delay_us(10);
2087        }
2088
2089        /* We'll never get here - but if we did it would be bad */
2090        return READ_ERROR;
2091}
2092
2093static int dpdk_pread_packets (libtrace_t *libtrace,
2094                                    libtrace_thread_t *t,
2095                                    libtrace_packet_t **packets,
2096                                    size_t nb_packets) {
2097        int nb_rx; /* Number of rx packets we've recevied */
2098        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2099        int i;
2100        dpdk_per_stream_t *stream = t->format_data;
2101
2102        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2103                                         pkts_burst, nb_packets);
2104
2105        if (nb_rx > 0) {
2106                for (i = 0; i < nb_rx; ++i) {
2107                        if (packets[i]->buffer != NULL) {
2108                                /* The packet should always be finished */
2109                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2110                                free(packets[i]->buffer);
2111                        }
2112                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2113                        packets[i]->type = TRACE_RT_DATA_DPDK;
2114                        packets[i]->buffer = pkts_burst[i];
2115                        packets[i]->trace = libtrace;
2116                        packets[i]->error = 1;
2117                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2118                }
2119        }
2120
2121        return nb_rx;
2122}
2123
2124static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2125        int nb_rx; /* Number of rx packets we've received */
2126        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2127
2128        /* Free the last packet buffer */
2129        if (packet->buffer != NULL) {
2130                /* The packet should always be finished */
2131                assert(packet->buf_control == TRACE_CTRL_PACKET);
2132                free(packet->buffer);
2133                packet->buffer = NULL;
2134        }
2135
2136        packet->buf_control = TRACE_CTRL_EXTERNAL;
2137        packet->type = TRACE_RT_DATA_DPDK;
2138
2139        /* Check if we already have some packets buffered */
2140        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2141                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2142                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2143                return 1; // TODO should be bytes read, which essentially useless anyway
2144        }
2145
2146        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2147                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2148
2149        if (nb_rx > 0) {
2150                FORMAT(libtrace)->burst_size = nb_rx;
2151                FORMAT(libtrace)->burst_offset = 1;
2152                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2153                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2154                return 1;
2155        }
2156        return nb_rx;
2157}
2158
2159static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2160        struct timeval tv;
2161        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2162
2163        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2164        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2165        return tv;
2166}
2167
2168static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2169        struct timespec ts;
2170        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2171
2172        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2173        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2174        return ts;
2175}
2176
2177static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2178        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2179}
2180
2181static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2182        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2183        return (libtrace_direction_t) hdr->direction;
2184}
2185
2186static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2187        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2188        hdr->direction = (uint8_t) direction;
2189        return (libtrace_direction_t) hdr->direction;
2190}
2191
2192static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2193        struct rte_eth_stats dev_stats = {0};
2194
2195        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2196                return;
2197
2198        /* Grab the current stats */
2199        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2200
2201        stats->captured_valid = true;
2202        stats->captured = dev_stats.ipackets;
2203
2204        /* Not that we support adding filters but if we did this
2205         * would work */
2206        stats->filtered += dev_stats.fdirmiss;
2207
2208        stats->dropped_valid = true;
2209        stats->dropped = dev_stats.imissed;
2210
2211        /* DPDK errors includes drops */
2212        stats->errors_valid = true;
2213        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2214
2215        stats->received_valid = true;
2216        stats->received = dev_stats.ipackets + dev_stats.imissed;
2217
2218}
2219
2220/* Attempts to read a packet in a non-blocking fashion. If one is not
2221 * available a SLEEP event is returned. We do not have the ability to
2222 * create a select()able file descriptor in DPDK.
2223 */
2224static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2225                                            libtrace_packet_t *packet) {
2226        libtrace_eventobj_t event = {0,0,0.0,0};
2227        int nb_rx; /* Number of receive packets we've read */
2228        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2229
2230        do {
2231
2232                /* See if we already have a packet waiting */
2233                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2234                                         FORMAT_DATA_FIRST(trace)->queue_id,
2235                                         pkts_burst, 1);
2236
2237                if (nb_rx > 0) {
2238                        /* Free the last packet buffer */
2239                        if (packet->buffer != NULL) {
2240                                /* The packet should always be finished */
2241                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2242                                free(packet->buffer);
2243                                packet->buffer = NULL;
2244                        }
2245
2246                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2247                        packet->type = TRACE_RT_DATA_DPDK;
2248                        event.type = TRACE_EVENT_PACKET;
2249                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
2250                        packet->buffer = FORMAT(trace)->burst_pkts[0];
2251                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2252                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2253
2254                        /* XXX - Check this passes the filter trace_read_packet normally
2255                         * does this for us but this wont */
2256                        if (trace->filter) {
2257                                if (!trace_apply_filter(trace->filter, packet)) {
2258                                        /* Failed the filter so we loop for another packet */
2259                                        trace->filtered_packets ++;
2260                                        continue;
2261                                }
2262                        }
2263                        trace->accepted_packets ++;
2264                } else {
2265                        /* We only want to sleep for a very short time - we are non-blocking */
2266                        event.type = TRACE_EVENT_SLEEP;
2267                        event.seconds = 0.0001;
2268                        event.size = 0;
2269                }
2270
2271                /* If we get here we have our event */
2272                break;
2273        } while (1);
2274
2275        return event;
2276}
2277
/* Writes the usage text for the dpdk format module to stdout: the
 * supported input and output URI syntax plus a few notes on CPU core
 * selection. */
static void dpdk_help(void) {
        /* One entry per output line (the last example of the input
         * section carries its own blank line) */
        static const char *const usage[] = {
                "dpdk format module: $Revision: 1752 $\n",
                "Supported input URIs:\n",
                "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
                "\tThe -<coreid> is optional \n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
                "\t By default the last CPU core is used if not otherwise specified.\n",
                "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
                "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
                "\n",
                "Supported output URIs:\n",
                "\tSame format as the input URI.\n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
                "\n"
        };
        size_t idx;

        for (idx = 0; idx < sizeof(usage) / sizeof(usage[0]); idx++) {
                fputs(usage[idx], stdout);
        }
}
2295
/* Format description for the dpdk module.
 *
 * This is a positional initialiser, so the order of the entries matters;
 * the trailing comment on each line names the slot that entry fills.
 * Slots set to NULL have no dpdk-specific implementation here
 * (presumably libtrace treats them as unsupported or falls back to a
 * default - confirm against the libtrace_format_t definition).
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id$",
        TRACE_FORMAT_DPDK,
        NULL,                               /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,                    /* init_input */
        dpdk_config_input,                  /* config_input */
        dpdk_start_input,                   /* start_input */
        dpdk_pause_input,                   /* pause_input */
        dpdk_init_output,                   /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,                  /* start_output */
        dpdk_fin_input,                     /* fin_input */
        dpdk_fin_output,                    /* fin_output */
        dpdk_read_packet,                   /* read_packet */
        dpdk_prepare_packet,                /* prepare_packet */
        dpdk_fin_packet,                    /* fin_packet */
        dpdk_write_packet,                  /* write_packet */
        dpdk_get_link_type,                 /* get_link_type */
        dpdk_get_direction,                 /* get_direction */
        dpdk_set_direction,                 /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,                   /* get_timeval */
        dpdk_get_timespec,                  /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,            /* get_capture_length */
        dpdk_get_wire_length,               /* get_wire_length */
        dpdk_get_framing_length,            /* get_framing_length */
        dpdk_set_capture_length,            /* set_capture_length */
        NULL,                               /* get_received_packets */
        NULL,                               /* get_filtered_packets */
        NULL,                               /* get_dropped_packets */
        dpdk_get_stats,                     /* get_statistics */
        NULL,                               /* get_fd */
        dpdk_trace_event,                   /* trace_event */
        dpdk_help,                          /* help */
        NULL,                               /* next pointer */
        {true, 8},                          /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,                  /* pstart_input */
        dpdk_pread_packets,                 /* pread_packets */
        dpdk_pause_input,                   /* ppause (same function as pause_input) */
        dpdk_fin_input,                     /* p_fin (same function as fin_input) */
        dpdk_pregister_thread,              /* pregister_thread */
        dpdk_punregister_thread,            /* punregister_thread */
        NULL                                /* get thread stats */
};
2346
2347void dpdk_constructor(void) {
2348        register_format(&dpdk);
2349}
Note: See TracBrowser for help on using the repository browser.