source: lib/format_dpdk.c @ b148e3b

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since b148e3b was b148e3b, checked in by Richard Sanger <rsanger@…>, 4 years ago

Updates DPDK to latest release and improves performance

Thanks to Richard Cziva for supplying an initial patch for this.

We now recommend using the latest release of DPDK, ideally 16.04 or newer

To support newer releases

  • Fixes RSS hashing renames
  • Fixes deprecated rte_mempool_count
  • Fixes ETH_LINK_SPEED_X rename
  • Fixes TX minimum memory requirement
  • Fixes dropped vs errored counting in recent versions (for best results use 16.04 or newer)

Tuned to allow DPDK's SSE vector mode in supporting drivers, for better performance.
Bumps the default internal batch size up to 32 to match DPDK in SSE vector mode.

  • Property mode set to 100644
File size: 75.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44
45#ifdef HAVE_INTTYPES_H
46#  include <inttypes.h>
47#else
48# error "Can't find inttypes.h"
49#endif
50
51#include <stdlib.h>
52#include <assert.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56
57#if HAVE_LIBNUMA
58#include <numa.h>
59#endif
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * DPDK 16.04 or newer is recommended.
72 * However 1.6 and newer are still likely supported.
73 */
74#include <rte_eal.h>
75#include <rte_version.h>
76#ifndef RTE_VERSION_NUM
77#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
78#endif
79#ifndef RTE_VER_PATCH_RELEASE
80#       define RTE_VER_PATCH_RELEASE 0
81#endif
82#ifndef RTE_VERSION
83#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
84        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
85#endif
86
87/* 1.6.0r2 :
88 *      rte_eal_pci_set_blacklist() is removed
89 *      device_list is renamed to pci_device_list
90 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
91 *      as such we do apply the whitelist before rte_eal_init.
92 *      This also works correctly with DPDK 1.6.0r2.
93 *
94 * Replaced by:
95 *      rte_devargs (we can simply whitelist)
96 */
97#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
98#       define DPDK_USE_BLACKLIST 1
99#else
100#       define DPDK_USE_BLACKLIST 0
101#endif
102
103/*
104 * 1.7.0 :
105 *      rte_pmd_init_all is removed
106 *
107 * Replaced by:
108 *      Nothing, no longer needed
109 */
110#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
111#       define DPDK_USE_PMD_INIT 1
112#else
113#       define DPDK_USE_PMD_INIT 0
114#endif
115
116/* 1.7.0-rc3 :
117 *
118 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
119 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
120 * it twice.
121 */
122#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
123#       define DPDK_USE_PCI_PROBE 1
124#else
125#       define DPDK_USE_PCI_PROBE 0
126#endif
127
128/* 1.8.0-rc1 :
129 * LOG LEVEL is a command line option which overrides what
130 * we previously set it to.
131 */
132#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
133#       define DPDK_USE_LOG_LEVEL 1
134#else
135#       define DPDK_USE_LOG_LEVEL 0
136#endif
137
138/* 1.8.0-rc2
139 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
140 * this uses the default values, which are better tuned per device
141 * See issue #26
142 */
143#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
144#       define DPDK_USE_NULL_QUEUE_CONFIG 1
145#else
146#       define DPDK_USE_NULL_QUEUE_CONFIG 0
147#endif
148
149/* 2.0.0-rc1
150 * Unifies RSS hash between cards
151 */
152#if RTE_VERSION >= RTE_VERSION_NUM(2, 0, 0, 1)
153#       define RX_RSS_FLAGS (ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | \
154                             ETH_RSS_SCTP)
155#else
156#       define RX_RSS_FLAGS (ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | \
157                             ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP |\
158                             ETH_RSS_IPV6_UDP)
159#endif
160
161/* v16.07-rc1 - deprecated
162 * rte_mempool_avail_count to replace rte_mempool_count
163 * rte_mempool_in_use_count to replace rte_mempool_free_count
164 */
165#if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
166#define rte_mempool_avail_count rte_mempool_count
167#define rte_mempool_in_use_count rte_mempool_free_count
168#endif
169
170#include <rte_per_lcore.h>
171#include <rte_debug.h>
172#include <rte_errno.h>
173#include <rte_common.h>
174#include <rte_log.h>
175#include <rte_memcpy.h>
176#include <rte_prefetch.h>
177#include <rte_branch_prediction.h>
178#include <rte_pci.h>
179#include <rte_ether.h>
180#include <rte_ethdev.h>
181#include <rte_ring.h>
182#include <rte_mempool.h>
183#include <rte_mbuf.h>
184#include <rte_launch.h>
185#include <rte_lcore.h>
186#include <rte_per_lcore.h>
187#include <rte_cycles.h>
188#include <pthread.h>
189#ifdef __FreeBSD__
190#include <pthread_np.h>
191#endif
192
193/* 16.04-rc3 ETH_LINK_SPEED_X are replaced with ETH_SPEED_NUM_X.
194 * ETH_LINK_SPEED_ are reused as flags, ugly.
195 * We use the new way in this code.
196 */
197#ifndef ETH_SPEED_NUM_1G
198        #define ETH_SPEED_NUM_1G ETH_LINK_SPEED_1000
199        #define ETH_SPEED_NUM_10G ETH_LINK_SPEED_10G
200        #define ETH_SPEED_NUM_20G ETH_LINK_SPEED_20G
201        #define ETH_SPEED_NUM_40G ETH_LINK_SPEED_40G
202#endif
203
204/* The default size of memory buffers to use - This is the max size of standard
205 * ethernet packet less the size of the MAC CHECKSUM */
206#define RX_MBUF_SIZE 1514
207
208/* The minimum number of memory buffers per queue tx or rx. Based on
209 * the requirement of the memory pool with 128 per thread buffers, needing
210 * at least 128*1.5 = 192 buffers. Our code allocates 128*2 to be safe.
211 */
212#define MIN_NB_BUF 128
213
214/* Number of receive memory buffers to use
215 * By default this is limited by driver to 4k and must be a multiple of 128.
216 * A modification can be made to the driver to remove this limit.
217 * This can be increased in the driver and here.
218 * Should be at least MIN_NB_BUF.
219 * We choose 2K rather than 4K because it enables the usage of sse vector
220 * drivers which are significantly faster than using the larger buffer.
221 */
222#define NB_RX_MBUF (4096/2)
223
/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
227#define NB_TX_MBUF 1024
228
229/* The size of the PCI blacklist needs to be big enough to contain
230 * every PCI device address (listed by lspci every bus:device.function tuple).
231 */
232#define BLACK_LIST_SIZE 50
233
234/* The maximum number of characters the mempool name can be */
235#define MEMPOOL_NAME_LEN 20
236
237/* For single threaded libtrace we read packets as a batch/burst
238 * this is the maximum size of said burst */
239#define BURST_SIZE 50
240
/* Accessor and conversion macros.
 *
 * Every macro argument is parenthesised so that the macros expand correctly
 * when handed an arbitrary expression (e.g. TV_TO_NS(*tvp) or
 * FORMAT(cond ? a : b)) rather than only a plain identifier.
 */

/* View an opaque pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Cast the format_data member of x to the DPDK format data */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* The head node of the per stream list, and the stream stored in it */
#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec (passed by value, not by
 * pointer) into nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
254
255#if RTE_PKTMBUF_HEADROOM != 128
256#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
257         "any libtrace instance processing these packet must be have the" \
258         "same RTE_PKTMBUF_HEADROOM set"
259#endif
260
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
270
271/* Print verbose messages to stderr */
272#define DEBUG 1
273
274/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
275 * only turn on if you know clock_gettime is a vsyscall on your system
276 * otherwise could be a large overhead. Again gettimeofday() should be
277 * vsyscall also if it's not you should seriously consider updating your
278 * kernel.
279 */
280#ifdef HAVE_CLOCK_GETTIME
281/* You can turn this on (set to 1) to prefer clock_gettime */
282#define USE_CLOCK_GETTIME 1
283#else
284/* DON'T CHANGE THIS !!! */
285#define USE_CLOCK_GETTIME 0
286#endif
287
288/* This is fairly safe to turn on - currently there appears to be a 'bug'
289 * in DPDK that will remove the checksum by making the packet appear 4bytes
290 * smaller than what it really is. Most formats don't include the checksum
291 * hence writing out a port such as int: ring: and dpdk: assumes there
292 * is no checksum and will attempt to write the checksum as part of the
293 * packet
294 */
295#define GET_MAC_CRC_CHECKSUM 0
296
297/* This requires a modification of the pmd drivers (inside Intel DPDK)
298 * TODO this requires updating (packet sizes are wrong TS most likely also)
299 */
300#define HAS_HW_TIMESTAMPS_82580 0
301
302#if HAS_HW_TIMESTAMPS_82580
303# define TS_NBITS_82580     40
304/* The maximum on the +ve or -ve side that we can be, make it half way */
305# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
306#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
307#endif
308
309static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
310/* Memory pools Per NUMA node */
311static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
312
/* Hardware timestamp record, laid out as per the Intel 82580 specification.
 * Note there is a mismatch in the 82580 datasheet: it states that the
 * timestamp is stored in Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
        uint64_t reserved;
        uint64_t timestamp; /* Little Endian, only the lower 40 bits are valid */
};
319
/* Values stored in dpdk_format_data_t.paused */
enum paused_state {
        DPDK_NEVER_STARTED,
        DPDK_RUNNING,
        DPDK_PAUSED,
};
325
/* State kept for each parallel stream.
 *
 * The field order must stay in step with the positional initialiser
 * DPDK_EMPTY_STREAM.
 */
