source: lib/format_dpdk.c @ e47ab4d

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since e47ab4d was e47ab4d, checked in by GitHub <noreply@…>, 5 years ago

Support uni/bi directional hasher for dpdk

For HASHER_BALLANCE, use default non-symmetric rss key in driver.
For HASHER_BIDIRECTIONAL, use 2-bytes-repeated rss key.

  • Property mode set to 100644
File size: 76.6 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Intel Data Plane Development Kit (DPDK) capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44
45#ifdef HAVE_INTTYPES_H
46#  include <inttypes.h>
47#else
48# error "Can't find inttypes.h"
49#endif
50
51#include <stdlib.h>
52#include <assert.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56
57#if HAVE_LIBNUMA
58#include <numa.h>
59#endif
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
65 * These get released with the rX suffix. The following macros were added
66 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * DPDK 16.04 or newer is recommended.
72 * However 1.6 and newer are still likely supported.
73 */
74#include <rte_eal.h>
75#include <rte_version.h>
76#ifndef RTE_VERSION_NUM
77#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
78#endif
79#ifndef RTE_VER_PATCH_RELEASE
80#       define RTE_VER_PATCH_RELEASE 0
81#endif
82#ifndef RTE_VERSION
83#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
84        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
85#endif
86
87/* 1.6.0r2 :
88 *      rte_eal_pci_set_blacklist() is removed
89 *      device_list is renamed to pci_device_list
90 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
91 *      as such we do apply the whitelist before rte_eal_init.
92 *      This also works correctly with DPDK 1.6.0r2.
93 *
94 * Replaced by:
95 *      rte_devargs (we can simply whitelist)
96 */
97#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
98#       define DPDK_USE_BLACKLIST 1
99#else
100#       define DPDK_USE_BLACKLIST 0
101#endif
102
103/*
104 * 1.7.0 :
105 *      rte_pmd_init_all is removed
106 *
107 * Replaced by:
108 *      Nothing, no longer needed
109 */
110#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
111#       define DPDK_USE_PMD_INIT 1
112#else
113#       define DPDK_USE_PMD_INIT 0
114#endif
115
116/* 1.7.0-rc3 :
117 *
118 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
119 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
120 * it twice.
121 */
122#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
123#       define DPDK_USE_PCI_PROBE 1
124#else
125#       define DPDK_USE_PCI_PROBE 0
126#endif
127
128/* 1.8.0-rc1 :
129 * LOG LEVEL is a command line option which overrides what
130 * we previously set it to.
131 */
132#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
133#       define DPDK_USE_LOG_LEVEL 1
134#else
135#       define DPDK_USE_LOG_LEVEL 0
136#endif
137
138/* 1.8.0-rc2
139 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
140 * this uses the default values, which are better tuned per device
141 * See issue #26
142 */
143#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
144#       define DPDK_USE_NULL_QUEUE_CONFIG 1
145#else
146#       define DPDK_USE_NULL_QUEUE_CONFIG 0
147#endif
148
149/* 2.0.0-rc1
150 * Unifies RSS hash between cards
151 */
152#if RTE_VERSION >= RTE_VERSION_NUM(2, 0, 0, 1)
153#       define RX_RSS_FLAGS (ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | \
154                             ETH_RSS_SCTP)
155#else
156#       define RX_RSS_FLAGS (ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | \
157                             ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP |\
158                             ETH_RSS_IPV6_UDP)
159#endif
160
161/* v16.07-rc1 - deprecated
162 * rte_mempool_avail_count to replace rte_mempool_count
163 * rte_mempool_in_use_count to replace rte_mempool_free_count
164 */
165#if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
166#define rte_mempool_avail_count rte_mempool_count
167#define rte_mempool_in_use_count rte_mempool_free_count
168#endif
169
170#include <rte_per_lcore.h>
171#include <rte_debug.h>
172#include <rte_errno.h>
173#include <rte_common.h>
174#include <rte_log.h>
175#include <rte_memcpy.h>
176#include <rte_prefetch.h>
177#include <rte_branch_prediction.h>
178#include <rte_pci.h>
179#include <rte_ether.h>
180#include <rte_ethdev.h>
181#include <rte_ring.h>
182#include <rte_mempool.h>
183#include <rte_mbuf.h>
184#include <rte_launch.h>
185#include <rte_lcore.h>
186#include <rte_per_lcore.h>
187#include <rte_cycles.h>
188#include <pthread.h>
189#ifdef __FreeBSD__
190#include <pthread_np.h>
191#endif
192
193/* 16.04-rc3 ETH_LINK_SPEED_X are replaced with ETH_SPEED_NUM_X.
194 * ETH_LINK_SPEED_ are reused as flags, ugly.
195 * We use the new way in this code.
196 */
197#ifndef ETH_SPEED_NUM_1G
198        #define ETH_SPEED_NUM_1G ETH_LINK_SPEED_1000
199        #define ETH_SPEED_NUM_10G ETH_LINK_SPEED_10G
200        #define ETH_SPEED_NUM_20G ETH_LINK_SPEED_20G
201        #define ETH_SPEED_NUM_40G ETH_LINK_SPEED_40G
202#endif
203
204/* The default size of memory buffers to use - This is the max size of standard
205 * ethernet packet less the size of the MAC CHECKSUM */
206#define RX_MBUF_SIZE 1514
207
208/* The minimum number of memory buffers per queue tx or rx. Based on
209 * the requirement of the memory pool with 128 per thread buffers, needing
210 * at least 128*1.5 = 192 buffers. Our code allocates 128*2 to be safe.
211 */
212#define MIN_NB_BUF 128
213
214/* Number of receive memory buffers to use
215 * By default this is limited by driver to 4k and must be a multiple of 128.
216 * A modification can be made to the driver to remove this limit.
217 * This can be increased in the driver and here.
218 * Should be at least MIN_NB_BUF.
219 * We choose 2K rather than 4K because it enables the usage of sse vector
220 * drivers which are significantly faster than using the larger buffer.
221 */
222#define NB_RX_MBUF (4096/2)
223
224/* Number of send memory buffers to use.
225 * Same limits apply as those for NB_RX_MBUF.
226 */
227#define NB_TX_MBUF 1024
228
229/* The size of the PCI blacklist needs to be big enough to contain
230 * every PCI device address (listed by lspci every bus:device.function tuple).
231 */
232#define BLACK_LIST_SIZE 50
233
234/* The maximum number of characters the mempool name can be */
235#define MEMPOOL_NAME_LEN 20
236
237/* For single threaded libtrace we read packets as a batch/burst
238 * this is the maximum size of said burst */
239#define BURST_SIZE 32
240
241#define MBUF(x) ((struct rte_mbuf *) x)
242/* Get the original placement of the packet data */
243#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
244#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
245#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
246
247#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
248#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
249
250#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
251                        (uint64_t) tv.tv_usec*1000ull)
252#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
253                        (uint64_t) ts.tv_nsec)
254
255#if RTE_PKTMBUF_HEADROOM != 128
256#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
257         "any libtrace instance processing these packet must be have the" \
258         "same RTE_PKTMBUF_HEADROOM set"
259#endif
260
261/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
262 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
263 *
264 * Make sure you understand what these are doing before enabling them.
265 * They might make traces incompatible with other builds etc.
266 *
267 * These are also included to show how to do some things which aren't
268 * obvious in the DPDK documentation.
269 */
270
271/* Print verbose messages to stderr */
272#define DEBUG 0
273
274/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
275 * only turn on if you know clock_gettime is a vsyscall on your system
276 * otherwise could be a large overhead. Again gettimeofday() should be
277 * vsyscall also if it's not you should seriously consider updating your
278 * kernel.
279 */
280#ifdef HAVE_CLOCK_GETTIME
281/* You can turn this on (set to 1) to prefer clock_gettime */
282#define USE_CLOCK_GETTIME 1
283#else
284/* DON'T CHANGE THIS !!! */
285#define USE_CLOCK_GETTIME 0
286#endif
287
288/* This is fairly safe to turn on - currently there appears to be a 'bug'
289 * in DPDK that will remove the checksum by making the packet appear 4bytes
290 * smaller than what it really is. Most formats don't include the checksum
291 * hence writing out a port such as int: ring: and dpdk: assumes there
292 * is no checksum and will attempt to write the checksum as part of the
293 * packet
294 */
295#define GET_MAC_CRC_CHECKSUM 0
296
297/* This requires a modification of the pmd drivers (inside Intel DPDK)
298 * TODO this requires updating (packet sizes are wrong TS most likely also)
299 */
300#define HAS_HW_TIMESTAMPS_82580 0
301
302#if HAS_HW_TIMESTAMPS_82580
303# define TS_NBITS_82580     40
304/* The maximum on the +ve or -ve side that we can be, make it half way */
305# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
306#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
307#endif
308
309static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
310/* Memory pools Per NUMA node */
311static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
312
/* Hardware timestamp record for the Intel 82580: a reserved 64-bit word
 * followed by the 64-bit timestamp word (16 bytes in total).
 *
 * As per the Intel 82580 specification - but note the mismatch in the 82580
 * datasheet: it states the timestamp is stored in Big Endian, however it is
 * actually Little Endian. */
struct hw_timestamp_82580 {
        uint64_t reserved;
        uint64_t timestamp; /* Little Endian; only the lower 40 bits are valid */
};
319
/* State of the capture, stored in dpdk_format_data_t.paused. */
enum paused_state {
        DPDK_NEVER_STARTED, /* Initial value assigned by dpdk_init_input() */
        DPDK_RUNNING,
        DPDK_PAUSED,
};
325
/* Per-stream state; one of these is kept in dpdk_format_data_t.per_stream
 * for each stream.
 *
 * NOTE: DPDK_EMPTY_STREAM initialises this structure positionally, so the
 * field order here must stay in step with that macro. */
struct dpdk_per_stream_t
{
        uint16_t queue_id; /* DPDK_EMPTY_STREAM stores -1 here (i.e. 0xFFFF) */
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
        struct rte_mempool *mempool; /* NULL in DPDK_EMPTY_STREAM */
        int lcore; /* -1 in DPDK_EMPTY_STREAM */
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);
338
339#if HAS_HW_TIMESTAMPS_82580
340#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
341#else
342#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
343#endif
344
345typedef struct dpdk_per_stream_t dpdk_per_stream_t;
346
/* Per-trace DPDK state, reached through the FORMAT() macro.
 *
 * Used by both input and output, however some fields are not used
 * for output. */
struct dpdk_format_data_t {
        int8_t promisc; /* Promiscuous mode - RX only; dpdk_init_input() starts it at -1 */
        uint8_t port; /* Always 0, we only whitelist a single port - Shared TX & RX */
        uint8_t nb_ports; /* Total number of usable ports on the system, should be 1 */
        uint8_t paused; /* See paused_state */
        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
        int snaplen; /* The snap length for the capture - RX only; 0 means use the default */
        /* We always have to set up both rx and tx queues even if we don't want them */
        int nb_rx_buf; /* The number of packet buffers in the rx ring */
        int nb_tx_buf; /* The number of packet buffers in the tx ring */
        int nic_numa_node; /* The NUMA node that the NIC is attached to, -1 if unknown */
        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of entries in blacklist that are valid */
#endif
        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
        enum hasher_types hasher_type; /* dpdk_init_input() defaults this to HASHER_BALANCE */
        /* To improve single-threaded performance we always batch reading
         * packets, in a burst, otherwise the parallel library does this for us */
        struct rte_mbuf* burst_pkts[BURST_SIZE];
        int burst_size; /* The total number read in the burst */
        int burst_offset; /* The offset we are into the burst */

        /* Our parallel streams (a list of dpdk_per_stream_t) */
        libtrace_list_t *per_stream;
};
376
/* Bit values describing a captured packet; presumably OR-ed together into
 * dpdk_addt_hdr.flags - confirm against the code that fills that header. */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM = 0x1,
        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
};
381
382/**
383 * A structure placed in front of the packet where we can store
384 * additional information about the given packet.
385 * +--------------------------+
386 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
387 * +--------------------------+
388 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
389 * +--------------------------+
390 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
391 * +--------------------------+
392 * *   hw_timestamp_82580     * 16 bytes Optional
393 * +--------------------------+
394 * |       Packet data        | Variable Size
395 * |                          |
396 */
/* Additional per-packet header. Its size must not exceed
 * RTE_PKTMBUF_HEADROOM, since the padding that follows it is
 * RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr). */