struct dpdk_per_stream_t
{
        uint16_t queue_id;
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
        struct rte_mempool *mempool;
        int lcore;
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);
338
339#if HAS_HW_TIMESTAMPS_82580
340#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
341#else
342#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
343#endif
344
345typedef struct dpdk_per_stream_t dpdk_per_stream_t;
346
/* The per trace state of the DPDK format.
 *
 * Used by both input and output, however some fields are not used
 * for output. */
struct dpdk_format_data_t {
        int8_t promisc; /* promiscuous mode - RX only */
        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
        uint8_t paused; /* See paused_state */
        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
        int snaplen; /* The snap length for the capture - RX only */
        /* We always have to setup both rx and tx queues even if we don't want them */
        int nb_rx_buf; /* The number of packet buffers in the rx ring */
        int nb_tx_buf; /* The number of packet buffers in the tx ring */
        int nic_numa_node; /* The NUMA node that the NIC is attached to */
        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of entries in blacklist that are valid */
#endif
        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
        uint8_t rss_key[40]; /* This is the RSS key */
        /* To improve single-threaded performance we always batch reading
         * packets, in a burst, otherwise the parallel library does this for us */
        struct rte_mbuf* burst_pkts[BURST_SIZE];
        int burst_size; /* The total number read in the burst */
        int burst_offset; /* The offset we are into the burst */

        /* Our parallel streams */
        libtrace_list_t *per_stream;
};
376
/* Bit flags; presumably the values held in dpdk_addt_hdr.flags - confirm
 * against the code that fills that header in. */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM = 0x1,
        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
};
381
/**
 * A structure placed in front of the packet where we can store
 * additional information about the given packet.
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 *
 * The header lives in the mbuf headroom, so changing the size or order of
 * these fields changes the in-buffer layout.
 */