struct dpdk_addt_hdr {
        uint64_t timestamp; /* NOTE(review): units are not visible here - confirm */
        uint8_t flags; /* Presumably a combination of enum dpdk_addt_hdr_flags - confirm */
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        uint32_t cap_len; /* The size to say the capture is */
};
405
406/**
407 * We want to blacklist all devices except those on the whitelist
408 * (I say list, but yes it is only the one).
409 *
410 * The default behaviour of rte_pci_probe() will map every possible device
411 * to its DPDK driver. The DPDK driver will take the ethernet device
412 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
413 *
414 * So blacklist all devices except the one that we wish to use so that
415 * the others can still be used as standard ethernet ports.
416 *
417 * @return 0 if successful, otherwise -1 on error.
418 */
419#if DPDK_USE_BLACKLIST
420static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
421{
422        struct rte_pci_device *dev = NULL;
423        format_data->nb_blacklist = 0;
424
425        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
426
427        TAILQ_FOREACH(dev, &device_list, next) {
428        if (whitelist != NULL && whitelist->domain == dev->addr.domain
429            && whitelist->bus == dev->addr.bus
430            && whitelist->devid == dev->addr.devid
431            && whitelist->function == dev->addr.function)
432            continue;
433                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
434                                / sizeof (format_data->blacklist[0])) {
435                        fprintf(stderr, "Warning: too many devices to blacklist consider"
436                                        " increasing BLACK_LIST_SIZE");
437                        break;
438                }
439                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
440                ++format_data->nb_blacklist;
441        }
442
443        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
444        return 0;
445}
446#else /* DPDK_USE_BLACKLIST */
447#include <rte_devargs.h>
448static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
449{
450        char pci_str[20] = {0};
451        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
452                 whitelist->domain,
453                 whitelist->bus,
454                 whitelist->devid,
455                 whitelist->function);
456        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
457                return -1;
458        }
459        return 0;
460}
461#endif
462
463/**
464 * Parse the URI format as a pci address
465 * Fills in addr, note core is optional and is unchanged if
466 * a value for it is not provided.
467 *
468 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
469 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
470 */
471static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
472        int matches;
473        assert(str);
474        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
475                         &addr->domain, &addr->bus, &addr->devid,
476                         &addr->function, core);
477        if (matches >= 4) {
478                return 0;
479        } else {
480                return -1;
481        }
482}
483
484/**
485 * Convert a pci address to the numa node it is
486 * connected to.
487 *
488 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
489 * so we can call it before DPDK
490 *
491 * @return -1 if unknown otherwise a number 0 or higher of the numa node
492 */
493static int pci_to_numa(struct rte_pci_addr * dev_addr) {
494        char path[50] = {0};
495        FILE *file;
496
497        /* Read from the system */
498        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
499                 dev_addr->domain,
500                 dev_addr->bus,
501                 dev_addr->devid,
502                 dev_addr->function);
503
504        if((file = fopen(path, "r")) != NULL) {
505                int numa_node = -1;
506                fscanf(file, "%d", &numa_node);
507                fclose(file);
508                return numa_node;
509        }
510        return -1;
511}
512
513#if DEBUG
514/* For debugging */
515static inline void dump_configuration()
516{
517        struct rte_config * global_config;
518        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
519
520        if (nb_cpu <= 0) {
521                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
522                       " Falling back to the first core.");
523                nb_cpu = 1; /* fallback to just 1 core */
524        }
525        if (nb_cpu > RTE_MAX_LCORE)
526                nb_cpu = RTE_MAX_LCORE;
527
528        global_config = rte_eal_get_configuration();
529
530        if (global_config != NULL) {
531                int i;
532                fprintf(stderr, "Intel DPDK setup\n"
533                        "---Version      : %s\n"
534                        "---Master LCore : %"PRIu32"\n"
535                        "---LCore Count  : %"PRIu32"\n",
536                        rte_version(),
537                        global_config->master_lcore, global_config->lcore_count);
538
539                for (i = 0 ; i < nb_cpu; i++) {
540                        fprintf(stderr, "   ---Core %d : %s\n", i,
541                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
542                }
543
544                const char * proc_type;
545                switch (global_config->process_type) {
546                case RTE_PROC_AUTO:
547                        proc_type = "auto";
548                        break;
549                case RTE_PROC_PRIMARY:
550                        proc_type = "primary";
551                        break;
552                case RTE_PROC_SECONDARY:
553                        proc_type = "secondary";
554                        break;
555                case RTE_PROC_INVALID:
556                        proc_type = "invalid";
557                        break;
558                default:
559                        proc_type = "something worse than invalid!!";
560                }
561                fprintf(stderr, "---Process Type : %s\n", proc_type);
562        }
563
564}
565#endif
566
567/**
568 * Expects to be called from the master lcore and moves it to the given dpdk id
569 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
570 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
571 *               and not already in use.
572 * @return 0 is successful otherwise -1 on error.
573 */
574static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
575        struct rte_config *cfg = rte_eal_get_configuration();
576        cpu_set_t cpuset;
577        int i;
578
579        assert (core < RTE_MAX_LCORE);
580        assert (rte_get_master_lcore() == rte_lcore_id());
581
582        if (core == rte_lcore_id())
583                return 0;
584
585        /* Make sure we are not overwriting someone else */
586        assert(!rte_lcore_is_enabled(core));
587
588        /* Move the core */
589        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
590        cfg->lcore_role[core] = ROLE_RTE;
591        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
592        rte_eal_get_configuration()->master_lcore = core;
593        RTE_PER_LCORE(_lcore_id) = core;
594
595        /* Now change the affinity, either mapped to a single core or all accepted */
596        CPU_ZERO(&cpuset);
597
598        if (lcore_config[core].detected) {
599                CPU_SET(core, &cpuset);
600        } else {
601                for (i = 0; i < RTE_MAX_LCORE; ++i) {
602                        if (lcore_config[i].detected)
603                                CPU_SET(i, &cpuset);
604                }
605        }
606
607        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
608        if (i != 0) {
609                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
610                return -1;
611        }
612        return 0;
613}
614
/**
 * XXX This is very bad XXX
 * But we have to do something to allow getopts nesting
 * Luckily normally the format is last so it doesn't matter
 * DPDK only supports modern systems so hopefully this
 * will continue to work
 *
 * Snapshot of the global getopt() variables, taken with save_getopts()
 * and put back with restore_getopts().
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};
628
629static void save_getopts(struct saved_getopts *opts) {
630        opts->optarg = optarg;
631        opts->optind = optind;
632        opts->opterr = opterr;
633        opts->optopt = optopt;
634}
635
636static void restore_getopts(struct saved_getopts *opts) {
637        optarg = opts->optarg;
638        optind = opts->optind;
639        opterr = opts->opterr;
640        optopt = opts->optopt;
641}
642
643static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
644                                        char * err, int errlen) {
645        int ret; /* Returned error codes */
646        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
647        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
648        char mem_map[20] = {0}; /* The memory name */
649        long nb_cpu; /* The number of CPUs in the system */
650        long my_cpu; /* The CPU number we want to bind to */
651        int i;
652        struct rte_config *cfg = rte_eal_get_configuration();
653        struct saved_getopts save_opts;
654
655        /* This initialises the Environment Abstraction Layer (EAL)
656         * If we had slave workers these are put into WAITING state
657         *
658         * Basically binds this thread to a fixed core, which we choose as
659         * the last core on the machine (assuming fewer interrupts mapped here).
660         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
661         * "-n" the number of memory channels into the CPU (hardware specific)
662         *      - Most likely to be half the number of ram slots in your machine.
663         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
664         * Controls where in memory packets are stored such that they are spread
665         * across the channels. We just use 1 to be safe.
666         *
667         * Using unique file prefixes mean separate memory is used, unlinking
668         * the two processes. However be careful we still cannot access a
669         * port that already in use.
670         */
671        char* argv[] = {"libtrace",
672                        "-c", cpu_number,
673                        "-n", "1",
674                        "--proc-type", "auto",
675                        "--file-prefix", mem_map,
676                        "-m", "512",
677#if DPDK_USE_LOG_LEVEL
678#       if DEBUG
679                        "--log-level", "8", /* RTE_LOG_DEBUG */
680#       else
681                        "--log-level", "5", /* RTE_LOG_WARNING */
682#       endif
683#endif
684                        NULL};
685        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
686
687#if DEBUG
688        rte_set_log_level(RTE_LOG_DEBUG);
689#else
690        rte_set_log_level(RTE_LOG_WARNING);
691#endif
692
693        /* Get the number of cpu cores in the system and use the last core
694         * on the correct numa node */
695        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
696        if (nb_cpu <= 0) {
697                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
698                       " Falling back to the first core.");
699                nb_cpu = 1; /* fallback to the first core */
700        }
701        if (nb_cpu > RTE_MAX_LCORE)
702                nb_cpu = RTE_MAX_LCORE;
703
704        my_cpu = -1;
705        /* This allows the user to specify the core - we would try to do this
706         * automatically but it's hard to tell that this is secondary
707         * before running rte_eal_init(...). Currently we are limited to 1
708         * instance per core due to the way memory is allocated. */
709        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
710                snprintf(err, errlen, "Failed to parse URI");
711                return -1;
712        }
713
714#if HAVE_LIBNUMA
715        format_data->nic_numa_node = pci_to_numa(&use_addr);
716        if (my_cpu < 0) {
717#if DEBUG
718                /* If we can assign to a core on the same numa node */
719                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
720#endif
721                if(format_data->nic_numa_node >= 0) {
722                        int max_node_cpu = -1;
723                        struct bitmask *mask = numa_allocate_cpumask();
724                        assert(mask);
725                        numa_node_to_cpus(format_data->nic_numa_node, mask);
726                        for (i = 0 ; i < nb_cpu; ++i) {
727                                if (numa_bitmask_isbitset(mask,i))
728                                        max_node_cpu = i+1;
729                        }
730                        my_cpu = max_node_cpu;
731                }
732        }
733#endif
734        if (my_cpu < 0) {
735                my_cpu = nb_cpu;
736        }
737
738
739        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
740                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
741
742        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
743                snprintf(err, errlen,
744                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
745                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
746                return -1;
747        }
748
749        /* Make our mask with all cores turned on this is so that DPDK
750         * gets all CPU info in older versions */
751        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
752        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
753
754#if !DPDK_USE_BLACKLIST
755        /* Black list all ports besides the one that we want to use */
756        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
757                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
758                         " are you sure the address is correct?: %s", strerror(-ret));
759                return -1;
760        }
761#endif
762
763        /* Give the memory map a unique name */
764        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
765        /* rte_eal_init it makes a call to getopt so we need to reset the
766         * global optind variable of getopt otherwise this fails */
767        save_getopts(&save_opts);
768        optind = 1;
769        if ((ret = rte_eal_init(argc, argv)) < 0) {
770                snprintf(err, errlen,
771                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
772                return -1;
773        }
774        restore_getopts(&save_opts);
775        // These are still running but will never do anything with DPDK v1.7 we
776        // should remove this XXX in the future
777        for(i = 0; i < RTE_MAX_LCORE; ++i) {
778                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
779                        cfg->lcore_role[i] = ROLE_OFF;
780                        cfg->lcore_count--;
781                }
782        }
783        // Only the master should be running
784        assert(cfg->lcore_count == 1);
785
786        // TODO XXX TODO
787        dpdk_move_master_lcore(NULL, my_cpu-1);
788
789#if DEBUG
790        dump_configuration();
791#endif
792
793#if DPDK_USE_PMD_INIT
794        /* This registers all available NICs with Intel DPDK
795         * These are not loaded until rte_eal_pci_probe() is called.
796         */
797        if ((ret = rte_pmd_init_all()) < 0) {
798                snprintf(err, errlen,
799                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
800                return -1;
801        }
802#endif
803
804#if DPDK_USE_BLACKLIST
805        /* Blacklist all ports besides the one that we want to use */
806        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
807                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
808                         " are you sure the address is correct?: %s", strerror(-ret));
809                return -1;
810        }
811#endif
812
813#if DPDK_USE_PCI_PROBE
814        /* This loads DPDK drivers against all ports that are not blacklisted */
815        if ((ret = rte_eal_pci_probe()) < 0) {
816                snprintf(err, errlen,
817                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
818                return -1;
819        }
820#endif
821
822        format_data->nb_ports = rte_eth_dev_count();
823
824        if (format_data->nb_ports != 1) {
825                snprintf(err, errlen,
826                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
827                         format_data->nb_ports);
828                return -1;
829        }
830
831        return 0;
832}
833
834static int dpdk_init_input (libtrace_t *libtrace) {
835        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
836        char err[500];
837        err[0] = 0;
838
839        libtrace->format_data = (struct dpdk_format_data_t *)
840                                malloc(sizeof(struct dpdk_format_data_t));
841        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
842        FORMAT(libtrace)->nb_ports = 0;
843        FORMAT(libtrace)->snaplen = 0; /* Use default */
844        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
845        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
846        FORMAT(libtrace)->nic_numa_node = -1;
847        FORMAT(libtrace)->promisc = -1;
848        FORMAT(libtrace)->pktmbuf_pool = NULL;
849#if DPDK_USE_BLACKLIST
850        FORMAT(libtrace)->nb_blacklist = 0;
851#endif
852        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
853        FORMAT(libtrace)->mempool_name[0] = 0;
854        memset(FORMAT(libtrace)->burst_pkts, 0,
855               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
856        FORMAT(libtrace)->burst_size = 0;
857        FORMAT(libtrace)->burst_offset = 0;
858        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
859
860        /* Make our first stream */
861        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
862        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
863
864        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
865                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
866                free(libtrace->format_data);
867                libtrace->format_data = NULL;
868                return -1;
869        }
870        return 0;
871}
872
873static int dpdk_init_output(libtrace_out_t *libtrace)
874{
875        char err[500];
876        err[0] = 0;
877
878        libtrace->format_data = (struct dpdk_format_data_t *)
879                                malloc(sizeof(struct dpdk_format_data_t));
880        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
881        FORMAT(libtrace)->nb_ports = 0;
882        FORMAT(libtrace)->snaplen = 0; /* Use default */
883        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
884        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
885        FORMAT(libtrace)->nic_numa_node = -1;
886        FORMAT(libtrace)->promisc = -1;
887        FORMAT(libtrace)->pktmbuf_pool = NULL;
888#if DPDK_USE_BLACKLIST
889        FORMAT(libtrace)->nb_blacklist = 0;
890#endif
891        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
892        FORMAT(libtrace)->mempool_name[0] = 0;
893        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
894        FORMAT(libtrace)->burst_size = 0;
895        FORMAT(libtrace)->burst_offset = 0;
896
897        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
898                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
899                free(libtrace->format_data);
900                libtrace->format_data = NULL;
901                return -1;
902        }
903        return 0;
904}
905
906/**
907 * Note here snaplen excludes the MAC checksum. Packets over
908 * the requested snaplen will be dropped. (Excluding MAC checksum)
909 *
910 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
911 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
912 * is set the maximum size of the returned packet would be 1518 otherwise
913 * 1514 would be the largest size possibly returned.
914 *
915 */
916static int dpdk_config_input (libtrace_t *libtrace,
917                              trace_option_t option,
918                              void *data) {
919        switch (option) {
920        case TRACE_OPTION_SNAPLEN:
921                /* Only support changing snaplen before a call to start is
922                 * made */
923                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
924                        FORMAT(libtrace)->snaplen=*(int*)data;
925                else
926                        return -1;
927                return 0;
928        case TRACE_OPTION_PROMISC:
929                FORMAT(libtrace)->promisc=*(int*)data;
930                return 0;
931        case TRACE_OPTION_HASHER:
932                FORMAT(libtrace)->hasher_type=*(enum hasher_types*)data;
933                return 0;
934        case TRACE_OPTION_FILTER:
935                /* TODO filtering */
936        case TRACE_OPTION_META_FREQ:
937        case TRACE_OPTION_EVENT_REALTIME:
938                break;
939        /* Avoid default: so that future options will cause a warning
940         * here to remind us to implement it, or flag it as
941         * unimplementable
942         */
943        }
944
945        /* Don't set an error - trace_config will try to deal with the
946         * option and will set an error if it fails */
947        return -1;
948}
949
950/* Can set jumbo frames/ or limit the size of a frame by setting both
951 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
952 *
953 */
954static struct rte_eth_conf port_conf = {
955        .rxmode = {
956                .mq_mode = ETH_RSS,
957                .split_hdr_size = 0,
958                .header_split   = 0, /**< Header Split disabled */
959                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
960                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
961                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
962                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
963#if GET_MAC_CRC_CHECKSUM
964/* So it appears that if hw_strip_crc is turned off the driver will still
965 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
966 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
967 * So lets just add it back on when we receive the packet.
968 */
969                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
970#else
971/* By default strip the MAC checksum because it's a bit of a hack to
972 * actually read these. And don't want to rely on disabling this to actualy
973 * always cut off the checksum in the future
974 */
975                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
976#endif
977        },
978        .txmode = {
979                .mq_mode = ETH_DCB_NONE,
980        },
981        .rx_adv_conf = {
982                .rss_conf = {
983                        .rss_hf = RX_RSS_FLAGS,
984                },
985        },
986        .intr_conf = {
987                .lsc = 1
988        }
989};
990
991static const struct rte_eth_rxconf rx_conf = {
992        .rx_thresh = {
993                .pthresh = 8,/* RX_PTHRESH prefetch */
994                .hthresh = 8,/* RX_HTHRESH host */
995                .wthresh = 4,/* RX_WTHRESH writeback */
996        },
997        .rx_free_thresh = 0,
998        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
999};
1000
1001static const struct rte_eth_txconf tx_conf = {
1002        .tx_thresh = {
1003                /*
1004                 * TX_PTHRESH prefetch
1005                 * Set on the NIC, if the number of unprocessed descriptors to queued on
1006                 * the card fall below this try grab at least hthresh more unprocessed
1007                 * descriptors.
1008                 */
1009                .pthresh = 36,
1010
1011                /* TX_HTHRESH host
1012                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
1013                 */
1014                .hthresh = 0,
1015
1016                /* TX_WTHRESH writeback
1017                 * Set on the NIC, the number of sent descriptors before writing back
1018                 * status to confirm the transmission. This is done more efficiently as
1019                 * a bulk DMA-transfer rather than writing one at a time.
1020                 * Similar to tx_free_thresh however this is applied to the NIC, where
1021                 * as tx_free_thresh is when DPDK will check these. This is extended
1022                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1023                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1024                 */
1025                .wthresh = 4,
1026        },
1027
1028        /* Used internally by DPDK rather than passed to the NIC. The number of
1029         * packet descriptors to send before checking for any responses written
1030         * back (to confirm the transmission). Default = 32 if set to 0)
1031         */
1032        .tx_free_thresh = 0,
1033
1034        /* This is the Report Status threshold, used by 10Gbit cards,
1035         * This signals the card to only write back status (such as
1036         * transmission successful) after this minimum number of transmit
1037         * descriptors are seen. The default is 32 (if set to 0) however if set
1038         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1039         * a replacement. See the dpdk programmers guide for more restrictions.
1040         */
1041        .tx_rs_thresh = 1,
1042};
1043
1044/**
1045 * A callback for a link state change (LSC).
1046 *
1047 * Packets may be received before this notification. In fact the DPDK IGXBE
1048 * driver likes to put a delay upto 5sec before sending this.
1049 *
1050 * We use this to ensure the link speed is correct for our timestamp
1051 * calculations. Because packets might be received before the link up we still
1052 * update this when the packet is received.
1053 *
1054 * @param port The DPDK port
1055 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1056 * @param cb_arg The dpdk_format_data_t structure associated with the format
1057 */
1058static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1059                              void *cb_arg) {
1060        struct dpdk_format_data_t * format_data = cb_arg;
1061        struct rte_eth_link link_info;
1062        assert(event == RTE_ETH_EVENT_INTR_LSC);
1063        assert(port == format_data->port);
1064
1065        rte_eth_link_get_nowait(port, &link_info);
1066
1067        if (link_info.link_status)
1068                format_data->link_speed = link_info.link_speed;
1069        else
1070                format_data->link_speed = 0;
1071
1072#if DEBUG
1073        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1074                link_info.link_status ? "up" : "down",
1075                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1076                                          "full-duplex" : "half-duplex",
1077                (int) link_info.link_speed);
1078#endif
1079
1080        /* Turns out DPDK drivers might not come back up if the link speed
1081         * changes. So we reset the autoneg procedure. This is very unsafe
1082         * we have have threads reading packets and we stop the port. */
1083#if 0
1084        if (!link_info.link_status) {
1085                int ret;
1086                rte_eth_dev_stop(port);
1087                ret = rte_eth_dev_start(port);
1088                if (ret < 0) {
1089                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1090                                strerror(-ret));
1091                }
1092        }
1093#endif
1094}
1095
1096/** Reserve a DPDK lcore ID for a thread globally.
1097 *
1098 * @param real If true allocate a real lcore, otherwise allocate a core which
1099 * does not exist on the local machine.
1100 * @param socket the prefered NUMA socket - only used if a real core is requested
1101 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1102 * -1 if have run out of cores.
1103 *
1104 * If any thread is reading or freeing packets we need to register it here
1105 * due to TLS caches in the memory pools.
1106 */
1107static int dpdk_reserve_lcore(bool real, int socket) {
1108        int new_id = -1;
1109        int i;
1110        struct rte_config *cfg = rte_eal_get_configuration();
1111        (void) socket;
1112
1113        pthread_mutex_lock(&dpdk_lock);
1114        /* If 'reading packets' fill in cores from 0 up and bind affinity
1115         * otherwise start from the MAX core (which is also the master) and work backwards
1116         * in this case physical cores on the system will not exist so we don't bind
1117         * these to any particular physical core */
1118        if (real) {
1119#if HAVE_LIBNUMA
1120                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1121                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1122                                new_id = i;
1123                                if (!lcore_config[i].detected)
1124                                        new_id = -1;
1125                                break;
1126                        }
1127                }
1128#endif
1129                /* Retry without the the numa restriction */
1130                if (new_id == -1) {
1131                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1132                                if (!rte_lcore_is_enabled(i)) {
1133                                        new_id = i;
1134                                        if (!lcore_config[i].detected)
1135                                                fprintf(stderr, "Warning the"
1136                                                        " number of 'reading' "
1137                                                        "threads exceed cores\n");
1138                                        break;
1139                                }
1140                        }
1141                }
1142        } else {
1143                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1144                        if (!rte_lcore_is_enabled(i)) {
1145                                new_id = i;
1146                                break;
1147                        }
1148                }
1149        }
1150
1151        if (new_id != -1) {
1152                /* Enable the core in global DPDK structs */
1153                cfg->lcore_role[new_id] = ROLE_RTE;
1154                cfg->lcore_count++;
1155        }
1156
1157        pthread_mutex_unlock(&dpdk_lock);
1158        return new_id;
1159}
1160
1161/** Register a thread as a lcore
1162 * @param libtrace any error is set against libtrace on exit
1163 * @param real If this is a true lcore we will bind its affinty to the
1164 * requested core.
1165 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1166 * @return 0, if successful otherwise -1 if an error occured (details are stored
1167 * in libtrace)
1168 *
1169 * @note This must be called from the thread being registered.
1170 */
1171static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1172        int ret;
1173        RTE_PER_LCORE(_lcore_id) = lcore;
1174
1175        /* Set affinity bind to corresponding core */
1176        if (real) {
1177                cpu_set_t cpuset;
1178                CPU_ZERO(&cpuset);
1179                CPU_SET(rte_lcore_id(), &cpuset);
1180                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1181                if (ret != 0) {
1182                        trace_set_err(libtrace, errno, "Warning "
1183                                      "pthread_setaffinity_np failed");
1184                        return -1;
1185                }
1186        }
1187
1188        return 0;
1189}
1190
1191/** Allocates a new dpdk packet buffer memory pool.
1192 *
1193 * @param n The number of threads
1194 * @param pkt_size The packet size we need ot store
1195 * @param socket_id The NUMA socket id
1196 * @param A new mempool, if NULL query the DPDK library for the error code
1197 * see rte_mempool_create() documentation.
1198 *
1199 * This allocates a new pool or recycles an existing memory pool.
1200 * Call dpdk_free_memory() to free the memory.
1201 * We cannot delete memory so instead we store the pools, allowing them to be
1202 * re-used.
1203 */
1204static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1205                                             unsigned pkt_size,
1206                                             int socket_id) {
1207        struct rte_mempool *ret;
1208        size_t j,k;
1209        char name[MEMPOOL_NAME_LEN];
1210
1211        /* Add on packet size overheads */
1212        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1213
1214        pthread_mutex_lock(&dpdk_lock);
1215
1216        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1217                /* Best guess go for zero */
1218                socket_id = 0;
1219        }
1220
1221        /* Find a valid pool */
1222        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1223                if (mem_pools[socket_id][j]->size >= n &&
1224                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1225                        break;
1226                }
1227        }
1228
1229        /* Find the end (+1) of the list */
1230        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1231
1232        if (mem_pools[socket_id][j]) {
1233                ret = mem_pools[socket_id][j];
1234                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1235                mem_pools[socket_id][k-1] = NULL;
1236                mem_pools[socket_id][j] = NULL;
1237        } else {
1238                static uint32_t test = 10;
1239                test++;
1240                snprintf(name, MEMPOOL_NAME_LEN,
1241                         "libtrace_pool_%"PRIu32, test);
1242
1243                ret = rte_mempool_create(name, n, pkt_size,
1244                                         128, sizeof(struct rte_pktmbuf_pool_private),
1245                                         rte_pktmbuf_pool_init, NULL,
1246                                         rte_pktmbuf_init, NULL,
1247                                         socket_id, 0);
1248        }
1249
1250        pthread_mutex_unlock(&dpdk_lock);
1251        return ret;
1252}
1253
1254/** Stores the memory against the DPDK library.
1255 *
1256 * @param mempool The mempool to free
1257 * @param socket_id The NUMA socket this mempool was allocated upon.
1258 *
1259 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1260 * store the memory shared globally against the format.
1261 */
1262static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1263        size_t i;
1264        pthread_mutex_lock(&dpdk_lock);
1265
1266        /* We should have all entries back in the mempool */
1267        rte_mempool_audit(mempool);
1268        if (!rte_mempool_full(mempool)) {
1269                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1270                        "free all packets before finishing a trace\n",
1271                        rte_mempool_avail_count(mempool), mempool->size);
1272        }
1273
1274        /* Find the end (+1) of the list */
1275        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1276
1277        if (i >= RTE_MAX_LCORE) {
1278                fprintf(stderr, "Too many memory pools, dropping this one\n");
1279        } else {
1280                mem_pools[socket_id][i] = mempool;
1281        }
1282
1283        pthread_mutex_unlock(&dpdk_lock);
1284}
1285
1286/* Attach memory to the port and start (or restart) the port/s.
1287 */
1288static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1289                              char *err, int errlen, uint16_t rx_queues) {
1290        int ret, i;
1291        struct rte_eth_link link_info; /* Wait for link */
1292        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1293
1294        /* Already started */
1295        if (format_data->paused == DPDK_RUNNING)
1296                return 0;
1297
1298        /* First time started we need to alloc our memory, doing this here
1299         * rather than in environment setup because we don't have snaplen then */
1300        if (format_data->paused == DPDK_NEVER_STARTED) {
1301                if (format_data->snaplen == 0) {
1302                        format_data->snaplen = RX_MBUF_SIZE;
1303                        port_conf.rxmode.jumbo_frame = 0;
1304                        port_conf.rxmode.max_rx_pkt_len = 0;
1305                } else {
1306                        /* Use jumbo frames */
1307                        port_conf.rxmode.jumbo_frame = 1;
1308                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1309                }
1310
1311#if GET_MAC_CRC_CHECKSUM
1312                /* This is additional overhead so make sure we allow space for this */
1313                format_data->snaplen += ETHER_CRC_LEN;
1314#endif
1315#if HAS_HW_TIMESTAMPS_82580
1316                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1317#endif
1318
1319                /* Create the mbuf pool, which is the place packets are allocated
1320                 * from - There is no free function (I cannot see one).
1321                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1322                 * allocate however that extra 1 packet is not used.
1323                 * (I assume <= vs < error some where in DPDK code)
1324                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1325                 * so that will fill the new buffer and wait until slots in the
1326                 * ring become available.
1327                 */
1328#if DEBUG
1329                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1330#endif
1331                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1332                                                              format_data->snaplen,
1333                                                              format_data->nic_numa_node);
1334
1335                if (format_data->pktmbuf_pool == NULL) {
1336                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1337                                 "pool failed: %s", strerror(rte_errno));
1338                        return -1;
1339                }
1340        }
1341
1342        // for symmetric rss, repeat 2 bytes
1343        // otherwise, use default rss key in drivers
1344        uint8_t rss_key[52]; // 52 for i40e, 40 for others
1345        if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1346#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1347                struct rte_eth_dev_info dev_info;
1348                rte_eth_dev_info_get(format_data->port, &dev_info);
1349                port_conf.rx_adv_conf.rss_conf.rss_key_len = dev_info.hash_key_size;
1350#else
1351                port_conf.rx_adv_conf.rss_conf.rss_key_len = sizeof(rss_key);
1352#endif
1353                // first 2 bytes of rss_intel_key from drivers/net/e1000/igb_rxtx.c
1354                static uint8_t rss_key_2bytes[] = {0x6D, 0x5A};
1355                int i;
1356                for (i=0; i<port_conf.rx_adv_conf.rss_conf.rss_key_len; i += sizeof(rss_key_2bytes))
1357                        memcpy(rss_key + i, rss_key_2bytes, sizeof(rss_key_2bytes));
1358                port_conf.rx_adv_conf.rss_conf.rss_key = rss_key;
1359        }
1360
1361        /* ----------- Now do the setup for the port mapping ------------ */
1362        /* Order of calls must be
1363         * rte_eth_dev_configure()
1364         * rte_eth_tx_queue_setup()
1365         * rte_eth_rx_queue_setup()
1366         * rte_eth_dev_start()
1367         * other rte_eth calls
1368         */
1369
1370        /* This must be called first before another *eth* function
1371         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1372        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1373        if (ret < 0) {
1374                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1375                         " %"PRIu8" : %s", format_data->port,
1376                         strerror(-ret));
1377                return -1;
1378        }
1379#if DEBUG
1380        fprintf(stderr, "Doing dev configure\n");
1381#endif
1382        /* Initialise the TX queue a minimum value if using this port for
1383         * receiving. Otherwise a larger size if writing packets.
1384         */
1385        ret = rte_eth_tx_queue_setup(format_data->port,
1386                                     0 /* queue XXX */,
1387                                     format_data->nb_tx_buf,
1388                                     SOCKET_ID_ANY,
1389                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1390        if (ret < 0) {
1391                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1392                         " on port %"PRIu8" : %s", format_data->port,
1393                         strerror(-ret));
1394                return -1;
1395        }
1396
1397        /* Attach memory to our RX queues */
1398        for (i=0; i < rx_queues; i++) {
1399                dpdk_per_stream_t *stream;
1400#if DEBUG
1401                fprintf(stderr, "Configuring queue %d\n", i);
1402#endif
1403
1404                /* Add storage for the stream */
1405                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1406                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1407                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1408                stream->queue_id = i;
1409
1410                if (stream->lcore == -1)
1411                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1412
1413                if (stream->lcore == -1) {
1414                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1415                                 ". Too many threads?");
1416                        return -1;
1417                }
1418
1419                if (stream->mempool == NULL) {
1420                        stream->mempool = dpdk_alloc_memory(
1421                                                  format_data->nb_rx_buf*2,
1422                                                  format_data->snaplen,
1423                                                  rte_lcore_to_socket_id(stream->lcore));
1424
1425                        if (stream->mempool == NULL) {
1426                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1427                                         "pool failed: %s", strerror(rte_errno));
1428                                return -1;
1429                        }
1430                }
1431
1432                /* Initialise the RX queue with some packets from memory */
1433                ret = rte_eth_rx_queue_setup(format_data->port,
1434                                             stream->queue_id,
1435                                             format_data->nb_rx_buf,
1436                                             format_data->nic_numa_node,
1437                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1438                                             stream->mempool);
1439                if (ret < 0) {
1440                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1441                                 " RX queue on port %"PRIu8" : %s",
1442                                 format_data->port,
1443                                 strerror(-ret));
1444                        return -1;
1445                }
1446        }
1447
1448#if DEBUG
1449        fprintf(stderr, "Doing start device\n");
1450#endif
1451        rte_eth_stats_reset(format_data->port);
1452        /* Start device */
1453        ret = rte_eth_dev_start(format_data->port);
1454        if (ret < 0) {
1455                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1456                         strerror(-ret));
1457                return -1;
1458        }
1459
1460        /* Default promiscuous to on */
1461        if (format_data->promisc == -1)
1462                format_data->promisc = 1;
1463
1464        if (format_data->promisc == 1)
1465                rte_eth_promiscuous_enable(format_data->port);
1466        else
1467                rte_eth_promiscuous_disable(format_data->port);
1468
1469        /* We have now successfully started/unpased */
1470        format_data->paused = DPDK_RUNNING;
1471
1472
1473        /* Register a callback for link state changes */
1474        ret = rte_eth_dev_callback_register(format_data->port,
1475                                            RTE_ETH_EVENT_INTR_LSC,
1476                                            dpdk_lsc_callback,
1477                                            format_data);
1478#if DEBUG
1479        if (ret)
1480                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1481                        ret, strerror(-ret));
1482#endif
1483
1484        /* Get the current link status */
1485        rte_eth_link_get_nowait(format_data->port, &link_info);
1486        format_data->link_speed = link_info.link_speed;
1487#if DEBUG
1488        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1489                (int) link_info.link_duplex, (int) link_info.link_speed);
1490#endif
1491
1492        return 0;
1493}
1494
1495static int dpdk_start_input (libtrace_t *libtrace) {
1496        char err[500];
1497        err[0] = 0;
1498
1499        /* Make sure we don't reserve an extra thread for this */
1500        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1501
1502        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1503                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1504                free(libtrace->format_data);
1505                libtrace->format_data = NULL;
1506                return -1;
1507        }
1508        return 0;
1509}
1510
1511static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1512        struct rte_eth_dev_info dev_info;
1513        rte_eth_dev_info_get(port_id, &dev_info);
1514        return dev_info.max_rx_queues;
1515}
1516
/* Return the number of processors currently online as reported by
 * sysconf(_SC_NPROCESSORS_ONLN), or 1 if that query fails or reports a
 * non-positive value. */