struct dpdk_addt_hdr {
        uint64_t timestamp;
        uint8_t flags;
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        uint32_t cap_len; /* The size to say the capture is */
};
405
406/**
407 * We want to blacklist all devices except those on the whitelist
408 * (I say list, but yes it is only the one).
409 *
410 * The default behaviour of rte_pci_probe() will map every possible device
411 * to its DPDK driver. The DPDK driver will take the ethernet device
412 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
413 *
414 * So blacklist all devices except the one that we wish to use so that
415 * the others can still be used as standard ethernet ports.
416 *
417 * @return 0 if successful, otherwise -1 on error.
418 */
419#if DPDK_USE_BLACKLIST
420static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
421{
422        struct rte_pci_device *dev = NULL;
423        format_data->nb_blacklist = 0;
424
425        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
426
427        TAILQ_FOREACH(dev, &device_list, next) {
428        if (whitelist != NULL && whitelist->domain == dev->addr.domain
429            && whitelist->bus == dev->addr.bus
430            && whitelist->devid == dev->addr.devid
431            && whitelist->function == dev->addr.function)
432            continue;
433                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
434                                / sizeof (format_data->blacklist[0])) {
435                        fprintf(stderr, "Warning: too many devices to blacklist consider"
436                                        " increasing BLACK_LIST_SIZE");
437                        break;
438                }
439                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
440                ++format_data->nb_blacklist;
441        }
442
443        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
444        return 0;
445}
446#else /* DPDK_USE_BLACKLIST */
447#include <rte_devargs.h>
448static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
449{
450        char pci_str[20] = {0};
451        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
452                 whitelist->domain,
453                 whitelist->bus,
454                 whitelist->devid,
455                 whitelist->function);
456        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
457                return -1;
458        }
459        return 0;
460}
461#endif
462
463/**
464 * Parse the URI format as a pci address
465 * Fills in addr, note core is optional and is unchanged if
466 * a value for it is not provided.
467 *
468 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
469 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
470 */
471static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
472        int matches;
473        assert(str);
474        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
475                         &addr->domain, &addr->bus, &addr->devid,
476                         &addr->function, core);
477        if (matches >= 4) {
478                return 0;
479        } else {
480                return -1;
481        }
482}
483
484/**
485 * Convert a pci address to the numa node it is
486 * connected to.
487 *
488 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
489 * so we can call it before DPDK
490 *
491 * @return -1 if unknown otherwise a number 0 or higher of the numa node
492 */
493static int pci_to_numa(struct rte_pci_addr * dev_addr) {
494        char path[50] = {0};
495        FILE *file;
496
497        /* Read from the system */
498        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
499                 dev_addr->domain,
500                 dev_addr->bus,
501                 dev_addr->devid,
502                 dev_addr->function);
503
504        if((file = fopen(path, "r")) != NULL) {
505                int numa_node = -1;
506                fscanf(file, "%d", &numa_node);
507                fclose(file);
508                return numa_node;
509        }
510        return -1;
511}
512
513#if DEBUG
514/* For debugging */
515static inline void dump_configuration()
516{
517        struct rte_config * global_config;
518        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
519
520        if (nb_cpu <= 0) {
521                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
522                       " Falling back to the first core.");
523                nb_cpu = 1; /* fallback to just 1 core */
524        }
525        if (nb_cpu > RTE_MAX_LCORE)
526                nb_cpu = RTE_MAX_LCORE;
527
528        global_config = rte_eal_get_configuration();
529
530        if (global_config != NULL) {
531                int i;
532                fprintf(stderr, "Intel DPDK setup\n"
533                        "---Version      : %s\n"
534                        "---Master LCore : %"PRIu32"\n"
535                        "---LCore Count  : %"PRIu32"\n",
536                        rte_version(),
537                        global_config->master_lcore, global_config->lcore_count);
538
539                for (i = 0 ; i < nb_cpu; i++) {
540                        fprintf(stderr, "   ---Core %d : %s\n", i,
541                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
542                }
543
544                const char * proc_type;
545                switch (global_config->process_type) {
546                case RTE_PROC_AUTO:
547                        proc_type = "auto";
548                        break;
549                case RTE_PROC_PRIMARY:
550                        proc_type = "primary";
551                        break;
552                case RTE_PROC_SECONDARY:
553                        proc_type = "secondary";
554                        break;
555                case RTE_PROC_INVALID:
556                        proc_type = "invalid";
557                        break;
558                default:
559                        proc_type = "something worse than invalid!!";
560                }
561                fprintf(stderr, "---Process Type : %s\n", proc_type);
562        }
563
564}
565#endif
566
567/**
568 * Expects to be called from the master lcore and moves it to the given dpdk id
569 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
570 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
571 *               and not already in use.
572 * @return 0 is successful otherwise -1 on error.
573 */
574static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
575        struct rte_config *cfg = rte_eal_get_configuration();
576        cpu_set_t cpuset;
577        int i;
578
579        assert (core < RTE_MAX_LCORE);
580        assert (rte_get_master_lcore() == rte_lcore_id());
581
582        if (core == rte_lcore_id())
583                return 0;
584
585        /* Make sure we are not overwriting someone else */
586        assert(!rte_lcore_is_enabled(core));
587
588        /* Move the core */
589        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
590        cfg->lcore_role[core] = ROLE_RTE;
591        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
592        rte_eal_get_configuration()->master_lcore = core;
593        RTE_PER_LCORE(_lcore_id) = core;
594
595        /* Now change the affinity, either mapped to a single core or all accepted */
596        CPU_ZERO(&cpuset);
597
598        if (lcore_config[core].detected) {
599                CPU_SET(core, &cpuset);
600        } else {
601                for (i = 0; i < RTE_MAX_LCORE; ++i) {
602                        if (lcore_config[i].detected)
603                                CPU_SET(i, &cpuset);
604                }
605        }
606
607        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
608        if (i != 0) {
609                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
610                return -1;
611        }
612        return 0;
613}
614
/**
 * XXX This is very bad XXX
 * But we have to do something to allow getopt nesting.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 *
 * A snapshot of the four global variables that getopt() uses.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = {
                optarg,
                optind,
                opterr,
                optopt,
        };

        *opts = snapshot;
}

/* Write a snapshot taken by save_getopts() back to the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optarg = snapshot.optarg;
        optind = snapshot.optind;
        opterr = snapshot.opterr;
        optopt = snapshot.optopt;
}
642
643static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
644                                        char * err, int errlen) {
645        int ret; /* Returned error codes */
646        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
647        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
648        char mem_map[20] = {0}; /* The memory name */
649        long nb_cpu; /* The number of CPUs in the system */
650        long my_cpu; /* The CPU number we want to bind to */
651        int i;
652        struct rte_config *cfg = rte_eal_get_configuration();
653        struct saved_getopts save_opts;
654
655        /* This initialises the Environment Abstraction Layer (EAL)
656         * If we had slave workers these are put into WAITING state
657         *
658         * Basically binds this thread to a fixed core, which we choose as
659         * the last core on the machine (assuming fewer interrupts mapped here).
660         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
661         * "-n" the number of memory channels into the CPU (hardware specific)
662         *      - Most likely to be half the number of ram slots in your machine.
663         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
664         * Controls where in memory packets are stored such that they are spread
665         * across the channels. We just use 1 to be safe.
666         *
667         * Using unique file prefixes mean separate memory is used, unlinking
668         * the two processes. However be careful we still cannot access a
669         * port that already in use.
670         */
671        char* argv[] = {"libtrace",
672                        "-c", cpu_number,
673                        "-n", "1",
674                        "--proc-type", "auto",
675                        "--file-prefix", mem_map,
676                        "-m", "512",
677#if DPDK_USE_LOG_LEVEL
678#       if DEBUG
679                        "--log-level", "8", /* RTE_LOG_DEBUG */
680#       else
681                        "--log-level", "5", /* RTE_LOG_WARNING */
682#       endif
683#endif
684                        NULL};
685        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
686
687#if DEBUG
688        rte_set_log_level(RTE_LOG_DEBUG);
689#else
690        rte_set_log_level(RTE_LOG_WARNING);
691#endif
692
693        /* Get the number of cpu cores in the system and use the last core
694         * on the correct numa node */
695        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
696        if (nb_cpu <= 0) {
697                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
698                       " Falling back to the first core.");
699                nb_cpu = 1; /* fallback to the first core */
700        }
701        if (nb_cpu > RTE_MAX_LCORE)
702                nb_cpu = RTE_MAX_LCORE;
703
704        my_cpu = -1;
705        /* This allows the user to specify the core - we would try to do this
706         * automatically but it's hard to tell that this is secondary
707         * before running rte_eal_init(...). Currently we are limited to 1
708         * instance per core due to the way memory is allocated. */
709        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
710                snprintf(err, errlen, "Failed to parse URI");
711                return -1;
712        }
713
714#if HAVE_LIBNUMA
715        format_data->nic_numa_node = pci_to_numa(&use_addr);
716        if (my_cpu < 0) {
717#if DEBUG
718                /* If we can assign to a core on the same numa node */
719                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
720#endif
721                if(format_data->nic_numa_node >= 0) {
722                        int max_node_cpu = -1;
723                        struct bitmask *mask = numa_allocate_cpumask();
724                        assert(mask);
725                        numa_node_to_cpus(format_data->nic_numa_node, mask);
726                        for (i = 0 ; i < nb_cpu; ++i) {
727                                if (numa_bitmask_isbitset(mask,i))
728                                        max_node_cpu = i+1;
729                        }
730                        my_cpu = max_node_cpu;
731                }
732        }
733#endif
734        if (my_cpu < 0) {
735                my_cpu = nb_cpu;
736        }
737
738
739        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
740                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
741
742        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
743                snprintf(err, errlen,
744                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
745                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
746                return -1;
747        }
748
749        /* Make our mask with all cores turned on this is so that DPDK
750         * gets all CPU info in older versions */
751        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
752        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
753
754#if !DPDK_USE_BLACKLIST
755        /* Black list all ports besides the one that we want to use */
756        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
757                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
758                         " are you sure the address is correct?: %s", strerror(-ret));
759                return -1;
760        }
761#endif
762
763        /* Give the memory map a unique name */
764        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
765        /* rte_eal_init it makes a call to getopt so we need to reset the
766         * global optind variable of getopt otherwise this fails */
767        save_getopts(&save_opts);
768        optind = 1;
769        if ((ret = rte_eal_init(argc, argv)) < 0) {
770                snprintf(err, errlen,
771                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
772                return -1;
773        }
774        restore_getopts(&save_opts);
775        // These are still running but will never do anything with DPDK v1.7 we
776        // should remove this XXX in the future
777        for(i = 0; i < RTE_MAX_LCORE; ++i) {
778                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
779                        cfg->lcore_role[i] = ROLE_OFF;
780                        cfg->lcore_count--;
781                }
782        }
783        // Only the master should be running
784        assert(cfg->lcore_count == 1);
785
786        // TODO XXX TODO
787        dpdk_move_master_lcore(NULL, my_cpu-1);
788
789#if DEBUG
790        dump_configuration();
791#endif
792
793#if DPDK_USE_PMD_INIT
794        /* This registers all available NICs with Intel DPDK
795         * These are not loaded until rte_eal_pci_probe() is called.
796         */
797        if ((ret = rte_pmd_init_all()) < 0) {
798                snprintf(err, errlen,
799                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
800                return -1;
801        }
802#endif
803
804#if DPDK_USE_BLACKLIST
805        /* Blacklist all ports besides the one that we want to use */
806        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
807                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
808                         " are you sure the address is correct?: %s", strerror(-ret));
809                return -1;
810        }
811#endif
812
813#if DPDK_USE_PCI_PROBE
814        /* This loads DPDK drivers against all ports that are not blacklisted */
815        if ((ret = rte_eal_pci_probe()) < 0) {
816                snprintf(err, errlen,
817                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
818                return -1;
819        }
820#endif
821
822        format_data->nb_ports = rte_eth_dev_count();
823
824        if (format_data->nb_ports != 1) {
825                snprintf(err, errlen,
826                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
827                         format_data->nb_ports);
828                return -1;
829        }
830
831        return 0;
832}
833
834static int dpdk_init_input (libtrace_t *libtrace) {
835        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
836        char err[500];
837        err[0] = 0;
838
839        libtrace->format_data = (struct dpdk_format_data_t *)
840                                malloc(sizeof(struct dpdk_format_data_t));
841        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
842        FORMAT(libtrace)->nb_ports = 0;
843        FORMAT(libtrace)->snaplen = 0; /* Use default */
844        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
845        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
846        FORMAT(libtrace)->nic_numa_node = -1;
847        FORMAT(libtrace)->promisc = -1;
848        FORMAT(libtrace)->pktmbuf_pool = NULL;
849#if DPDK_USE_BLACKLIST
850        FORMAT(libtrace)->nb_blacklist = 0;
851#endif
852        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
853        FORMAT(libtrace)->mempool_name[0] = 0;
854        memset(FORMAT(libtrace)->burst_pkts, 0,
855               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
856        FORMAT(libtrace)->burst_size = 0;
857        FORMAT(libtrace)->burst_offset = 0;
858
859        /* Make our first stream */
860        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
861        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
862
863        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
864                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
865                free(libtrace->format_data);
866                libtrace->format_data = NULL;
867                return -1;
868        }
869        return 0;
870}
871
872static int dpdk_init_output(libtrace_out_t *libtrace)
873{
874        char err[500];
875        err[0] = 0;
876
877        libtrace->format_data = (struct dpdk_format_data_t *)
878                                malloc(sizeof(struct dpdk_format_data_t));
879        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
880        FORMAT(libtrace)->nb_ports = 0;
881        FORMAT(libtrace)->snaplen = 0; /* Use default */
882        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
883        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
884        FORMAT(libtrace)->nic_numa_node = -1;
885        FORMAT(libtrace)->promisc = -1;
886        FORMAT(libtrace)->pktmbuf_pool = NULL;
887#if DPDK_USE_BLACKLIST
888        FORMAT(libtrace)->nb_blacklist = 0;
889#endif
890        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
891        FORMAT(libtrace)->mempool_name[0] = 0;
892        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
893        FORMAT(libtrace)->burst_size = 0;
894        FORMAT(libtrace)->burst_offset = 0;
895
896        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
897                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
898                free(libtrace->format_data);
899                libtrace->format_data = NULL;
900                return -1;
901        }
902        return 0;
903}
904
905/**
906 * Note here snaplen excludes the MAC checksum. Packets over
907 * the requested snaplen will be dropped. (Excluding MAC checksum)
908 *
909 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
910 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
911 * is set the maximum size of the returned packet would be 1518 otherwise
912 * 1514 would be the largest size possibly returned.
913 *
914 */
915static int dpdk_config_input (libtrace_t *libtrace,
916                              trace_option_t option,
917                              void *data) {
918        switch (option) {
919        case TRACE_OPTION_SNAPLEN:
920                /* Only support changing snaplen before a call to start is
921                 * made */
922                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
923                        FORMAT(libtrace)->snaplen=*(int*)data;
924                else
925                        return -1;
926                return 0;
927        case TRACE_OPTION_PROMISC:
928                FORMAT(libtrace)->promisc=*(int*)data;
929                return 0;
930        case TRACE_OPTION_HASHER:
931                switch (*((enum hasher_types *) data))
932                {
933                case HASHER_BALANCE:
934                case HASHER_UNIDIRECTIONAL:
935                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
936                        return 0;
937                case HASHER_BIDIRECTIONAL:
938                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
939                        return 0;
940                case HASHER_CUSTOM:
941                        // We don't support these
942                        return -1;
943                }
944                break;
945        case TRACE_OPTION_FILTER:
946                /* TODO filtering */
947        case TRACE_OPTION_META_FREQ:
948        case TRACE_OPTION_EVENT_REALTIME:
949                break;
950        /* Avoid default: so that future options will cause a warning
951         * here to remind us to implement it, or flag it as
952         * unimplementable
953         */
954        }
955
956        /* Don't set an error - trace_config will try to deal with the
957         * option and will set an error if it fails */
958        return -1;
959}
960
961/* Can set jumbo frames/ or limit the size of a frame by setting both
962 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
963 *
964 */
965static struct rte_eth_conf port_conf = {
966        .rxmode = {
967                .mq_mode = ETH_RSS,
968                .split_hdr_size = 0,
969                .header_split   = 0, /**< Header Split disabled */
970                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
971                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
972                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
973                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
974#if GET_MAC_CRC_CHECKSUM
975/* So it appears that if hw_strip_crc is turned off the driver will still
976 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
977 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
978 * So lets just add it back on when we receive the packet.
979 */
980                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
981#else
982/* By default strip the MAC checksum because it's a bit of a hack to
983 * actually read these. And don't want to rely on disabling this to actualy
984 * always cut off the checksum in the future
985 */
986                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
987#endif
988        },
989        .txmode = {
990                .mq_mode = ETH_DCB_NONE,
991        },
992        .rx_adv_conf = {
993                .rss_conf = {
994                        // .rss_key = &rss_key, // We set this per format
995                        .rss_hf = RX_RSS_FLAGS,
996                },
997        },
998        .intr_conf = {
999                .lsc = 1
1000        }
1001};
1002
1003static const struct rte_eth_rxconf rx_conf = {
1004        .rx_thresh = {
1005                .pthresh = 8,/* RX_PTHRESH prefetch */
1006                .hthresh = 8,/* RX_HTHRESH host */
1007                .wthresh = 4,/* RX_WTHRESH writeback */
1008        },
1009        .rx_free_thresh = 0,
1010        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
1011};
1012
1013static const struct rte_eth_txconf tx_conf = {
1014        .tx_thresh = {
1015                /*
1016                 * TX_PTHRESH prefetch
1017                 * Set on the NIC, if the number of unprocessed descriptors to queued on
1018                 * the card fall below this try grab at least hthresh more unprocessed
1019                 * descriptors.
1020                 */
1021                .pthresh = 36,
1022
1023                /* TX_HTHRESH host
1024                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
1025                 */
1026                .hthresh = 0,
1027
1028                /* TX_WTHRESH writeback
1029                 * Set on the NIC, the number of sent descriptors before writing back
1030                 * status to confirm the transmission. This is done more efficiently as
1031                 * a bulk DMA-transfer rather than writing one at a time.
1032                 * Similar to tx_free_thresh however this is applied to the NIC, where
1033                 * as tx_free_thresh is when DPDK will check these. This is extended
1034                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1035                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1036                 */
1037                .wthresh = 4,
1038        },
1039
1040        /* Used internally by DPDK rather than passed to the NIC. The number of
1041         * packet descriptors to send before checking for any responses written
1042         * back (to confirm the transmission). Default = 32 if set to 0)
1043         */
1044        .tx_free_thresh = 0,
1045
1046        /* This is the Report Status threshold, used by 10Gbit cards,
1047         * This signals the card to only write back status (such as
1048         * transmission successful) after this minimum number of transmit
1049         * descriptors are seen. The default is 32 (if set to 0) however if set
1050         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1051         * a replacement. See the dpdk programmers guide for more restrictions.
1052         */
1053        .tx_rs_thresh = 1,
1054};
1055
1056/**
1057 * A callback for a link state change (LSC).
1058 *
1059 * Packets may be received before this notification. In fact the DPDK IGXBE
1060 * driver likes to put a delay upto 5sec before sending this.
1061 *
1062 * We use this to ensure the link speed is correct for our timestamp
1063 * calculations. Because packets might be received before the link up we still
1064 * update this when the packet is received.
1065 *
1066 * @param port The DPDK port
1067 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1068 * @param cb_arg The dpdk_format_data_t structure associated with the format
1069 */
1070static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1071                              void *cb_arg) {
1072        struct dpdk_format_data_t * format_data = cb_arg;
1073        struct rte_eth_link link_info;
1074        assert(event == RTE_ETH_EVENT_INTR_LSC);
1075        assert(port == format_data->port);
1076
1077        rte_eth_link_get_nowait(port, &link_info);
1078
1079        if (link_info.link_status)
1080                format_data->link_speed = link_info.link_speed;
1081        else
1082                format_data->link_speed = 0;
1083
1084#if DEBUG
1085        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1086                link_info.link_status ? "up" : "down",
1087                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1088                                          "full-duplex" : "half-duplex",
1089                (int) link_info.link_speed);
1090#endif
1091
1092        /* Turns out DPDK drivers might not come back up if the link speed
1093         * changes. So we reset the autoneg procedure. This is very unsafe
1094         * we have have threads reading packets and we stop the port. */
1095#if 0
1096        if (!link_info.link_status) {
1097                int ret;
1098                rte_eth_dev_stop(port);
1099                ret = rte_eth_dev_start(port);
1100                if (ret < 0) {
1101                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1102                                strerror(-ret));
1103                }
1104        }
1105#endif
1106}
1107
1108/** Reserve a DPDK lcore ID for a thread globally.
1109 *
1110 * @param real If true allocate a real lcore, otherwise allocate a core which
1111 * does not exist on the local machine.
1112 * @param socket the prefered NUMA socket - only used if a real core is requested
1113 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1114 * -1 if have run out of cores.
1115 *
1116 * If any thread is reading or freeing packets we need to register it here
1117 * due to TLS caches in the memory pools.
1118 */
1119static int dpdk_reserve_lcore(bool real, int socket) {
1120        int new_id = -1;
1121        int i;
1122        struct rte_config *cfg = rte_eal_get_configuration();
1123        (void) socket;
1124
1125        pthread_mutex_lock(&dpdk_lock);
1126        /* If 'reading packets' fill in cores from 0 up and bind affinity
1127         * otherwise start from the MAX core (which is also the master) and work backwards
1128         * in this case physical cores on the system will not exist so we don't bind
1129         * these to any particular physical core */
1130        if (real) {
1131#if HAVE_LIBNUMA
1132                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1133                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1134                                new_id = i;
1135                                if (!lcore_config[i].detected)
1136                                        new_id = -1;
1137                                break;
1138                        }
1139                }
1140#endif
1141                /* Retry without the the numa restriction */
1142                if (new_id == -1) {
1143                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1144                                if (!rte_lcore_is_enabled(i)) {
1145                                        new_id = i;
1146                                        if (!lcore_config[i].detected)
1147                                                fprintf(stderr, "Warning the"
1148                                                        " number of 'reading' "
1149                                                        "threads exceed cores\n");
1150                                        break;
1151                                }
1152                        }
1153                }
1154        } else {
1155                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1156                        if (!rte_lcore_is_enabled(i)) {
1157                                new_id = i;
1158                                break;
1159                        }
1160                }
1161        }
1162
1163        if (new_id != -1) {
1164                /* Enable the core in global DPDK structs */
1165                cfg->lcore_role[new_id] = ROLE_RTE;
1166                cfg->lcore_count++;
1167        }
1168
1169        pthread_mutex_unlock(&dpdk_lock);
1170        return new_id;
1171}
1172
1173/** Register a thread as a lcore
1174 * @param libtrace any error is set against libtrace on exit
1175 * @param real If this is a true lcore we will bind its affinty to the
1176 * requested core.
1177 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1178 * @return 0, if successful otherwise -1 if an error occured (details are stored
1179 * in libtrace)
1180 *
1181 * @note This must be called from the thread being registered.
1182 */
1183static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1184        int ret;
1185        RTE_PER_LCORE(_lcore_id) = lcore;
1186
1187        /* Set affinity bind to corresponding core */
1188        if (real) {
1189                cpu_set_t cpuset;
1190                CPU_ZERO(&cpuset);
1191                CPU_SET(rte_lcore_id(), &cpuset);
1192                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1193                if (ret != 0) {
1194                        trace_set_err(libtrace, errno, "Warning "
1195                                      "pthread_setaffinity_np failed");
1196                        return -1;
1197                }
1198        }
1199
1200        return 0;
1201}
1202
1203/** Allocates a new dpdk packet buffer memory pool.
1204 *
1205 * @param n The number of threads
1206 * @param pkt_size The packet size we need ot store
1207 * @param socket_id The NUMA socket id
1208 * @param A new mempool, if NULL query the DPDK library for the error code
1209 * see rte_mempool_create() documentation.
1210 *
1211 * This allocates a new pool or recycles an existing memory pool.
1212 * Call dpdk_free_memory() to free the memory.
1213 * We cannot delete memory so instead we store the pools, allowing them to be
1214 * re-used.
1215 */
1216static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1217                                             unsigned pkt_size,
1218                                             int socket_id) {
1219        struct rte_mempool *ret;
1220        size_t j,k;
1221        char name[MEMPOOL_NAME_LEN];
1222
1223        /* Add on packet size overheads */
1224        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1225
1226        pthread_mutex_lock(&dpdk_lock);
1227
1228        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1229                /* Best guess go for zero */
1230                socket_id = 0;
1231        }
1232
1233        /* Find a valid pool */
1234        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1235                if (mem_pools[socket_id][j]->size >= n &&
1236                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1237                        break;
1238                }
1239        }
1240
1241        /* Find the end (+1) of the list */
1242        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1243
1244        if (mem_pools[socket_id][j]) {
1245                ret = mem_pools[socket_id][j];
1246                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1247                mem_pools[socket_id][k-1] = NULL;
1248                mem_pools[socket_id][j] = NULL;
1249        } else {
1250                static uint32_t test = 10;
1251                test++;
1252                snprintf(name, MEMPOOL_NAME_LEN,
1253                         "libtrace_pool_%"PRIu32, test);
1254
1255                ret = rte_mempool_create(name, n, pkt_size,
1256                                         128, sizeof(struct rte_pktmbuf_pool_private),
1257                                         rte_pktmbuf_pool_init, NULL,
1258                                         rte_pktmbuf_init, NULL,
1259                                         socket_id, 0);
1260        }
1261
1262        pthread_mutex_unlock(&dpdk_lock);
1263        return ret;
1264}
1265
1266/** Stores the memory against the DPDK library.
1267 *
1268 * @param mempool The mempool to free
1269 * @param socket_id The NUMA socket this mempool was allocated upon.
1270 *
1271 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1272 * store the memory shared globally against the format.
1273 */
1274static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1275        size_t i;
1276        pthread_mutex_lock(&dpdk_lock);
1277
1278        /* We should have all entries back in the mempool */
1279        rte_mempool_audit(mempool);
1280        if (!rte_mempool_full(mempool)) {
1281                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1282                        "free all packets before finishing a trace\n",
1283                        rte_mempool_avail_count(mempool), mempool->size);
1284        }
1285
1286        /* Find the end (+1) of the list */
1287        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1288
1289        if (i >= RTE_MAX_LCORE) {
1290                fprintf(stderr, "Too many memory pools, dropping this one\n");
1291        } else {
1292                mem_pools[socket_id][i] = mempool;
1293        }
1294
1295        pthread_mutex_unlock(&dpdk_lock);
1296}
1297
1298/* Attach memory to the port and start (or restart) the port/s.
1299 */
1300static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1301                              char *err, int errlen, uint16_t rx_queues) {
1302        int ret, i;
1303        struct rte_eth_link link_info; /* Wait for link */
1304        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1305
1306        /* Already started */
1307        if (format_data->paused == DPDK_RUNNING)
1308                return 0;
1309
1310        /* First time started we need to alloc our memory, doing this here
1311         * rather than in environment setup because we don't have snaplen then */
1312        if (format_data->paused == DPDK_NEVER_STARTED) {
1313                if (format_data->snaplen == 0) {
1314                        format_data->snaplen = RX_MBUF_SIZE;
1315                        port_conf.rxmode.jumbo_frame = 0;
1316                        port_conf.rxmode.max_rx_pkt_len = 0;
1317                } else {
1318                        /* Use jumbo frames */
1319                        port_conf.rxmode.jumbo_frame = 1;
1320                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1321                }
1322
1323#if GET_MAC_CRC_CHECKSUM
1324                /* This is additional overhead so make sure we allow space for this */
1325                format_data->snaplen += ETHER_CRC_LEN;
1326#endif
1327#if HAS_HW_TIMESTAMPS_82580
1328                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1329#endif
1330
1331                /* Create the mbuf pool, which is the place packets are allocated
1332                 * from - There is no free function (I cannot see one).
1333                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1334                 * allocate however that extra 1 packet is not used.
1335                 * (I assume <= vs < error some where in DPDK code)
1336                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1337                 * so that will fill the new buffer and wait until slots in the
1338                 * ring become available.
1339                 */
1340#if DEBUG
1341                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1342#endif
1343                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1344                                                              format_data->snaplen,
1345                                                              format_data->nic_numa_node);
1346
1347                if (format_data->pktmbuf_pool == NULL) {
1348                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1349                                 "pool failed: %s", strerror(rte_errno));
1350                        return -1;
1351                }
1352        }
1353
1354        /* ----------- Now do the setup for the port mapping ------------ */
1355        /* Order of calls must be
1356         * rte_eth_dev_configure()
1357         * rte_eth_tx_queue_setup()
1358         * rte_eth_rx_queue_setup()
1359         * rte_eth_dev_start()
1360         * other rte_eth calls
1361         */
1362
1363        /* This must be called first before another *eth* function
1364         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1365        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1366        if (ret < 0) {
1367                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1368                         " %"PRIu8" : %s", format_data->port,
1369                         strerror(-ret));
1370                return -1;
1371        }
1372#if DEBUG
1373        fprintf(stderr, "Doing dev configure\n");
1374#endif
1375        /* Initialise the TX queue a minimum value if using this port for
1376         * receiving. Otherwise a larger size if writing packets.
1377         */
1378        ret = rte_eth_tx_queue_setup(format_data->port,
1379                                     0 /* queue XXX */,
1380                                     format_data->nb_tx_buf,
1381                                     SOCKET_ID_ANY,
1382                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1383        if (ret < 0) {
1384                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1385                         " on port %"PRIu8" : %s", format_data->port,
1386                         strerror(-ret));
1387                return -1;
1388        }
1389
1390        /* Attach memory to our RX queues */
1391        for (i=0; i < rx_queues; i++) {
1392                dpdk_per_stream_t *stream;
1393#if DEBUG
1394                fprintf(stderr, "Configuring queue %d\n", i);
1395#endif
1396
1397                /* Add storage for the stream */
1398                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1399                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1400                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1401                stream->queue_id = i;
1402
1403                if (stream->lcore == -1)
1404                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1405
1406                if (stream->lcore == -1) {
1407                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1408                                 ". Too many threads?");
1409                        return -1;
1410                }
1411
1412                if (stream->mempool == NULL) {
1413                        stream->mempool = dpdk_alloc_memory(
1414                                                  format_data->nb_rx_buf*2,
1415                                                  format_data->snaplen,
1416                                                  rte_lcore_to_socket_id(stream->lcore));
1417
1418                        if (stream->mempool == NULL) {
1419                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1420                                         "pool failed: %s", strerror(rte_errno));
1421                                return -1;
1422                        }
1423                }
1424
1425                /* Initialise the RX queue with some packets from memory */
1426                ret = rte_eth_rx_queue_setup(format_data->port,
1427                                             stream->queue_id,
1428                                             format_data->nb_rx_buf,
1429                                             format_data->nic_numa_node,
1430                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1431                                             stream->mempool);
1432                if (ret < 0) {
1433                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1434                                 " RX queue on port %"PRIu8" : %s",
1435                                 format_data->port,
1436                                 strerror(-ret));
1437                        return -1;
1438                }
1439        }
1440
1441#if DEBUG
1442        fprintf(stderr, "Doing start device\n");
1443#endif
1444        rte_eth_stats_reset(format_data->port);
1445        /* Start device */
1446        ret = rte_eth_dev_start(format_data->port);
1447        if (ret < 0) {
1448                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1449                         strerror(-ret));
1450                return -1;
1451        }
1452
1453        /* Default promiscuous to on */
1454        if (format_data->promisc == -1)
1455                format_data->promisc = 1;
1456
1457        if (format_data->promisc == 1)
1458                rte_eth_promiscuous_enable(format_data->port);
1459        else
1460                rte_eth_promiscuous_disable(format_data->port);
1461
1462        /* We have now successfully started/unpased */
1463        format_data->paused = DPDK_RUNNING;
1464
1465
1466        /* Register a callback for link state changes */
1467        ret = rte_eth_dev_callback_register(format_data->port,
1468                                            RTE_ETH_EVENT_INTR_LSC,
1469                                            dpdk_lsc_callback,
1470                                            format_data);
1471#if DEBUG
1472        if (ret)
1473                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1474                        ret, strerror(-ret));
1475#endif
1476
1477        /* Get the current link status */
1478        rte_eth_link_get_nowait(format_data->port, &link_info);
1479        format_data->link_speed = link_info.link_speed;
1480#if DEBUG
1481        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1482                (int) link_info.link_duplex, (int) link_info.link_speed);
1483#endif
1484
1485        return 0;
1486}
1487
1488static int dpdk_start_input (libtrace_t *libtrace) {
1489        char err[500];
1490        err[0] = 0;
1491
1492        /* Make sure we don't reserve an extra thread for this */
1493        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1494
1495        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1496                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1497                free(libtrace->format_data);
1498                libtrace->format_data = NULL;
1499                return -1;
1500        }
1501        return 0;
1502}
1503
1504static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1505        struct rte_eth_dev_info dev_info;
1506        rte_eth_dev_info_get(port_id, &dev_info);
1507        return dev_info.max_rx_queues;
1508}
1509
/**
 * Count the processors currently online.
 *
 * @return The number of online processors reported by sysconf(), or 1 if
 *         sysconf() fails or reports a non-positive count.
 */