static inline size_t dpdk_processor_count (void) {
        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

        return (nb_cpu <= 0) ? 1 : (size_t) nb_cpu;
}
1524
1525static int dpdk_pstart_input (libtrace_t *libtrace) {
1526        char err[500];
1527        int i=0, phys_cores=0;
1528        int tot = libtrace->perpkt_thread_count;
1529        libtrace_list_node_t *n;
1530        err[0] = 0;
1531
1532        if (rte_lcore_id() != rte_get_master_lcore())
1533                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1534                        " from the master DPDK thread!\n");
1535
1536        /* If the master is not on the last thread we move it there */
1537        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1538                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1539                        return -1;
1540        }
1541
1542        /* Don't exceed the number of cores in the system/detected by dpdk
1543         * We don't have to force this but performance wont be good if we don't */
1544        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1545                if (lcore_config[i].detected) {
1546                        if (rte_lcore_is_enabled(i)) {
1547#if DEBUG
1548                                fprintf(stderr, "Found core %d already in use!\n", i);
1549#endif
1550                        } else {
1551                                phys_cores++;
1552                        }
1553                }
1554        }
1555        /* If we are restarting we have already allocated some threads as such
1556         * we add these back to the count for this calculation */
1557        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1558                dpdk_per_stream_t * stream = n->data;
1559                if (stream->lcore != -1)
1560                        phys_cores++;
1561        }
1562
1563        tot = MIN(libtrace->perpkt_thread_count,
1564                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1565        tot = MIN(tot, phys_cores);
1566
1567#if DEBUG
1568        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1569                libtrace->perpkt_thread_count, phys_cores);
1570#endif
1571
1572        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1573                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1574                free(libtrace->format_data);
1575                libtrace->format_data = NULL;
1576                return -1;
1577        }
1578
1579        /* Make sure we only start the number that we should */
1580        libtrace->perpkt_thread_count = tot;
1581        return 0;
1582}
1583
1584/**
1585 * Register a thread with the DPDK system,
1586 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1587 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1588 * gives it.
1589 *
1590 * We then allow a mapper thread to be started on every real core as DPDK would,
1591 * we also bind these to the corresponding CPU cores.
1592 *
1593 * @param libtrace A pointer to the trace
1594 * @param reading True if the thread will be used to read packets, i.e. will
1595 *                call pread_packet(), false if thread used to process packet
1596 *                in any other manner including statistics functions.
1597 */
1598static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1599{
1600#if DEBUG
1601        char name[99];
1602        name[0] = 0;
1603#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1604        pthread_getname_np(pthread_self(),
1605                           name, sizeof(name));
1606#endif
1607#endif
1608        if (reading) {
1609                dpdk_per_stream_t *stream;
1610                /* Attach our thread */
1611                if(t->type == THREAD_PERPKT) {
1612                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1613                        if (t->format_data == NULL) {
1614                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1615                                              "Too many threads registered");
1616                                return -1;
1617                        }
1618                } else {
1619                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1620                }
1621                stream = t->format_data;
1622#if DEBUG
1623                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1624#endif
1625                return dpdk_register_lcore(libtrace, true, stream->lcore);
1626        } else {
1627                int lcore = dpdk_reserve_lcore(reading, 0);
1628                if (lcore == -1) {
1629                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1630                                      " for DPDK");
1631                        return -1;
1632                }
1633#if DEBUG
1634                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1635#endif
1636                return dpdk_register_lcore(libtrace, false, lcore);
1637        }
1638
1639        return 0;
1640}
1641
1642/**
1643 * Unregister a thread with the DPDK system.
1644 *
1645 * Only previously registered threads should be calling this just before
1646 * they are destroyed.
1647 */
1648static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1649{
1650        struct rte_config *cfg = rte_eal_get_configuration();
1651
1652        assert(rte_lcore_id() < RTE_MAX_LCORE);
1653        pthread_mutex_lock(&dpdk_lock);
1654        /* Skip if master */
1655        if (rte_lcore_id() == rte_get_master_lcore()) {
1656                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1657                pthread_mutex_unlock(&dpdk_lock);
1658                return;
1659        }
1660
1661        /* Disable this core in global DPDK structs */
1662        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1663        cfg->lcore_count--;
1664        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1665        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1666        pthread_mutex_unlock(&dpdk_lock);
1667        return;
1668}
1669
1670static int dpdk_start_output(libtrace_out_t *libtrace)
1671{
1672        char err[500];
1673        err[0] = 0;
1674
1675        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1676                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1677                free(libtrace->format_data);
1678                libtrace->format_data = NULL;
1679                return -1;
1680        }
1681        return 0;
1682}
1683
1684static int dpdk_pause_input(libtrace_t * libtrace) {
1685        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1686        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1687        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1688#if DEBUG
1689                fprintf(stderr, "Pausing DPDK port\n");
1690#endif
1691                rte_eth_dev_stop(FORMAT(libtrace)->port);
1692                FORMAT(libtrace)->paused = DPDK_PAUSED;
1693                /* Empty the queue of packets */
1694                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1695                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1696                }
1697                FORMAT(libtrace)->burst_offset = 0;
1698                FORMAT(libtrace)->burst_size = 0;
1699
1700                for (; tmp != NULL; tmp = tmp->next) {
1701                        dpdk_per_stream_t *stream = tmp->data;
1702                        stream->ts_last_sys = 0;
1703#if HAS_HW_TIMESTAMPS_82580
1704                        stream->ts_first_sys = 0;
1705#endif
1706                }
1707
1708        }
1709        return 0;
1710}
1711
1712static int dpdk_write_packet(libtrace_out_t *trace,
1713                             libtrace_packet_t *packet){
1714        struct rte_mbuf* m_buff[1];
1715
1716        int wirelen = trace_get_wire_length(packet);
1717        int caplen = trace_get_capture_length(packet);
1718
1719        /* Check for a checksum and remove it */
1720        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1721            wirelen == caplen)
1722                caplen -= ETHER_CRC_LEN;
1723
1724        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1725        if (m_buff[0] == NULL) {
1726                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1727                return -1;
1728        } else {
1729                int ret;
1730                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1731                do {
1732                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1733                } while (ret != 1);
1734        }
1735
1736        return 0;
1737}
1738
1739static int dpdk_fin_input(libtrace_t * libtrace) {
1740        libtrace_list_node_t * n;
1741        /* Free our memory structures */
1742        if (libtrace->format_data != NULL) {
1743
1744                if (FORMAT(libtrace)->port != 0xFF)
1745                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1746                                                        RTE_ETH_EVENT_INTR_LSC,
1747                                                        dpdk_lsc_callback,
1748                                                        FORMAT(libtrace));
1749                /* Close the device completely, device cannot be restarted */
1750                rte_eth_dev_close(FORMAT(libtrace)->port);
1751
1752                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1753                                 FORMAT(libtrace)->nic_numa_node);
1754
1755                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1756                        dpdk_per_stream_t * stream = n->data;
1757                        if (stream->mempool)
1758                                dpdk_free_memory(stream->mempool,
1759                                                 rte_lcore_to_socket_id(stream->lcore));
1760                }
1761
1762                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1763                /* filter here if we used it */
1764                free(libtrace->format_data);
1765        }
1766
1767        return 0;
1768}
1769
1770
1771static int dpdk_fin_output(libtrace_out_t * libtrace) {
1772        /* Free our memory structures */
1773        if (libtrace->format_data != NULL) {
1774                /* Close the device completely, device cannot be restarted */
1775                if (FORMAT(libtrace)->port != 0xFF)
1776                        rte_eth_dev_close(FORMAT(libtrace)->port);
1777                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1778                /* filter here if we used it */
1779                free(libtrace->format_data);
1780        }
1781
1782        return 0;
1783}
1784
1785/**
1786 * Get the start of the additional header that we added to a packet.
1787 */
1788static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1789        assert(packet);
1790        assert(packet->buffer);
1791        /* Our header sits straight after the mbuf header */
1792        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1793}
1794
1795static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1796        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1797        return hdr->cap_len;
1798}
1799
1800static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1801        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1802        if (size > hdr->cap_len) {
1803                /* Cannot make a packet bigger */
1804                return trace_get_capture_length(packet);
1805        }
1806
1807        /* Reset the cached capture length first*/
1808        packet->capture_length = -1;
1809        hdr->cap_len = (uint32_t) size;
1810        return trace_get_capture_length(packet);
1811}
1812
1813static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1814        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1815        int org_cap_size; /* The original capture size */
1816        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1817                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1818                               sizeof(struct hw_timestamp_82580);
1819        } else {
1820                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1821        }
1822        if (hdr->flags & INCLUDES_CHECKSUM) {
1823                return org_cap_size;
1824        } else {
1825                /* DPDK packets are always TRACE_TYPE_ETH packets */
1826                return org_cap_size + ETHER_CRC_LEN;
1827        }
1828}
1829
1830static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1831        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1832        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1833                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1834                                sizeof(struct hw_timestamp_82580);
1835        else
1836                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1837}
1838
1839static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1840                               libtrace_packet_t *packet, void *buffer,
1841                               libtrace_rt_types_t rt_type, uint32_t flags) {
1842        assert(packet);
1843        if (packet->buffer != buffer &&
1844            packet->buf_control == TRACE_CTRL_PACKET) {
1845                free(packet->buffer);
1846        }
1847
1848        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1849                packet->buf_control = TRACE_CTRL_PACKET;
1850        else
1851                packet->buf_control = TRACE_CTRL_EXTERNAL;
1852
1853        packet->buffer = buffer;
1854        packet->header = buffer;
1855
1856        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1857        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1858        packet->type = rt_type;
1859        return 0;
1860}
1861
1862/**
1863 * Given a packet size and a link speed, computes the
1864 * time to transmit in nanoseconds.
1865 *
1866 * @param format_data The dpdk format data from which we get the link speed
1867 *        and if unset updates it in a thread safe manner
1868 * @param pkt_size The size of the packet in bytes
1869 * @return The wire time in nanoseconds
1870 */
1871static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1872        uint32_t wire_time;
1873        /* 20 extra bytes of interframe gap and preamble */
1874# if GET_MAC_CRC_CHECKSUM
1875        wire_time = ((pkt_size + 20) * 8000);
1876# else
1877        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1878# endif
1879
1880        /* Division is really slow and introduces a pipeline stall
1881         * The compiler will optimise this into magical multiplication and shifting
1882         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1883         */
1884retry_calc_wiretime:
1885        switch (format_data->link_speed) {
1886        case ETH_SPEED_NUM_40G:
1887                wire_time /=  ETH_SPEED_NUM_40G;
1888                break;
1889        case ETH_SPEED_NUM_20G:
1890                wire_time /= ETH_SPEED_NUM_20G;
1891                break;
1892        case ETH_SPEED_NUM_10G:
1893                wire_time /= ETH_SPEED_NUM_10G;
1894                break;
1895        case ETH_SPEED_NUM_1G:
1896                wire_time /= ETH_SPEED_NUM_1G;
1897                break;
1898        case 0:
1899                {
1900                /* Maybe the link was down originally, but now it should be up */
1901                struct rte_eth_link link = {0};
1902                rte_eth_link_get_nowait(format_data->port, &link);
1903                if (link.link_status && link.link_speed) {
1904                        format_data->link_speed = link.link_speed;
1905#ifdef DEBUG
1906                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1907#endif
1908                        goto retry_calc_wiretime;
1909                }
1910                /* We don't know the link speed, make sure numbers are counting up */
1911                wire_time = 1;
1912                break;
1913                }
1914        default:
1915                wire_time /= format_data->link_speed;
1916        }
1917        return wire_time;
1918}
1919
1920/**
1921 * Does any extra preperation to all captured packets
1922 * This includes adding our extra header to it with the timestamp,
1923 * and any snapping
1924 *
1925 * @param format_data The DPDK format data
1926 * @param plc The DPDK per lcore format data
1927 * @param pkts An array of size nb_pkts of DPDK packets
1928 */
1929static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1930                                   struct dpdk_per_stream_t *plc,
1931                                   struct rte_mbuf **pkts,
1932                                   size_t nb_pkts) {
1933        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1934        struct dpdk_addt_hdr *hdr;
1935        size_t i;
1936        uint64_t cur_sys_time_ns;
1937#if HAS_HW_TIMESTAMPS_82580
1938        struct hw_timestamp_82580 *hw_ts;
1939        uint64_t estimated_wraps;
1940#else
1941
1942#endif
1943
1944#if USE_CLOCK_GETTIME
1945        struct timespec cur_sys_time = {0};
1946        /* This looks terrible and I feel bad doing it. But it's OK
1947         * on new kernels, because this is a fast vsyscall */
1948        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1949        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1950#else
1951        struct timeval cur_sys_time = {0};
1952        /* Also a fast vsyscall */
1953        gettimeofday(&cur_sys_time, NULL);
1954        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1955#endif
1956
1957        /* The system clock is not perfect so when running
1958         * at linerate we could timestamp a packet in the past.
1959         * To avoid this we munge the timestamp to appear 1ns
1960         * after the previous packet. We should eventually catch up
1961         * to system time since a 64byte packet on a 10G link takes 67ns.
1962         *
1963         * Note with parallel readers timestamping packets
1964         * with duplicate stamps or out of order is unavoidable without
1965         * hardware timestamping from the NIC.
1966         */
1967#if !HAS_HW_TIMESTAMPS_82580
1968        if (plc->ts_last_sys >= cur_sys_time_ns) {
1969                cur_sys_time_ns = plc->ts_last_sys + 1;
1970        }
1971#endif
1972
1973        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1974        for (i = 0 ; i < nb_pkts ; ++i) {
1975
1976                /* We put our header straight after the dpdk header */
1977                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1978                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1979
1980#if GET_MAC_CRC_CHECKSUM
1981                /* Add back in the CRC sum */
1982                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1983                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1984                hdr->flags |= INCLUDES_CHECKSUM;
1985#endif
1986
1987                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1988
1989#if HAS_HW_TIMESTAMPS_82580
1990                /* The timestamp is sitting before our packet and is included in pkt_len */
1991                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1992                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1993                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1994
1995                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1996                 *
1997                 *        +----------+---+   +--------------+
1998                 *  82580 |    24    | 8 |   |      32      |
1999                 *        +----------+---+   +--------------+
2000                 *          reserved  \______ 40 bits _____/
2001                 *
2002                 * The 40 bit 82580 SYSTIM overflows every
2003                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
2004                 *
2005                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
2006                 * Endian (for the full 64 bits) i.e. picture is mirrored
2007                 */
2008
2009                /* Despite what the documentation says this is in Little
2010                 * Endian byteorder. Mask the reserved section out.
2011                 */
2012                hdr->timestamp = le64toh(hw_ts->timestamp) &
2013                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
2014
2015                if (unlikely(plc->ts_first_sys == 0)) {
2016                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
2017                        plc->ts_last_sys = plc->ts_first_sys;
2018                }
2019
2020                /* This will have serious problems if packets aren't read quickly
2021                 * that is within a couple of seconds because our clock cycles every
2022                 * 18 seconds */
2023                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
2024                                  / (1ull<<TS_NBITS_82580);
2025
2026                /* Estimated_wraps gives the number of times the counter should have
2027                 * wrapped (however depending on value last time it could have wrapped
2028                 * twice more (if hw clock is close to its max value) or once less (allowing
2029                 * for a bit of variance between hw and sys clock). But if the clock
2030                 * shouldn't have wrapped once then don't allow it to go backwards in time */
2031                if (unlikely(estimated_wraps >= 2)) {
2032                        /* 2 or more wrap arounds add all but the very last wrap */
2033                        plc->wrap_count += estimated_wraps - 1;
2034                }
2035
2036                /* Set the timestamp to the lowest possible value we're considering */
2037                hdr->timestamp += plc->ts_first_sys +
2038                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2039
2040                /* In most runs only the first if() will need evaluating - i.e our
2041                 * estimate is correct. */
2042                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2043                                              hdr->timestamp, MAXSKEW_82580))) {
2044                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2045                        plc->wrap_count++;
2046                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2047                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2048                                             hdr->timestamp, MAXSKEW_82580)) {
2049                                /* Failed to match estimated_wraps */
2050                                plc->wrap_count++;
2051                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2052                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2053                                                     hdr->timestamp, MAXSKEW_82580)) {
2054                                        if (estimated_wraps == 0) {
2055                                                /* 0 case Failed to match estimated_wraps+2 */
2056                                                printf("WARNING - Hardware Timestamp failed to"
2057                                                       " match using systemtime!\n");
2058                                                hdr->timestamp = cur_sys_time_ns;
2059                                        } else {
2060                                                /* Failed to match estimated_wraps+1 */
2061                                                plc->wrap_count++;
2062                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2063                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2064                                                                     hdr->timestamp, MAXSKEW_82580)) {
2065                                                        /* Failed to match estimated_wraps+2 */
2066                                                        printf("WARNING - Hardware Timestamp failed to"
2067                                                               " match using systemtime!!\n");
2068                                                }
2069                                        }
2070                                }
2071                        }
2072                }
2073#else
2074
2075                hdr->timestamp = cur_sys_time_ns;
2076                /* Offset the next packet by the wire time of previous */
2077                calculate_wire_time(format_data, hdr->cap_len);
2078
2079#endif
2080        }
2081
2082        plc->ts_last_sys = cur_sys_time_ns;
2083        return;
2084}
2085
2086
2087static void dpdk_fin_packet(libtrace_packet_t *packet)
2088{
2089        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2090                rte_pktmbuf_free(packet->buffer);
2091                packet->buffer = NULL;
2092        }
2093}
2094
2095/** Reads at least one packet or returns an error
2096 */
2097static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2098                                           dpdk_per_stream_t *stream,
2099                                           libtrace_message_queue_t *mesg,
2100                                           struct rte_mbuf* pkts_burst[],
2101                                           size_t nb_packets) {
2102        size_t nb_rx; /* Number of rx packets we've recevied */
2103        while (1) {
2104                /* Poll for a batch of packets */
2105                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2106                                         stream->queue_id, pkts_burst, nb_packets);
2107                if (nb_rx > 0) {
2108                        /* Got some packets - otherwise we keep spining */
2109                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2110                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2111                        return nb_rx;
2112                }
2113                /* Check the message queue this could be less than 0 */
2114                if (mesg && libtrace_message_queue_count(mesg) > 0)
2115                        return READ_MESSAGE;
2116                if ((nb_rx=is_halted(libtrace)) != -1)
2117                        return nb_rx;
2118                /* Wait a while, polling on memory degrades performance
2119                 * This relieves the pressure on memory allowing the NIC to DMA */
2120                rte_delay_us(10);
2121        }
2122
2123        /* We'll never get here - but if we did it would be bad */
2124        return READ_ERROR;
2125}
2126
2127static int dpdk_pread_packets (libtrace_t *libtrace,
2128                                    libtrace_thread_t *t,
2129                                    libtrace_packet_t **packets,
2130                                    size_t nb_packets) {
2131        int nb_rx; /* Number of rx packets we've recevied */
2132        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2133        int i;
2134        dpdk_per_stream_t *stream = t->format_data;
2135
2136        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2137                                         pkts_burst, nb_packets);
2138
2139        if (nb_rx > 0) {
2140                for (i = 0; i < nb_rx; ++i) {
2141                        if (packets[i]->buffer != NULL) {
2142                                /* The packet should always be finished */
2143                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2144                                free(packets[i]->buffer);
2145                        }
2146                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2147                        packets[i]->type = TRACE_RT_DATA_DPDK;
2148                        packets[i]->buffer = pkts_burst[i];
2149                        packets[i]->trace = libtrace;
2150                        packets[i]->error = 1;
2151                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2152                }
2153        }
2154
2155        return nb_rx;
2156}
2157
2158static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2159        int nb_rx; /* Number of rx packets we've received */
2160        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2161
2162        /* Free the last packet buffer */
2163        if (packet->buffer != NULL) {
2164                /* The packet should always be finished */
2165                assert(packet->buf_control == TRACE_CTRL_PACKET);
2166                free(packet->buffer);
2167                packet->buffer = NULL;
2168        }
2169
2170        packet->buf_control = TRACE_CTRL_EXTERNAL;
2171        packet->type = TRACE_RT_DATA_DPDK;
2172
2173        /* Check if we already have some packets buffered */
2174        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2175                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2176                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2177                return 1; // TODO should be bytes read, which essentially useless anyway
2178        }
2179
2180        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2181                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2182
2183        if (nb_rx > 0) {
2184                FORMAT(libtrace)->burst_size = nb_rx;
2185                FORMAT(libtrace)->burst_offset = 1;
2186                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2187                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2188                return 1;
2189        }
2190        return nb_rx;
2191}
2192
2193static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2194        struct timeval tv;
2195        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2196
2197        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2198        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2199        return tv;
2200}
2201
2202static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2203        struct timespec ts;
2204        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2205
2206        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2207        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2208        return ts;
2209}
2210
2211static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2212        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2213}
2214
2215static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2216        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2217        return (libtrace_direction_t) hdr->direction;
2218}
2219
2220static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2221        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2222        hdr->direction = (uint8_t) direction;
2223        return (libtrace_direction_t) hdr->direction;
2224}
2225
2226static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2227        struct rte_eth_stats dev_stats = {0};
2228
2229        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2230                return;
2231
2232        /* Grab the current stats */
2233        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2234
2235        stats->captured_valid = true;
2236        stats->captured = dev_stats.ipackets;
2237
2238        stats->dropped_valid = true;
2239        stats->dropped = dev_stats.imissed;
2240
2241#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2242        /* DPDK commit 86057c fixes ensures missed does not get counted as
2243         * errors */
2244        stats->errors_valid = true;
2245        stats->errors = dev_stats.ierrors;
2246#else
2247        /* DPDK errors includes drops */
2248        stats->errors_valid = true;
2249        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2250#endif
2251        stats->received_valid = true;
2252        stats->received = dev_stats.ipackets + dev_stats.imissed;
2253
2254}
2255
2256/* Attempts to read a packet in a non-blocking fashion. If one is not
2257 * available a SLEEP event is returned. We do not have the ability to
2258 * create a select()able file descriptor in DPDK.
2259 */
2260static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2261                                            libtrace_packet_t *packet) {
2262        libtrace_eventobj_t event = {0,0,0.0,0};
2263        size_t nb_rx; /* Number of received packets we've read */
2264
2265        do {
2266
2267                /* No packets waiting in our buffer? Try and read some more */
2268                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2269                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2270                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2271                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2272                        if (nb_rx > 0) {
2273                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2274                                                FORMAT(trace)->burst_pkts, nb_rx);
2275                                FORMAT(trace)->burst_size = nb_rx;
2276                                FORMAT(trace)->burst_offset = 0;
2277                        }
2278                }
2279
2280                /* Now do we have packets waiting? */
2281                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2282                        /* Free the last packet buffer */
2283                        if (packet->buffer != NULL) {
2284                                /* The packet should always be finished */
2285                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2286                                free(packet->buffer);
2287                                packet->buffer = NULL;
2288                        }
2289
2290                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2291                        packet->type = TRACE_RT_DATA_DPDK;
2292                        event.type = TRACE_EVENT_PACKET;
2293                        packet->buffer = FORMAT(trace)->burst_pkts[
2294                                             FORMAT(trace)->burst_offset++];
2295                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2296                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2297
2298                        /* XXX - Check this passes the filter trace_read_packet normally
2299                         * does this for us but this wont */
2300                        if (trace->filter) {
2301                                if (!trace_apply_filter(trace->filter, packet)) {
2302                                        /* Failed the filter so we loop for another packet */
2303                                        trace->filtered_packets ++;
2304                                        continue;
2305                                }
2306                        }
2307                        trace->accepted_packets ++;
2308                } else {
2309                        /* We only want to sleep for a very short time - we are non-blocking */
2310                        event.type = TRACE_EVENT_SLEEP;
2311                        event.seconds = 0.0001;
2312                        event.size = 0;
2313                }
2314
2315                /* If we get here we have our event */
2316                break;
2317        } while (1);
2318
2319        return event;
2320}
2321
2322static void dpdk_help(void) {
2323        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2324        printf("Supported input URIs:\n");
2325        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2326        printf("\tThe -<coreid> is optional \n");
2327        printf("\t e.g. dpdk:0000:01:00.1\n");
2328        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2329        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2330        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2331        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2332        printf("\n");
2333        printf("Supported output URIs:\n");
2334        printf("\tSame format as the input URI.\n");
2335        printf("\t e.g. dpdk:0000:01:00.1\n");
2336        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2337        printf("\n");
2338}
2339
/* Format descriptor for the DPDK capture format.
 *
 * This is a positional initialiser: each entry fills the libtrace format
 * hook named in the comment beside it, so the order of the entries must
 * not be changed. A NULL entry means no DPDK-specific function is supplied
 * for that hook. The table is handed to libtrace by dpdk_constructor().
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id$",
        TRACE_FORMAT_DPDK,
        NULL,                               /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,                    /* init_input */
        dpdk_config_input,                  /* config_input */
        dpdk_start_input,                   /* start_input */
        dpdk_pause_input,                   /* pause_input */
        dpdk_init_output,                   /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,                  /* start_output */
        dpdk_fin_input,                     /* fin_input */
        dpdk_fin_output,                    /* fin_output */
        dpdk_read_packet,                   /* read_packet */
        dpdk_prepare_packet,                /* prepare_packet */
        dpdk_fin_packet,                    /* fin_packet */
        dpdk_write_packet,                  /* write_packet */
        dpdk_get_link_type,                 /* get_link_type */
        dpdk_get_direction,                 /* get_direction */
        dpdk_set_direction,                 /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,                   /* get_timeval */
        dpdk_get_timespec,                  /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,            /* get_capture_length */
        dpdk_get_wire_length,               /* get_wire_length */
        dpdk_get_framing_length,            /* get_framing_length */
        dpdk_set_capture_length,            /* set_capture_length */
        NULL,                               /* get_received_packets */
        NULL,                               /* get_filtered_packets */
        NULL,                               /* get_dropped_packets */
        dpdk_get_stats,                     /* get_statistics */
        NULL,                               /* get_fd */
        dpdk_trace_event,                   /* trace_event */
        dpdk_help,                          /* help */
        NULL,                               /* next pointer */
        {true, 8},                          /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,                  /* pstart_input */
        dpdk_pread_packets,                 /* pread_packets */
        dpdk_pause_input,                   /* ppause (same function as pause_input) */
        dpdk_fin_input,                     /* p_fin (same function as fin_input) */
        dpdk_pregister_thread,              /* pregister_thread */
        dpdk_punregister_thread,            /* punregister_thread */
        NULL                                /* get thread stats */
};
2390
2391void dpdk_constructor(void) {
2392        register_format(&dpdk);
2393}
Note: See TracBrowser for help on using the repository browser.