static inline size_t dpdk_processor_count (void) {
	const long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

	return nb_cpu > 0 ? (size_t) nb_cpu : 1;
}
1517
1518static int dpdk_pstart_input (libtrace_t *libtrace) {
1519        char err[500];
1520        int i=0, phys_cores=0;
1521        int tot = libtrace->perpkt_thread_count;
1522        libtrace_list_node_t *n;
1523        err[0] = 0;
1524
1525        if (rte_lcore_id() != rte_get_master_lcore())
1526                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1527                        " from the master DPDK thread!\n");
1528
1529        /* If the master is not on the last thread we move it there */
1530        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1531                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1532                        return -1;
1533        }
1534
1535        /* Don't exceed the number of cores in the system/detected by dpdk
1536         * We don't have to force this but performance wont be good if we don't */
1537        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1538                if (lcore_config[i].detected) {
1539                        if (rte_lcore_is_enabled(i)) {
1540#if DEBUG
1541                                fprintf(stderr, "Found core %d already in use!\n", i);
1542#endif
1543                        } else {
1544                                phys_cores++;
1545                        }
1546                }
1547        }
1548        /* If we are restarting we have already allocated some threads as such
1549         * we add these back to the count for this calculation */
1550        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1551                dpdk_per_stream_t * stream = n->data;
1552                if (stream->lcore != -1)
1553                        phys_cores++;
1554        }
1555
1556        tot = MIN(libtrace->perpkt_thread_count,
1557                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1558        tot = MIN(tot, phys_cores);
1559
1560#if DEBUG
1561        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1562                libtrace->perpkt_thread_count, phys_cores);
1563#endif
1564
1565        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1566                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1567                free(libtrace->format_data);
1568                libtrace->format_data = NULL;
1569                return -1;
1570        }
1571
1572        /* Make sure we only start the number that we should */
1573        libtrace->perpkt_thread_count = tot;
1574        return 0;
1575}
1576
1577/**
1578 * Register a thread with the DPDK system,
1579 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1580 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1581 * gives it.
1582 *
1583 * We then allow a mapper thread to be started on every real core as DPDK would,
1584 * we also bind these to the corresponding CPU cores.
1585 *
1586 * @param libtrace A pointer to the trace
1587 * @param reading True if the thread will be used to read packets, i.e. will
1588 *                call pread_packet(), false if thread used to process packet
1589 *                in any other manner including statistics functions.
1590 */
1591static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1592{
1593#if DEBUG
1594        char name[99];
1595        name[0] = 0;
1596#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1597        pthread_getname_np(pthread_self(),
1598                           name, sizeof(name));
1599#endif
1600#endif
1601        if (reading) {
1602                dpdk_per_stream_t *stream;
1603                /* Attach our thread */
1604                if(t->type == THREAD_PERPKT) {
1605                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1606                        if (t->format_data == NULL) {
1607                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1608                                              "Too many threads registered");
1609                                return -1;
1610                        }
1611                } else {
1612                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1613                }
1614                stream = t->format_data;
1615#if DEBUG
1616                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1617#endif
1618                return dpdk_register_lcore(libtrace, true, stream->lcore);
1619        } else {
1620                int lcore = dpdk_reserve_lcore(reading, 0);
1621                if (lcore == -1) {
1622                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1623                                      " for DPDK");
1624                        return -1;
1625                }
1626#if DEBUG
1627                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1628#endif
1629                return dpdk_register_lcore(libtrace, false, lcore);
1630        }
1631
1632        return 0;
1633}
1634
1635/**
1636 * Unregister a thread with the DPDK system.
1637 *
1638 * Only previously registered threads should be calling this just before
1639 * they are destroyed.
1640 */
1641static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1642{
1643        struct rte_config *cfg = rte_eal_get_configuration();
1644
1645        assert(rte_lcore_id() < RTE_MAX_LCORE);
1646        pthread_mutex_lock(&dpdk_lock);
1647        /* Skip if master */
1648        if (rte_lcore_id() == rte_get_master_lcore()) {
1649                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1650                pthread_mutex_unlock(&dpdk_lock);
1651                return;
1652        }
1653
1654        /* Disable this core in global DPDK structs */
1655        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1656        cfg->lcore_count--;
1657        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1658        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1659        pthread_mutex_unlock(&dpdk_lock);
1660        return;
1661}
1662
1663static int dpdk_start_output(libtrace_out_t *libtrace)
1664{
1665        char err[500];
1666        err[0] = 0;
1667
1668        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1669                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1670                free(libtrace->format_data);
1671                libtrace->format_data = NULL;
1672                return -1;
1673        }
1674        return 0;
1675}
1676
1677static int dpdk_pause_input(libtrace_t * libtrace) {
1678        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1679        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1680        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1681#if DEBUG
1682                fprintf(stderr, "Pausing DPDK port\n");
1683#endif
1684                rte_eth_dev_stop(FORMAT(libtrace)->port);
1685                FORMAT(libtrace)->paused = DPDK_PAUSED;
1686                /* Empty the queue of packets */
1687                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1688                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1689                }
1690                FORMAT(libtrace)->burst_offset = 0;
1691                FORMAT(libtrace)->burst_size = 0;
1692
1693                for (; tmp != NULL; tmp = tmp->next) {
1694                        dpdk_per_stream_t *stream = tmp->data;
1695                        stream->ts_last_sys = 0;
1696#if HAS_HW_TIMESTAMPS_82580
1697                        stream->ts_first_sys = 0;
1698#endif
1699                }
1700
1701        }
1702        return 0;
1703}
1704
1705static int dpdk_write_packet(libtrace_out_t *trace,
1706                             libtrace_packet_t *packet){
1707        struct rte_mbuf* m_buff[1];
1708
1709        int wirelen = trace_get_wire_length(packet);
1710        int caplen = trace_get_capture_length(packet);
1711
1712        /* Check for a checksum and remove it */
1713        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1714            wirelen == caplen)
1715                caplen -= ETHER_CRC_LEN;
1716
1717        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1718        if (m_buff[0] == NULL) {
1719                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1720                return -1;
1721        } else {
1722                int ret;
1723                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1724                do {
1725                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1726                } while (ret != 1);
1727        }
1728
1729        return 0;
1730}
1731
1732static int dpdk_fin_input(libtrace_t * libtrace) {
1733        libtrace_list_node_t * n;
1734        /* Free our memory structures */
1735        if (libtrace->format_data != NULL) {
1736
1737                if (FORMAT(libtrace)->port != 0xFF)
1738                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1739                                                        RTE_ETH_EVENT_INTR_LSC,
1740                                                        dpdk_lsc_callback,
1741                                                        FORMAT(libtrace));
1742                /* Close the device completely, device cannot be restarted */
1743                rte_eth_dev_close(FORMAT(libtrace)->port);
1744
1745                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1746                                 FORMAT(libtrace)->nic_numa_node);
1747
1748                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1749                        dpdk_per_stream_t * stream = n->data;
1750                        if (stream->mempool)
1751                                dpdk_free_memory(stream->mempool,
1752                                                 rte_lcore_to_socket_id(stream->lcore));
1753                }
1754
1755                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1756                /* filter here if we used it */
1757                free(libtrace->format_data);
1758        }
1759
1760        return 0;
1761}
1762
1763
1764static int dpdk_fin_output(libtrace_out_t * libtrace) {
1765        /* Free our memory structures */
1766        if (libtrace->format_data != NULL) {
1767                /* Close the device completely, device cannot be restarted */
1768                if (FORMAT(libtrace)->port != 0xFF)
1769                        rte_eth_dev_close(FORMAT(libtrace)->port);
1770                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1771                /* filter here if we used it */
1772                free(libtrace->format_data);
1773        }
1774
1775        return 0;
1776}
1777
1778/**
1779 * Get the start of the additional header that we added to a packet.
1780 */
1781static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1782        assert(packet);
1783        assert(packet->buffer);
1784        /* Our header sits straight after the mbuf header */
1785        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1786}
1787
1788static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1789        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1790        return hdr->cap_len;
1791}
1792
1793static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1794        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1795        if (size > hdr->cap_len) {
1796                /* Cannot make a packet bigger */
1797                return trace_get_capture_length(packet);
1798        }
1799
1800        /* Reset the cached capture length first*/
1801        packet->capture_length = -1;
1802        hdr->cap_len = (uint32_t) size;
1803        return trace_get_capture_length(packet);
1804}
1805
1806static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1807        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1808        int org_cap_size; /* The original capture size */
1809        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1810                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1811                               sizeof(struct hw_timestamp_82580);
1812        } else {
1813                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1814        }
1815        if (hdr->flags & INCLUDES_CHECKSUM) {
1816                return org_cap_size;
1817        } else {
1818                /* DPDK packets are always TRACE_TYPE_ETH packets */
1819                return org_cap_size + ETHER_CRC_LEN;
1820        }
1821}
1822
1823static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1824        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1825        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1826                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1827                                sizeof(struct hw_timestamp_82580);
1828        else
1829                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1830}
1831
1832static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1833                               libtrace_packet_t *packet, void *buffer,
1834                               libtrace_rt_types_t rt_type, uint32_t flags) {
1835        assert(packet);
1836        if (packet->buffer != buffer &&
1837            packet->buf_control == TRACE_CTRL_PACKET) {
1838                free(packet->buffer);
1839        }
1840
1841        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1842                packet->buf_control = TRACE_CTRL_PACKET;
1843        else
1844                packet->buf_control = TRACE_CTRL_EXTERNAL;
1845
1846        packet->buffer = buffer;
1847        packet->header = buffer;
1848
1849        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1850        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1851        packet->type = rt_type;
1852        return 0;
1853}
1854
1855/**
1856 * Given a packet size and a link speed, computes the
1857 * time to transmit in nanoseconds.
1858 *
1859 * @param format_data The dpdk format data from which we get the link speed
1860 *        and if unset updates it in a thread safe manner
1861 * @param pkt_size The size of the packet in bytes
1862 * @return The wire time in nanoseconds
1863 */
1864static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1865        uint32_t wire_time;
1866        /* 20 extra bytes of interframe gap and preamble */
1867# if GET_MAC_CRC_CHECKSUM
1868        wire_time = ((pkt_size + 20) * 8000);
1869# else
1870        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1871# endif
1872
1873        /* Division is really slow and introduces a pipeline stall
1874         * The compiler will optimise this into magical multiplication and shifting
1875         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1876         */
1877retry_calc_wiretime:
1878        switch (format_data->link_speed) {
1879        case ETH_SPEED_NUM_40G:
1880                wire_time /=  ETH_SPEED_NUM_40G;
1881                break;
1882        case ETH_SPEED_NUM_20G:
1883                wire_time /= ETH_SPEED_NUM_20G;
1884                break;
1885        case ETH_SPEED_NUM_10G:
1886                wire_time /= ETH_SPEED_NUM_10G;
1887                break;
1888        case ETH_SPEED_NUM_1G:
1889                wire_time /= ETH_SPEED_NUM_1G;
1890                break;
1891        case 0:
1892                {
1893                /* Maybe the link was down originally, but now it should be up */
1894                struct rte_eth_link link = {0};
1895                rte_eth_link_get_nowait(format_data->port, &link);
1896                if (link.link_status && link.link_speed) {
1897                        format_data->link_speed = link.link_speed;
1898#ifdef DEBUG
1899                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1900#endif
1901                        goto retry_calc_wiretime;
1902                }
1903                /* We don't know the link speed, make sure numbers are counting up */
1904                wire_time = 1;
1905                break;
1906                }
1907        default:
1908                wire_time /= format_data->link_speed;
1909        }
1910        return wire_time;
1911}
1912
1913/**
1914 * Does any extra preperation to all captured packets
1915 * This includes adding our extra header to it with the timestamp,
1916 * and any snapping
1917 *
1918 * @param format_data The DPDK format data
1919 * @param plc The DPDK per lcore format data
1920 * @param pkts An array of size nb_pkts of DPDK packets
1921 */
1922static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1923                                   struct dpdk_per_stream_t *plc,
1924                                   struct rte_mbuf **pkts,
1925                                   size_t nb_pkts) {
1926        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1927        struct dpdk_addt_hdr *hdr;
1928        size_t i;
1929        uint64_t cur_sys_time_ns;
1930#if HAS_HW_TIMESTAMPS_82580
1931        struct hw_timestamp_82580 *hw_ts;
1932        uint64_t estimated_wraps;
1933#else
1934
1935#endif
1936
1937#if USE_CLOCK_GETTIME
1938        struct timespec cur_sys_time = {0};
1939        /* This looks terrible and I feel bad doing it. But it's OK
1940         * on new kernels, because this is a fast vsyscall */
1941        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1942        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1943#else
1944        struct timeval cur_sys_time = {0};
1945        /* Also a fast vsyscall */
1946        gettimeofday(&cur_sys_time, NULL);
1947        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1948#endif
1949
1950        /* The system clock is not perfect so when running
1951         * at linerate we could timestamp a packet in the past.
1952         * To avoid this we munge the timestamp to appear 1ns
1953         * after the previous packet. We should eventually catch up
1954         * to system time since a 64byte packet on a 10G link takes 67ns.
1955         *
1956         * Note with parallel readers timestamping packets
1957         * with duplicate stamps or out of order is unavoidable without
1958         * hardware timestamping from the NIC.
1959         */
1960#if !HAS_HW_TIMESTAMPS_82580
1961        if (plc->ts_last_sys >= cur_sys_time_ns) {
1962                cur_sys_time_ns = plc->ts_last_sys + 1;
1963        }
1964#endif
1965
1966        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1967        for (i = 0 ; i < nb_pkts ; ++i) {
1968
1969                /* We put our header straight after the dpdk header */
1970                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1971                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1972
1973#if GET_MAC_CRC_CHECKSUM
1974                /* Add back in the CRC sum */
1975                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1976                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1977                hdr->flags |= INCLUDES_CHECKSUM;
1978#endif
1979
1980                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1981
1982#if HAS_HW_TIMESTAMPS_82580
1983                /* The timestamp is sitting before our packet and is included in pkt_len */
1984                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1985                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1986                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1987
1988                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1989                 *
1990                 *        +----------+---+   +--------------+
1991                 *  82580 |    24    | 8 |   |      32      |
1992                 *        +----------+---+   +--------------+
1993                 *          reserved  \______ 40 bits _____/
1994                 *
1995                 * The 40 bit 82580 SYSTIM overflows every
1996                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1997                 *
1998                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1999                 * Endian (for the full 64 bits) i.e. picture is mirrored
2000                 */
2001
2002                /* Despite what the documentation says this is in Little
2003                 * Endian byteorder. Mask the reserved section out.
2004                 */
2005                hdr->timestamp = le64toh(hw_ts->timestamp) &
2006                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
2007
2008                if (unlikely(plc->ts_first_sys == 0)) {
2009                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
2010                        plc->ts_last_sys = plc->ts_first_sys;
2011                }
2012
2013                /* This will have serious problems if packets aren't read quickly
2014                 * that is within a couple of seconds because our clock cycles every
2015                 * 18 seconds */
2016                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
2017                                  / (1ull<<TS_NBITS_82580);
2018
2019                /* Estimated_wraps gives the number of times the counter should have
2020                 * wrapped (however depending on value last time it could have wrapped
2021                 * twice more (if hw clock is close to its max value) or once less (allowing
2022                 * for a bit of variance between hw and sys clock). But if the clock
2023                 * shouldn't have wrapped once then don't allow it to go backwards in time */
2024                if (unlikely(estimated_wraps >= 2)) {
2025                        /* 2 or more wrap arounds add all but the very last wrap */
2026                        plc->wrap_count += estimated_wraps - 1;
2027                }
2028
2029                /* Set the timestamp to the lowest possible value we're considering */
2030                hdr->timestamp += plc->ts_first_sys +
2031                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2032
2033                /* In most runs only the first if() will need evaluating - i.e our
2034                 * estimate is correct. */
2035                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2036                                              hdr->timestamp, MAXSKEW_82580))) {
2037                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2038                        plc->wrap_count++;
2039                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2040                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2041                                             hdr->timestamp, MAXSKEW_82580)) {
2042                                /* Failed to match estimated_wraps */
2043                                plc->wrap_count++;
2044                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2045                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2046                                                     hdr->timestamp, MAXSKEW_82580)) {
2047                                        if (estimated_wraps == 0) {
2048                                                /* 0 case Failed to match estimated_wraps+2 */
2049                                                printf("WARNING - Hardware Timestamp failed to"
2050                                                       " match using systemtime!\n");
2051                                                hdr->timestamp = cur_sys_time_ns;
2052                                        } else {
2053                                                /* Failed to match estimated_wraps+1 */
2054                                                plc->wrap_count++;
2055                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2056                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2057                                                                     hdr->timestamp, MAXSKEW_82580)) {
2058                                                        /* Failed to match estimated_wraps+2 */
2059                                                        printf("WARNING - Hardware Timestamp failed to"
2060                                                               " match using systemtime!!\n");
2061                                                }
2062                                        }
2063                                }
2064                        }
2065                }
2066#else
2067
2068                hdr->timestamp = cur_sys_time_ns;
2069                /* Offset the next packet by the wire time of previous */
2070                calculate_wire_time(format_data, hdr->cap_len);
2071
2072#endif
2073        }
2074
2075        plc->ts_last_sys = cur_sys_time_ns;
2076        return;
2077}
2078
2079
2080static void dpdk_fin_packet(libtrace_packet_t *packet)
2081{
2082        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2083                rte_pktmbuf_free(packet->buffer);
2084                packet->buffer = NULL;
2085        }
2086}
2087
2088/** Reads at least one packet or returns an error
2089 */
2090static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2091                                           dpdk_per_stream_t *stream,
2092                                           libtrace_message_queue_t *mesg,
2093                                           struct rte_mbuf* pkts_burst[],
2094                                           size_t nb_packets) {
2095        size_t nb_rx; /* Number of rx packets we've recevied */
2096        while (1) {
2097                /* Poll for a batch of packets */
2098                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2099                                         stream->queue_id, pkts_burst, nb_packets);
2100                if (nb_rx > 0) {
2101                        /* Got some packets - otherwise we keep spining */
2102                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2103                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2104                        return nb_rx;
2105                }
2106                /* Check the message queue this could be less than 0 */
2107                if (mesg && libtrace_message_queue_count(mesg) > 0)
2108                        return READ_MESSAGE;
2109                if (libtrace_halt)
2110                        return READ_EOF;
2111                /* Wait a while, polling on memory degrades performance
2112                 * This relieves the pressure on memory allowing the NIC to DMA */
2113                rte_delay_us(10);
2114        }
2115
2116        /* We'll never get here - but if we did it would be bad */
2117        return READ_ERROR;
2118}
2119
2120static int dpdk_pread_packets (libtrace_t *libtrace,
2121                                    libtrace_thread_t *t,
2122                                    libtrace_packet_t **packets,
2123                                    size_t nb_packets) {
2124        int nb_rx; /* Number of rx packets we've recevied */
2125        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2126        int i;
2127        dpdk_per_stream_t *stream = t->format_data;
2128
2129        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2130                                         pkts_burst, nb_packets);
2131
2132        if (nb_rx > 0) {
2133                for (i = 0; i < nb_rx; ++i) {
2134                        if (packets[i]->buffer != NULL) {
2135                                /* The packet should always be finished */
2136                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2137                                free(packets[i]->buffer);
2138                        }
2139                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2140                        packets[i]->type = TRACE_RT_DATA_DPDK;
2141                        packets[i]->buffer = pkts_burst[i];
2142                        packets[i]->trace = libtrace;
2143                        packets[i]->error = 1;
2144                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2145                }
2146        }
2147
2148        return nb_rx;
2149}
2150
2151static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2152        int nb_rx; /* Number of rx packets we've received */
2153        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2154
2155        /* Free the last packet buffer */
2156        if (packet->buffer != NULL) {
2157                /* The packet should always be finished */
2158                assert(packet->buf_control == TRACE_CTRL_PACKET);
2159                free(packet->buffer);
2160                packet->buffer = NULL;
2161        }
2162
2163        packet->buf_control = TRACE_CTRL_EXTERNAL;
2164        packet->type = TRACE_RT_DATA_DPDK;
2165
2166        /* Check if we already have some packets buffered */
2167        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2168                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2169                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2170                return 1; // TODO should be bytes read, which essentially useless anyway
2171        }
2172
2173        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2174                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2175
2176        if (nb_rx > 0) {
2177                FORMAT(libtrace)->burst_size = nb_rx;
2178                FORMAT(libtrace)->burst_offset = 1;
2179                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2180                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2181                return 1;
2182        }
2183        return nb_rx;
2184}
2185
2186static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2187        struct timeval tv;
2188        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2189
2190        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2191        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2192        return tv;
2193}
2194
2195static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2196        struct timespec ts;
2197        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2198
2199        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2200        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2201        return ts;
2202}
2203
2204static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2205        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2206}
2207
2208static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2209        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2210        return (libtrace_direction_t) hdr->direction;
2211}
2212
2213static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2214        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2215        hdr->direction = (uint8_t) direction;
2216        return (libtrace_direction_t) hdr->direction;
2217}
2218
2219static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2220        struct rte_eth_stats dev_stats = {0};
2221
2222        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2223                return;
2224
2225        /* Grab the current stats */
2226        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2227
2228        stats->captured_valid = true;
2229        stats->captured = dev_stats.ipackets;
2230
2231        stats->dropped_valid = true;
2232        stats->dropped = dev_stats.imissed;
2233
2234#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2235        /* DPDK commit 86057c fixes ensures missed does not get counted as
2236         * errors */
2237        stats->errors_valid = true;
2238        stats->errors = dev_stats.ierrors;
2239#else
2240        /* DPDK errors includes drops */
2241        stats->errors_valid = true;
2242        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2243#endif
2244        stats->received_valid = true;
2245        stats->received = dev_stats.ipackets + dev_stats.imissed;
2246
2247}
2248
2249/* Attempts to read a packet in a non-blocking fashion. If one is not
2250 * available a SLEEP event is returned. We do not have the ability to
2251 * create a select()able file descriptor in DPDK.
2252 */
2253static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2254                                            libtrace_packet_t *packet) {
2255        libtrace_eventobj_t event = {0,0,0.0,0};
2256        int nb_rx; /* Number of receive packets we've read */
2257        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2258
2259        do {
2260
2261                /* See if we already have a packet waiting */
2262                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2263                                         FORMAT_DATA_FIRST(trace)->queue_id,
2264                                         pkts_burst, 1);
2265
2266                if (nb_rx > 0) {
2267                        /* Free the last packet buffer */
2268                        if (packet->buffer != NULL) {
2269                                /* The packet should always be finished */
2270                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2271                                free(packet->buffer);
2272                                packet->buffer = NULL;
2273                        }
2274
2275                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2276                        packet->type = TRACE_RT_DATA_DPDK;
2277                        event.type = TRACE_EVENT_PACKET;
2278                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
2279                        packet->buffer = FORMAT(trace)->burst_pkts[0];
2280                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2281                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2282
2283                        /* XXX - Check this passes the filter trace_read_packet normally
2284                         * does this for us but this wont */
2285                        if (trace->filter) {
2286                                if (!trace_apply_filter(trace->filter, packet)) {
2287                                        /* Failed the filter so we loop for another packet */
2288                                        trace->filtered_packets ++;
2289                                        continue;
2290                                }
2291                        }
2292                        trace->accepted_packets ++;
2293                } else {
2294                        /* We only want to sleep for a very short time - we are non-blocking */
2295                        event.type = TRACE_EVENT_SLEEP;
2296                        event.seconds = 0.0001;
2297                        event.size = 0;
2298                }
2299
2300                /* If we get here we have our event */
2301                break;
2302        } while (1);
2303
2304        return event;
2305}
2306
/* Prints the usage text for the dpdk format (supported input and output
 * URI forms) to stdout.
 */
static void dpdk_help(void) {
        /* One entry per output line; none of them contain conversion
         * specifiers, so they are written out verbatim. */
        static const char *const help_lines[] = {
                "dpdk format module: $Revision: 1752 $\n",
                "Supported input URIs:\n",
                "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
                "\tThe -<coreid> is optional \n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
                "\t By default the last CPU core is used if not otherwise specified.\n",
                "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
                "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
                "\n",
                "Supported output URIs:\n",
                "\tSame format as the input URI.\n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
                "\n",
        };
        size_t i;

        for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++)
                fputs(help_lines[i], stdout);
}
2324
2325static struct libtrace_format_t dpdk = {
2326        "dpdk",
2327        "$Id$",
2328        TRACE_FORMAT_DPDK,
2329        NULL,                               /* probe filename */
2330        NULL,                               /* probe magic */
2331        dpdk_init_input,                    /* init_input */
2332        dpdk_config_input,                  /* config_input */
2333        dpdk_start_input,                   /* start_input */
2334        dpdk_pause_input,                   /* pause_input */
2335        dpdk_init_output,                   /* init_output */
2336        NULL,                               /* config_output */
2337        dpdk_start_output,                  /* start_ouput */
2338        dpdk_fin_input,                     /* fin_input */
2339        dpdk_fin_output,                    /* fin_output */
2340        dpdk_read_packet,                   /* read_packet */
2341        dpdk_prepare_packet,                /* prepare_packet */
2342        dpdk_fin_packet,                    /* fin_packet */
2343        dpdk_write_packet,                  /* write_packet */
2344        dpdk_get_link_type,                 /* get_link_type */
2345        dpdk_get_direction,                 /* get_direction */
2346        dpdk_set_direction,                 /* set_direction */
2347        NULL,                               /* get_erf_timestamp */
2348        dpdk_get_timeval,                   /* get_timeval */
2349        dpdk_get_timespec,                  /* get_timespec */
2350        NULL,                               /* get_seconds */
2351        NULL,                               /* seek_erf */
2352        NULL,                               /* seek_timeval */
2353        NULL,                               /* seek_seconds */
2354        dpdk_get_capture_length,            /* get_capture_length */
2355        dpdk_get_wire_length,               /* get_wire_length */
2356        dpdk_get_framing_length,            /* get_framing_length */
2357        dpdk_set_capture_length,            /* set_capture_length */
2358        NULL,                               /* get_received_packets */
2359        NULL,                               /* get_filtered_packets */
2360        NULL,                               /* get_dropped_packets */
2361        dpdk_get_stats,                     /* get_statistics */
2362        NULL,                               /* get_fd */
2363        dpdk_trace_event,                   /* trace_event */
2364        dpdk_help,                          /* help */
2365        NULL,                               /* next pointer */
2366        {true, 8},                          /* Live, NICs typically have 8 threads */
2367        dpdk_pstart_input,                  /* pstart_input */
2368        dpdk_pread_packets,                 /* pread_packets */
2369        dpdk_pause_input,                   /* ppause */
2370        dpdk_fin_input,                     /* p_fin */
2371        dpdk_pregister_thread,              /* pregister_thread */
2372        dpdk_punregister_thread,            /* punregister_thread */
2373        NULL                                /* get thread stats */
2374};
2375
2376void dpdk_constructor(void) {
2377        register_format(&dpdk);
2378}
Note: See TracBrowser for help on using the repository browser.