source: lib/format_dpdk.c @ db84bb2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since db84bb2 was db84bb2, checked in by Shane Alcock <salcock@…>, 4 years ago

Ensure packet->order is always strictly incrementing

We cannot equate timestamp with packet->order, as some timestamp
methods are not strictly monotonic (ring: and int:).

Each format is now responsible for determining packet->order
during pread, so that the format can detect and correct such
inaccuracies.

More specifically, ring: and int: will cache the last reported
timestamp per thread and if time goes backwards, the order will
be set to last+1, otherwise the timestamp will be used.

DAG and DPDK still use the timestamp for ordering, since there
have been no issues with the timestamp ordering for these formats
(thus far!).

  • Property mode set to 100644
File size: 77.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44
45#ifdef HAVE_INTTYPES_H
46#  include <inttypes.h>
47#else
48# error "Can't find inttypes.h"
49#endif
50
51#include <stdlib.h>
52#include <assert.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56
57#if HAVE_LIBNUMA
58#include <numa.h>
59#endif
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
65 * These get released with the rX suffix. The following macros were added
66 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * DPDK 16.04 or newer is recommended.
72 * However 1.6 and newer are still likely supported.
73 */
74#include <rte_eal.h>
75#include <rte_version.h>
76#ifndef RTE_VERSION_NUM
77#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
78#endif
79#ifndef RTE_VER_PATCH_RELEASE
80#       define RTE_VER_PATCH_RELEASE 0
81#endif
82#ifndef RTE_VERSION
83#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
84        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
85#endif
86
87/* 1.6.0r2 :
88 *      rte_eal_pci_set_blacklist() is removed
89 *      device_list is renamed to pci_device_list
90 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
91 *      as such we do apply the whitelist before rte_eal_init.
92 *      This also works correctly with DPDK 1.6.0r2.
93 *
94 * Replaced by:
95 *      rte_devargs (we can simply whitelist)
96 */
97#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
98#       define DPDK_USE_BLACKLIST 1
99#else
100#       define DPDK_USE_BLACKLIST 0
101#endif
102
103/*
104 * 1.7.0 :
105 *      rte_pmd_init_all is removed
106 *
107 * Replaced by:
108 *      Nothing, no longer needed
109 */
110#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
111#       define DPDK_USE_PMD_INIT 1
112#else
113#       define DPDK_USE_PMD_INIT 0
114#endif
115
116/* 1.7.0-rc3 :
117 *
118 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
119 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
120 * it twice.
121 */
122#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
123#       define DPDK_USE_PCI_PROBE 1
124#else
125#       define DPDK_USE_PCI_PROBE 0
126#endif
127
128/* 1.8.0-rc1 :
129 * LOG LEVEL is a command line option which overrides what
130 * we previously set it to.
131 */
132#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
133#       define DPDK_USE_LOG_LEVEL 1
134#else
135#       define DPDK_USE_LOG_LEVEL 0
136#endif
137
138/* 1.8.0-rc2
139 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
140 * this uses the default values, which are better tuned per device
141 * See issue #26
142 */
143#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
144#       define DPDK_USE_NULL_QUEUE_CONFIG 1
145#else
146#       define DPDK_USE_NULL_QUEUE_CONFIG 0
147#endif
148
149/* 2.0.0-rc1
150 * Unifies RSS hash between cards
151 */
152#if RTE_VERSION >= RTE_VERSION_NUM(2, 0, 0, 1)
153#       define RX_RSS_FLAGS (ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | \
154                             ETH_RSS_SCTP)
155#else
156#       define RX_RSS_FLAGS (ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | \
157                             ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP |\
158                             ETH_RSS_IPV6_UDP)
159#endif
160
161/* v16.07-rc1 - deprecated
162 * rte_mempool_avail_count to replace rte_mempool_count
163 * rte_mempool_in_use_count to replace rte_mempool_free_count
164 */
165#if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
166#define rte_mempool_avail_count rte_mempool_count
167#define rte_mempool_in_use_count rte_mempool_free_count
168#endif
169
170#include <rte_per_lcore.h>
171#include <rte_debug.h>
172#include <rte_errno.h>
173#include <rte_common.h>
174#include <rte_log.h>
175#include <rte_memcpy.h>
176#include <rte_prefetch.h>
177#include <rte_branch_prediction.h>
178#include <rte_pci.h>
179#include <rte_ether.h>
180#include <rte_ethdev.h>
181#include <rte_ring.h>
182#include <rte_mempool.h>
183#include <rte_mbuf.h>
184#include <rte_launch.h>
185#include <rte_lcore.h>
186#include <rte_per_lcore.h>
187#include <rte_cycles.h>
188#include <pthread.h>
189#ifdef __FreeBSD__
190#include <pthread_np.h>
191#endif
192
193/* 16.04-rc3 ETH_LINK_SPEED_X are replaced with ETH_SPEED_NUM_X.
194 * ETH_LINK_SPEED_ are reused as flags, ugly.
195 * We use the new way in this code.
196 */
197#ifndef ETH_SPEED_NUM_1G
198        #define ETH_SPEED_NUM_1G ETH_LINK_SPEED_1000
199        #define ETH_SPEED_NUM_10G ETH_LINK_SPEED_10G
200        #define ETH_SPEED_NUM_20G ETH_LINK_SPEED_20G
201        #define ETH_SPEED_NUM_40G ETH_LINK_SPEED_40G
202#endif
203
204/* The default size of memory buffers to use - This is the max size of standard
205 * ethernet packet less the size of the MAC CHECKSUM */
206#define RX_MBUF_SIZE 1514
207
208/* The minimum number of memory buffers per queue tx or rx. Based on
209 * the requirement of the memory pool with 128 per thread buffers, needing
210 * at least 128*1.5 = 192 buffers. Our code allocates 128*2 to be safe.
211 */
212#define MIN_NB_BUF 128
213
214/* Number of receive memory buffers to use
215 * By default this is limited by driver to 4k and must be a multiple of 128.
216 * A modification can be made to the driver to remove this limit.
217 * This can be increased in the driver and here.
218 * Should be at least MIN_NB_BUF.
219 * We choose 2K rather than 4K because it enables the usage of sse vector
220 * drivers which are significantly faster than using the larger buffer.
221 */
222#define NB_RX_MBUF (4096/2)
223
224/* Number of send memory buffers to use.
225 * Same limits apply as those to NB_RX_MBUF.
226 */
227#define NB_TX_MBUF 1024
228
229/* The size of the PCI blacklist needs to be big enough to contain
230 * every PCI device address (listed by lspci every bus:device.function tuple).
231 */
232#define BLACK_LIST_SIZE 50
233
234/* The maximum number of characters the mempool name can be */
235#define MEMPOOL_NAME_LEN 20
236
237/* For single threaded libtrace we read packets as a batch/burst
238 * this is the maximum size of said burst */
239#define BURST_SIZE 32
240
/* View an opaque pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Cast the format_data pointer of x to the per-trace / per-lcore state */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Head node of the per_stream list, and the stream stored in that node */
#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec (passed by value) to
 * nanoseconds. The argument is parenthesised so that expressions such as
 * *ptr can be passed safely. */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
254
255#if RTE_PKTMBUF_HEADROOM != 128
256#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
257         "any libtrace instance processing these packet must be have the" \
258         "same RTE_PKTMBUF_HEADROOM set"
259#endif
260
261/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
262 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
263 *
264 * Make sure you understand what these are doing before enabling them.
265 * They might make traces incompatible with other builds etc.
266 *
267 * These are also included to show how to do some things which aren't
268 * obvious in the DPDK documentation.
269 */
270
271/* Print verbose messages to stderr */
272#define DEBUG 0
273
274/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
275 * only turn on if you know clock_gettime is a vsyscall on your system
276 * otherwise could be a large overhead. Again gettimeofday() should be
277 * vsyscall also if it's not you should seriously consider updating your
278 * kernel.
279 */
280#ifdef HAVE_CLOCK_GETTIME
281/* You can turn this on (set to 1) to prefer clock_gettime */
282#define USE_CLOCK_GETTIME 1
283#else
284/* DON'T CHANGE THIS !!! */
285#define USE_CLOCK_GETTIME 0
286#endif
287
288/* This is fairly safe to turn on - currently there appears to be a 'bug'
289 * in DPDK that will remove the checksum by making the packet appear 4bytes
290 * smaller than what it really is. Most formats don't include the checksum
291 * hence writing out a port such as int: ring: and dpdk: assumes there
292 * is no checksum and will attempt to write the checksum as part of the
293 * packet
294 */
295#define GET_MAC_CRC_CHECKSUM 0
296
297/* This requires a modification of the pmd drivers (inside Intel DPDK)
298 * TODO this requires updating (packet sizes are wrong TS most likely also)
299 */
300#define HAS_HW_TIMESTAMPS_82580 0
301
302#if HAS_HW_TIMESTAMPS_82580
303# define TS_NBITS_82580     40
304/* The maximum on the +ve or -ve side that we can be, make it half way */
305# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
306#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
307#endif
308
309static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
310/* Memory pools Per NUMA node */
311static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
312
/* Hardware timestamp record of the Intel 82580.
 * Note the 82580 datasheet is inconsistent here: it claims the timestamp is
 * stored Big Endian, but in practice it is Little Endian. */
struct hw_timestamp_82580 {
	uint64_t reserved;
	/* Little Endian; only the low 40 bits carry a valid value */
	uint64_t timestamp;
};
319
/* Run state of the capture; stored in dpdk_format_data_t.paused */
enum paused_state {
	DPDK_NEVER_STARTED = 0,
	DPDK_RUNNING = 1,
	DPDK_PAUSED = 2,
};
325
/* State kept for each parallel stream; aligned to a cache line */
struct dpdk_per_stream_t
{
	uint16_t queue_id; /* presumably the NIC queue served by this stream - TODO confirm */
	uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
	struct rte_mempool *mempool;
	int lcore; /* -1 in an empty stream */
#if HAS_HW_TIMESTAMPS_82580
	/* Timestamping only relevant to RX */
	uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
	uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);

/* Initialiser for a stream that has not been set up yet. Members that are
 * not named here (the optional 82580 timestamp fields) are zero-initialised. */
#define DPDK_EMPTY_STREAM { .queue_id = -1, .ts_last_sys = 0, \
                            .mempool = NULL, .lcore = -1 }

typedef struct dpdk_per_stream_t dpdk_per_stream_t;
346
347/* Used by both input and output however some fields are not used
348 * for output */
349struct dpdk_format_data_t {
350        int8_t promisc; /* promiscuous mode - RX only */
351        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
352        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
353        uint8_t paused; /* See paused_state */
354        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
355        int snaplen; /* The snap length for the capture - RX only */
356        /* We always have to setup both rx and tx queues even if we don't want them */
357        int nb_rx_buf; /* The number of packet buffers in the rx ring */
358        int nb_tx_buf; /* The number of packet buffers in the tx ring */
359        int nic_numa_node; /* The NUMA node that the NIC is attached to */
360        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
361#if DPDK_USE_BLACKLIST
362        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
363        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
364#endif
365        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
366        enum hasher_types hasher_type;
367        uint8_t *rss_key;
368        /* To improve single-threaded performance we always batch reading
369         * packets, in a burst, otherwise the parallel library does this for us */
370        struct rte_mbuf* burst_pkts[BURST_SIZE];
371        int burst_size; /* The total number read in the burst */
372        int burst_offset; /* The offset we are into the burst */
373
374        /* Our parallel streams */
375        libtrace_list_t *per_stream;
376};
377
/* Bit flags stored in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
	INCLUDES_CHECKSUM     = 1 << 0,
	INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
382
/**
 * Extra per-packet header that libtrace stores in front of the packet
 * data, inside the mbuf headroom:
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
	uint64_t timestamp;
	uint8_t flags; /* Bitwise OR of enum dpdk_addt_hdr_flags */
	uint8_t direction;
	uint8_t reserved1;
	uint8_t reserved2;
	uint32_t cap_len; /* The capture length to report */
};
406
407/**
408 * We want to blacklist all devices except those on the whitelist
409 * (I say list, but yes it is only the one).
410 *
411 * The default behaviour of rte_pci_probe() will map every possible device
412 * to its DPDK driver. The DPDK driver will take the ethernet device
413 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
414 *
415 * So blacklist all devices except the one that we wish to use so that
416 * the others can still be used as standard ethernet ports.
417 *
418 * @return 0 if successful, otherwise -1 on error.
419 */
420#if DPDK_USE_BLACKLIST
421static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
422{
423        struct rte_pci_device *dev = NULL;
424        format_data->nb_blacklist = 0;
425
426        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
427
428        TAILQ_FOREACH(dev, &device_list, next) {
429        if (whitelist != NULL && whitelist->domain == dev->addr.domain
430            && whitelist->bus == dev->addr.bus
431            && whitelist->devid == dev->addr.devid
432            && whitelist->function == dev->addr.function)
433            continue;
434                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
435                                / sizeof (format_data->blacklist[0])) {
436                        fprintf(stderr, "Warning: too many devices to blacklist consider"
437                                        " increasing BLACK_LIST_SIZE");
438                        break;
439                }
440                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
441                ++format_data->nb_blacklist;
442        }
443
444        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
445        return 0;
446}
447#else /* DPDK_USE_BLACKLIST */
448#include <rte_devargs.h>
449static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
450{
451        char pci_str[20] = {0};
452        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
453                 whitelist->domain,
454                 whitelist->bus,
455                 whitelist->devid,
456                 whitelist->function);
457        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
458                return -1;
459        }
460        return 0;
461}
462#endif
463
464/**
465 * Parse the URI format as a pci address
466 * Fills in addr, note core is optional and is unchanged if
467 * a value for it is not provided.
468 *
469 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
470 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
471 */
472static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
473        int matches;
474        assert(str);
475        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
476                         &addr->domain, &addr->bus, &addr->devid,
477                         &addr->function, core);
478        if (matches >= 4) {
479                return 0;
480        } else {
481                return -1;
482        }
483}
484
485/**
486 * Convert a pci address to the numa node it is
487 * connected to.
488 *
489 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
490 * so we can call it before DPDK
491 *
492 * @return -1 if unknown otherwise a number 0 or higher of the numa node
493 */
494static int pci_to_numa(struct rte_pci_addr * dev_addr) {
495        char path[50] = {0};
496        FILE *file;
497
498        /* Read from the system */
499        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
500                 dev_addr->domain,
501                 dev_addr->bus,
502                 dev_addr->devid,
503                 dev_addr->function);
504
505        if((file = fopen(path, "r")) != NULL) {
506                int numa_node = -1;
507                fscanf(file, "%d", &numa_node);
508                fclose(file);
509                return numa_node;
510        }
511        return -1;
512}
513
514#if DEBUG
515/* For debugging */
516static inline void dump_configuration()
517{
518        struct rte_config * global_config;
519        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
520
521        if (nb_cpu <= 0) {
522                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
523                       " Falling back to the first core.");
524                nb_cpu = 1; /* fallback to just 1 core */
525        }
526        if (nb_cpu > RTE_MAX_LCORE)
527                nb_cpu = RTE_MAX_LCORE;
528
529        global_config = rte_eal_get_configuration();
530
531        if (global_config != NULL) {
532                int i;
533                fprintf(stderr, "Intel DPDK setup\n"
534                        "---Version      : %s\n"
535                        "---Master LCore : %"PRIu32"\n"
536                        "---LCore Count  : %"PRIu32"\n",
537                        rte_version(),
538                        global_config->master_lcore, global_config->lcore_count);
539
540                for (i = 0 ; i < nb_cpu; i++) {
541                        fprintf(stderr, "   ---Core %d : %s\n", i,
542                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
543                }
544
545                const char * proc_type;
546                switch (global_config->process_type) {
547                case RTE_PROC_AUTO:
548                        proc_type = "auto";
549                        break;
550                case RTE_PROC_PRIMARY:
551                        proc_type = "primary";
552                        break;
553                case RTE_PROC_SECONDARY:
554                        proc_type = "secondary";
555                        break;
556                case RTE_PROC_INVALID:
557                        proc_type = "invalid";
558                        break;
559                default:
560                        proc_type = "something worse than invalid!!";
561                }
562                fprintf(stderr, "---Process Type : %s\n", proc_type);
563        }
564
565}
566#endif
567
568/**
569 * Expects to be called from the master lcore and moves it to the given dpdk id
570 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
571 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
572 *               and not already in use.
573 * @return 0 is successful otherwise -1 on error.
574 */
575static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
576        struct rte_config *cfg = rte_eal_get_configuration();
577        cpu_set_t cpuset;
578        int i;
579
580        assert (core < RTE_MAX_LCORE);
581        assert (rte_get_master_lcore() == rte_lcore_id());
582
583        if (core == rte_lcore_id())
584                return 0;
585
586        /* Make sure we are not overwriting someone else */
587        assert(!rte_lcore_is_enabled(core));
588
589        /* Move the core */
590        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
591        cfg->lcore_role[core] = ROLE_RTE;
592        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
593        rte_eal_get_configuration()->master_lcore = core;
594        RTE_PER_LCORE(_lcore_id) = core;
595
596        /* Now change the affinity, either mapped to a single core or all accepted */
597        CPU_ZERO(&cpuset);
598
599        if (lcore_config[core].detected) {
600                CPU_SET(core, &cpuset);
601        } else {
602                for (i = 0; i < RTE_MAX_LCORE; ++i) {
603                        if (lcore_config[i].detected)
604                                CPU_SET(i, &cpuset);
605                }
606        }
607
608        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
609        if (i != 0) {
610                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
611                return -1;
612        }
613        return 0;
614}
615
/**
 * XXX This is very bad XXX
 * Snapshot of the global getopt() state, so that getopt can be used in a
 * nested fashion and the outer parse put back afterwards.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work
 */
struct saved_getopts {
	char *optarg;
	int optind;
	int opterr;
	int optopt;
};

/* Copy the current getopt globals into opts */
static void save_getopts(struct saved_getopts *opts) {
	*opts = (struct saved_getopts) {
		.optarg = optarg,
		.optind = optind,
		.opterr = opterr,
		.optopt = optopt,
	};
}

/* Write a snapshot taken by save_getopts() back into the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
	const struct saved_getopts snapshot = *opts;

	optarg = snapshot.optarg;
	optind = snapshot.optind;
	opterr = snapshot.opterr;
	optopt = snapshot.optopt;
}
643
644static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
645                                        char * err, int errlen) {
646        int ret; /* Returned error codes */
647        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
648        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
649        char mem_map[20] = {0}; /* The memory name */
650        long nb_cpu; /* The number of CPUs in the system */
651        long my_cpu; /* The CPU number we want to bind to */
652        int i;
653        struct rte_config *cfg = rte_eal_get_configuration();
654        struct saved_getopts save_opts;
655
656        /* This initialises the Environment Abstraction Layer (EAL)
657         * If we had slave workers these are put into WAITING state
658         *
659         * Basically binds this thread to a fixed core, which we choose as
660         * the last core on the machine (assuming fewer interrupts mapped here).
661         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
662         * "-n" the number of memory channels into the CPU (hardware specific)
663         *      - Most likely to be half the number of ram slots in your machine.
664         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
665         * Controls where in memory packets are stored such that they are spread
666         * across the channels. We just use 1 to be safe.
667         *
668         * Using unique file prefixes mean separate memory is used, unlinking
669         * the two processes. However be careful we still cannot access a
670         * port that already in use.
671         */
672        char* argv[] = {"libtrace",
673                        "-c", cpu_number,
674                        "-n", "1",
675                        "--proc-type", "auto",
676                        "--file-prefix", mem_map,
677                        "-m", "512",
678#if DPDK_USE_LOG_LEVEL
679#       if DEBUG
680                        "--log-level", "8", /* RTE_LOG_DEBUG */
681#       else
682                        "--log-level", "5", /* RTE_LOG_WARNING */
683#       endif
684#endif
685                        NULL};
686        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
687
688#if DEBUG
689        rte_set_log_level(RTE_LOG_DEBUG);
690#else
691        rte_set_log_level(RTE_LOG_WARNING);
692#endif
693
694        /* Get the number of cpu cores in the system and use the last core
695         * on the correct numa node */
696        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
697        if (nb_cpu <= 0) {
698                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
699                       " Falling back to the first core.");
700                nb_cpu = 1; /* fallback to the first core */
701        }
702        if (nb_cpu > RTE_MAX_LCORE)
703                nb_cpu = RTE_MAX_LCORE;
704
705        my_cpu = -1;
706        /* This allows the user to specify the core - we would try to do this
707         * automatically but it's hard to tell that this is secondary
708         * before running rte_eal_init(...). Currently we are limited to 1
709         * instance per core due to the way memory is allocated. */
710        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
711                snprintf(err, errlen, "Failed to parse URI");
712                return -1;
713        }
714
715#if HAVE_LIBNUMA
716        format_data->nic_numa_node = pci_to_numa(&use_addr);
717        if (my_cpu < 0) {
718#if DEBUG
719                /* If we can assign to a core on the same numa node */
720                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
721#endif
722                if(format_data->nic_numa_node >= 0) {
723                        int max_node_cpu = -1;
724                        struct bitmask *mask = numa_allocate_cpumask();
725                        assert(mask);
726                        numa_node_to_cpus(format_data->nic_numa_node, mask);
727                        for (i = 0 ; i < nb_cpu; ++i) {
728                                if (numa_bitmask_isbitset(mask,i))
729                                        max_node_cpu = i+1;
730                        }
731                        my_cpu = max_node_cpu;
732                }
733        }
734#endif
735        if (my_cpu < 0) {
736                my_cpu = nb_cpu;
737        }
738
739
740        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
741                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
742
743        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
744                snprintf(err, errlen,
745                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
746                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
747                return -1;
748        }
749
750        /* Make our mask with all cores turned on this is so that DPDK
751         * gets all CPU info in older versions */
752        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
753        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
754
755#if !DPDK_USE_BLACKLIST
756        /* Black list all ports besides the one that we want to use */
757        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
758                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
759                         " are you sure the address is correct?: %s", strerror(-ret));
760                return -1;
761        }
762#endif
763
764        /* Give the memory map a unique name */
765        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
766        /* rte_eal_init it makes a call to getopt so we need to reset the
767         * global optind variable of getopt otherwise this fails */
768        save_getopts(&save_opts);
769        optind = 1;
770        if ((ret = rte_eal_init(argc, argv)) < 0) {
771                snprintf(err, errlen,
772                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
773                return -1;
774        }
775        restore_getopts(&save_opts);
776        // These are still running but will never do anything with DPDK v1.7 we
777        // should remove this XXX in the future
778        for(i = 0; i < RTE_MAX_LCORE; ++i) {
779                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
780                        cfg->lcore_role[i] = ROLE_OFF;
781                        cfg->lcore_count--;
782                }
783        }
784        // Only the master should be running
785        assert(cfg->lcore_count == 1);
786
787        // TODO XXX TODO
788        dpdk_move_master_lcore(NULL, my_cpu-1);
789
790#if DEBUG
791        dump_configuration();
792#endif
793
794#if DPDK_USE_PMD_INIT
795        /* This registers all available NICs with Intel DPDK
796         * These are not loaded until rte_eal_pci_probe() is called.
797         */
798        if ((ret = rte_pmd_init_all()) < 0) {
799                snprintf(err, errlen,
800                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
801                return -1;
802        }
803#endif
804
805#if DPDK_USE_BLACKLIST
806        /* Blacklist all ports besides the one that we want to use */
807        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
808                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
809                         " are you sure the address is correct?: %s", strerror(-ret));
810                return -1;
811        }
812#endif
813
814#if DPDK_USE_PCI_PROBE
815        /* This loads DPDK drivers against all ports that are not blacklisted */
816        if ((ret = rte_eal_pci_probe()) < 0) {
817                snprintf(err, errlen,
818                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
819                return -1;
820        }
821#endif
822
823        format_data->nb_ports = rte_eth_dev_count();
824
825        if (format_data->nb_ports != 1) {
826                snprintf(err, errlen,
827                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
828                         format_data->nb_ports);
829                return -1;
830        }
831
832        return 0;
833}
834
835static int dpdk_init_input (libtrace_t *libtrace) {
836        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
837        char err[500];
838        err[0] = 0;
839
840        libtrace->format_data = (struct dpdk_format_data_t *)
841                                malloc(sizeof(struct dpdk_format_data_t));
842        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
843        FORMAT(libtrace)->nb_ports = 0;
844        FORMAT(libtrace)->snaplen = 0; /* Use default */
845        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
846        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
847        FORMAT(libtrace)->nic_numa_node = -1;
848        FORMAT(libtrace)->promisc = -1;
849        FORMAT(libtrace)->pktmbuf_pool = NULL;
850#if DPDK_USE_BLACKLIST
851        FORMAT(libtrace)->nb_blacklist = 0;
852#endif
853        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
854        FORMAT(libtrace)->mempool_name[0] = 0;
855        memset(FORMAT(libtrace)->burst_pkts, 0,
856               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
857        FORMAT(libtrace)->burst_size = 0;
858        FORMAT(libtrace)->burst_offset = 0;
859        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
860        FORMAT(libtrace)->rss_key = NULL;
861
862        /* Make our first stream */
863        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
864        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
865
866        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
867                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
868                free(libtrace->format_data);
869                libtrace->format_data = NULL;
870                return -1;
871        }
872        return 0;
873}
874
875static int dpdk_init_output(libtrace_out_t *libtrace)
876{
877        char err[500];
878        err[0] = 0;
879
880        libtrace->format_data = (struct dpdk_format_data_t *)
881                                malloc(sizeof(struct dpdk_format_data_t));
882        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
883        FORMAT(libtrace)->nb_ports = 0;
884        FORMAT(libtrace)->snaplen = 0; /* Use default */
885        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
886        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
887        FORMAT(libtrace)->nic_numa_node = -1;
888        FORMAT(libtrace)->promisc = -1;
889        FORMAT(libtrace)->pktmbuf_pool = NULL;
890#if DPDK_USE_BLACKLIST
891        FORMAT(libtrace)->nb_blacklist = 0;
892#endif
893        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
894        FORMAT(libtrace)->mempool_name[0] = 0;
895        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
896        FORMAT(libtrace)->burst_size = 0;
897        FORMAT(libtrace)->burst_offset = 0;
898
899        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
900                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
901                free(libtrace->format_data);
902                libtrace->format_data = NULL;
903                return -1;
904        }
905        return 0;
906}
907
908/**
909 * Note here snaplen excludes the MAC checksum. Packets over
910 * the requested snaplen will be dropped. (Excluding MAC checksum)
911 *
912 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
913 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
914 * is set the maximum size of the returned packet would be 1518 otherwise
915 * 1514 would be the largest size possibly returned.
916 *
917 */
918static int dpdk_config_input (libtrace_t *libtrace,
919                              trace_option_t option,
920                              void *data) {
921        switch (option) {
922        case TRACE_OPTION_SNAPLEN:
923                /* Only support changing snaplen before a call to start is
924                 * made */
925                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
926                        FORMAT(libtrace)->snaplen=*(int*)data;
927                else
928                        return -1;
929                return 0;
930        case TRACE_OPTION_PROMISC:
931                FORMAT(libtrace)->promisc=*(int*)data;
932                return 0;
933        case TRACE_OPTION_HASHER:
934                switch (*((enum hasher_types *) data))
935                {
936                case HASHER_BALANCE:
937                case HASHER_UNIDIRECTIONAL:
938                case HASHER_BIDIRECTIONAL:
939                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
940                        if (FORMAT(libtrace)->rss_key)
941                                free(FORMAT(libtrace)->rss_key);
942                        FORMAT(libtrace)->rss_key = NULL;
943                        return 0;
944                case HASHER_CUSTOM:
945                        // Let libtrace do this
946                        return -1;
947                }
948                break;
949        case TRACE_OPTION_FILTER:
950                /* TODO filtering */
951        case TRACE_OPTION_META_FREQ:
952        case TRACE_OPTION_EVENT_REALTIME:
953                break;
954        /* Avoid default: so that future options will cause a warning
955         * here to remind us to implement it, or flag it as
956         * unimplementable
957         */
958        }
959
960        /* Don't set an error - trace_config will try to deal with the
961         * option and will set an error if it fails */
962        return -1;
963}
964
965/* Can set jumbo frames/ or limit the size of a frame by setting both
966 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
967 *
968 */
969static struct rte_eth_conf port_conf = {
970        .rxmode = {
971                .mq_mode = ETH_RSS,
972                .split_hdr_size = 0,
973                .header_split   = 0, /**< Header Split disabled */
974                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
975                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
976                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
977                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
978#if GET_MAC_CRC_CHECKSUM
979/* So it appears that if hw_strip_crc is turned off the driver will still
980 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
981 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
982 * So lets just add it back on when we receive the packet.
983 */
984                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
985#else
986/* By default strip the MAC checksum because it's a bit of a hack to
987 * actually read these. And don't want to rely on disabling this to actualy
988 * always cut off the checksum in the future
989 */
990                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
991#endif
992        },
993        .txmode = {
994                .mq_mode = ETH_DCB_NONE,
995        },
996        .rx_adv_conf = {
997                .rss_conf = {
998                        .rss_hf = RX_RSS_FLAGS,
999                },
1000        },
1001        .intr_conf = {
1002                .lsc = 1
1003        }
1004};
1005
1006static const struct rte_eth_rxconf rx_conf = {
1007        .rx_thresh = {
1008                .pthresh = 8,/* RX_PTHRESH prefetch */
1009                .hthresh = 8,/* RX_HTHRESH host */
1010                .wthresh = 4,/* RX_WTHRESH writeback */
1011        },
1012        .rx_free_thresh = 0,
1013        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
1014};
1015
1016static const struct rte_eth_txconf tx_conf = {
1017        .tx_thresh = {
1018                /*
1019                 * TX_PTHRESH prefetch
1020                 * Set on the NIC, if the number of unprocessed descriptors to queued on
1021                 * the card fall below this try grab at least hthresh more unprocessed
1022                 * descriptors.
1023                 */
1024                .pthresh = 36,
1025
1026                /* TX_HTHRESH host
1027                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
1028                 */
1029                .hthresh = 0,
1030
1031                /* TX_WTHRESH writeback
1032                 * Set on the NIC, the number of sent descriptors before writing back
1033                 * status to confirm the transmission. This is done more efficiently as
1034                 * a bulk DMA-transfer rather than writing one at a time.
1035                 * Similar to tx_free_thresh however this is applied to the NIC, where
1036                 * as tx_free_thresh is when DPDK will check these. This is extended
1037                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1038                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1039                 */
1040                .wthresh = 4,
1041        },
1042
1043        /* Used internally by DPDK rather than passed to the NIC. The number of
1044         * packet descriptors to send before checking for any responses written
1045         * back (to confirm the transmission). Default = 32 if set to 0)
1046         */
1047        .tx_free_thresh = 0,
1048
1049        /* This is the Report Status threshold, used by 10Gbit cards,
1050         * This signals the card to only write back status (such as
1051         * transmission successful) after this minimum number of transmit
1052         * descriptors are seen. The default is 32 (if set to 0) however if set
1053         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1054         * a replacement. See the dpdk programmers guide for more restrictions.
1055         */
1056        .tx_rs_thresh = 1,
1057};
1058
1059/**
1060 * A callback for a link state change (LSC).
1061 *
1062 * Packets may be received before this notification. In fact the DPDK IGXBE
1063 * driver likes to put a delay upto 5sec before sending this.
1064 *
1065 * We use this to ensure the link speed is correct for our timestamp
1066 * calculations. Because packets might be received before the link up we still
1067 * update this when the packet is received.
1068 *
1069 * @param port The DPDK port
1070 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1071 * @param cb_arg The dpdk_format_data_t structure associated with the format
1072 */
1073static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1074                              void *cb_arg) {
1075        struct dpdk_format_data_t * format_data = cb_arg;
1076        struct rte_eth_link link_info;
1077        assert(event == RTE_ETH_EVENT_INTR_LSC);
1078        assert(port == format_data->port);
1079
1080        rte_eth_link_get_nowait(port, &link_info);
1081
1082        if (link_info.link_status)
1083                format_data->link_speed = link_info.link_speed;
1084        else
1085                format_data->link_speed = 0;
1086
1087#if DEBUG
1088        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1089                link_info.link_status ? "up" : "down",
1090                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1091                                          "full-duplex" : "half-duplex",
1092                (int) link_info.link_speed);
1093#endif
1094
1095        /* Turns out DPDK drivers might not come back up if the link speed
1096         * changes. So we reset the autoneg procedure. This is very unsafe
1097         * we have have threads reading packets and we stop the port. */
1098#if 0
1099        if (!link_info.link_status) {
1100                int ret;
1101                rte_eth_dev_stop(port);
1102                ret = rte_eth_dev_start(port);
1103                if (ret < 0) {
1104                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1105                                strerror(-ret));
1106                }
1107        }
1108#endif
1109}
1110
1111/** Reserve a DPDK lcore ID for a thread globally.
1112 *
1113 * @param real If true allocate a real lcore, otherwise allocate a core which
1114 * does not exist on the local machine.
1115 * @param socket the prefered NUMA socket - only used if a real core is requested
1116 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1117 * -1 if have run out of cores.
1118 *
1119 * If any thread is reading or freeing packets we need to register it here
1120 * due to TLS caches in the memory pools.
1121 */
1122static int dpdk_reserve_lcore(bool real, int socket) {
1123        int new_id = -1;
1124        int i;
1125        struct rte_config *cfg = rte_eal_get_configuration();
1126        (void) socket;
1127
1128        pthread_mutex_lock(&dpdk_lock);
1129        /* If 'reading packets' fill in cores from 0 up and bind affinity
1130         * otherwise start from the MAX core (which is also the master) and work backwards
1131         * in this case physical cores on the system will not exist so we don't bind
1132         * these to any particular physical core */
1133        if (real) {
1134#if HAVE_LIBNUMA
1135                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1136                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1137                                new_id = i;
1138                                if (!lcore_config[i].detected)
1139                                        new_id = -1;
1140                                break;
1141                        }
1142                }
1143#endif
1144                /* Retry without the the numa restriction */
1145                if (new_id == -1) {
1146                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1147                                if (!rte_lcore_is_enabled(i)) {
1148                                        new_id = i;
1149                                        if (!lcore_config[i].detected)
1150                                                fprintf(stderr, "Warning the"
1151                                                        " number of 'reading' "
1152                                                        "threads exceed cores\n");
1153                                        break;
1154                                }
1155                        }
1156                }
1157        } else {
1158                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1159                        if (!rte_lcore_is_enabled(i)) {
1160                                new_id = i;
1161                                break;
1162                        }
1163                }
1164        }
1165
1166        if (new_id != -1) {
1167                /* Enable the core in global DPDK structs */
1168                cfg->lcore_role[new_id] = ROLE_RTE;
1169                cfg->lcore_count++;
1170        }
1171
1172        pthread_mutex_unlock(&dpdk_lock);
1173        return new_id;
1174}
1175
1176/** Register a thread as a lcore
1177 * @param libtrace any error is set against libtrace on exit
1178 * @param real If this is a true lcore we will bind its affinty to the
1179 * requested core.
1180 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1181 * @return 0, if successful otherwise -1 if an error occured (details are stored
1182 * in libtrace)
1183 *
1184 * @note This must be called from the thread being registered.
1185 */
1186static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1187        int ret;
1188        RTE_PER_LCORE(_lcore_id) = lcore;
1189
1190        /* Set affinity bind to corresponding core */
1191        if (real) {
1192                cpu_set_t cpuset;
1193                CPU_ZERO(&cpuset);
1194                CPU_SET(rte_lcore_id(), &cpuset);
1195                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1196                if (ret != 0) {
1197                        trace_set_err(libtrace, errno, "Warning "
1198                                      "pthread_setaffinity_np failed");
1199                        return -1;
1200                }
1201        }
1202
1203        return 0;
1204}
1205
1206/** Allocates a new dpdk packet buffer memory pool.
1207 *
1208 * @param n The number of threads
1209 * @param pkt_size The packet size we need ot store
1210 * @param socket_id The NUMA socket id
1211 * @param A new mempool, if NULL query the DPDK library for the error code
1212 * see rte_mempool_create() documentation.
1213 *
1214 * This allocates a new pool or recycles an existing memory pool.
1215 * Call dpdk_free_memory() to free the memory.
1216 * We cannot delete memory so instead we store the pools, allowing them to be
1217 * re-used.
1218 */
1219static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1220                                             unsigned pkt_size,
1221                                             int socket_id) {
1222        struct rte_mempool *ret;
1223        size_t j,k;
1224        char name[MEMPOOL_NAME_LEN];
1225
1226        /* Add on packet size overheads */
1227        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1228
1229        pthread_mutex_lock(&dpdk_lock);
1230
1231        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1232                /* Best guess go for zero */
1233                socket_id = 0;
1234        }
1235
1236        /* Find a valid pool */
1237        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1238                if (mem_pools[socket_id][j]->size >= n &&
1239                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1240                        break;
1241                }
1242        }
1243
1244        /* Find the end (+1) of the list */
1245        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1246
1247        if (mem_pools[socket_id][j]) {
1248                ret = mem_pools[socket_id][j];
1249                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1250                mem_pools[socket_id][k-1] = NULL;
1251                mem_pools[socket_id][j] = NULL;
1252        } else {
1253                static uint32_t test = 10;
1254                test++;
1255                snprintf(name, MEMPOOL_NAME_LEN,
1256                         "libtrace_pool_%"PRIu32, test);
1257
1258                ret = rte_mempool_create(name, n, pkt_size,
1259                                         128, sizeof(struct rte_pktmbuf_pool_private),
1260                                         rte_pktmbuf_pool_init, NULL,
1261                                         rte_pktmbuf_init, NULL,
1262                                         socket_id, 0);
1263        }
1264
1265        pthread_mutex_unlock(&dpdk_lock);
1266        return ret;
1267}
1268
1269/** Stores the memory against the DPDK library.
1270 *
1271 * @param mempool The mempool to free
1272 * @param socket_id The NUMA socket this mempool was allocated upon.
1273 *
1274 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1275 * store the memory shared globally against the format.
1276 */
1277static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1278        size_t i;
1279        pthread_mutex_lock(&dpdk_lock);
1280
1281        /* We should have all entries back in the mempool */
1282        rte_mempool_audit(mempool);
1283        if (!rte_mempool_full(mempool)) {
1284                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1285                        "free all packets before finishing a trace\n",
1286                        rte_mempool_avail_count(mempool), mempool->size);
1287        }
1288
1289        /* Find the end (+1) of the list */
1290        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1291
1292        if (i >= RTE_MAX_LCORE) {
1293                fprintf(stderr, "Too many memory pools, dropping this one\n");
1294        } else {
1295                mem_pools[socket_id][i] = mempool;
1296        }
1297
1298        pthread_mutex_unlock(&dpdk_lock);
1299}
1300
1301/* Attach memory to the port and start (or restart) the port/s.
1302 */
1303static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1304                              char *err, int errlen, uint16_t rx_queues) {
1305        int ret, i;
1306        struct rte_eth_link link_info; /* Wait for link */
1307        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1308
1309        /* Already started */
1310        if (format_data->paused == DPDK_RUNNING)
1311                return 0;
1312
1313        /* First time started we need to alloc our memory, doing this here
1314         * rather than in environment setup because we don't have snaplen then */
1315        if (format_data->paused == DPDK_NEVER_STARTED) {
1316                if (format_data->snaplen == 0) {
1317                        format_data->snaplen = RX_MBUF_SIZE;
1318                        port_conf.rxmode.jumbo_frame = 0;
1319                        port_conf.rxmode.max_rx_pkt_len = 0;
1320                } else {
1321                        /* Use jumbo frames */
1322                        port_conf.rxmode.jumbo_frame = 1;
1323                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1324                }
1325
1326#if GET_MAC_CRC_CHECKSUM
1327                /* This is additional overhead so make sure we allow space for this */
1328                format_data->snaplen += ETHER_CRC_LEN;
1329#endif
1330#if HAS_HW_TIMESTAMPS_82580
1331                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1332#endif
1333
1334                /* Create the mbuf pool, which is the place packets are allocated
1335                 * from - There is no free function (I cannot see one).
1336                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1337                 * allocate however that extra 1 packet is not used.
1338                 * (I assume <= vs < error some where in DPDK code)
1339                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1340                 * so that will fill the new buffer and wait until slots in the
1341                 * ring become available.
1342                 */
1343#if DEBUG
1344                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1345#endif
1346                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1347                                                              format_data->snaplen,
1348                                                              format_data->nic_numa_node);
1349
1350                if (format_data->pktmbuf_pool == NULL) {
1351                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1352                                 "pool failed: %s", strerror(rte_errno));
1353                        return -1;
1354                }
1355        }
1356
1357        /* Generate the hash key, based on the device */
1358        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1359        // In new versions DPDK we can query the size
1360#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1361        struct rte_eth_dev_info dev_info;
1362        rte_eth_dev_info_get(format_data->port, &dev_info);
1363        rss_size = dev_info.hash_key_size;
1364#endif
1365        if (rss_size != 0) {
1366                format_data->rss_key = malloc(rss_size);
1367                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1368                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1369                } else {
1370                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1371                }
1372                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1373#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1374                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1375#endif
1376        } else {
1377                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
1378        }
1379
1380        /* ----------- Now do the setup for the port mapping ------------ */
1381        /* Order of calls must be
1382         * rte_eth_dev_configure()
1383         * rte_eth_tx_queue_setup()
1384         * rte_eth_rx_queue_setup()
1385         * rte_eth_dev_start()
1386         * other rte_eth calls
1387         */
1388
1389        /* This must be called first before another *eth* function
1390         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1391        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1392        if (ret < 0) {
1393                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1394                         " %"PRIu8" : %s", format_data->port,
1395                         strerror(-ret));
1396                return -1;
1397        }
1398#if DEBUG
1399        fprintf(stderr, "Doing dev configure\n");
1400#endif
1401        /* Initialise the TX queue a minimum value if using this port for
1402         * receiving. Otherwise a larger size if writing packets.
1403         */
1404        ret = rte_eth_tx_queue_setup(format_data->port,
1405                                     0 /* queue XXX */,
1406                                     format_data->nb_tx_buf,
1407                                     SOCKET_ID_ANY,
1408                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1409        if (ret < 0) {
1410                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1411                         " on port %"PRIu8" : %s", format_data->port,
1412                         strerror(-ret));
1413                return -1;
1414        }
1415
1416        /* Attach memory to our RX queues */
1417        for (i=0; i < rx_queues; i++) {
1418                dpdk_per_stream_t *stream;
1419#if DEBUG
1420                fprintf(stderr, "Configuring queue %d\n", i);
1421#endif
1422
1423                /* Add storage for the stream */
1424                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1425                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1426                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1427                stream->queue_id = i;
1428
1429                if (stream->lcore == -1)
1430                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1431
1432                if (stream->lcore == -1) {
1433                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1434                                 ". Too many threads?");
1435                        return -1;
1436                }
1437
1438                if (stream->mempool == NULL) {
1439                        stream->mempool = dpdk_alloc_memory(
1440                                                  format_data->nb_rx_buf*2,
1441                                                  format_data->snaplen,
1442                                                  rte_lcore_to_socket_id(stream->lcore));
1443
1444                        if (stream->mempool == NULL) {
1445                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1446                                         "pool failed: %s", strerror(rte_errno));
1447                                return -1;
1448                        }
1449                }
1450
1451                /* Initialise the RX queue with some packets from memory */
1452                ret = rte_eth_rx_queue_setup(format_data->port,
1453                                             stream->queue_id,
1454                                             format_data->nb_rx_buf,
1455                                             format_data->nic_numa_node,
1456                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1457                                             stream->mempool);
1458                if (ret < 0) {
1459                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1460                                 " RX queue on port %"PRIu8" : %s",
1461                                 format_data->port,
1462                                 strerror(-ret));
1463                        return -1;
1464                }
1465        }
1466
1467#if DEBUG
1468        fprintf(stderr, "Doing start device\n");
1469#endif
1470        rte_eth_stats_reset(format_data->port);
1471        /* Start device */
1472        ret = rte_eth_dev_start(format_data->port);
1473        if (ret < 0) {
1474                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1475                         strerror(-ret));
1476                return -1;
1477        }
1478
1479        /* Default promiscuous to on */
1480        if (format_data->promisc == -1)
1481                format_data->promisc = 1;
1482
1483        if (format_data->promisc == 1)
1484                rte_eth_promiscuous_enable(format_data->port);
1485        else
1486                rte_eth_promiscuous_disable(format_data->port);
1487
1488        /* We have now successfully started/unpased */
1489        format_data->paused = DPDK_RUNNING;
1490
1491
1492        /* Register a callback for link state changes */
1493        ret = rte_eth_dev_callback_register(format_data->port,
1494                                            RTE_ETH_EVENT_INTR_LSC,
1495                                            dpdk_lsc_callback,
1496                                            format_data);
1497#if DEBUG
1498        if (ret)
1499                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1500                        ret, strerror(-ret));
1501#endif
1502
1503        /* Get the current link status */
1504        rte_eth_link_get_nowait(format_data->port, &link_info);
1505        format_data->link_speed = link_info.link_speed;
1506#if DEBUG
1507        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1508                (int) link_info.link_duplex, (int) link_info.link_speed);
1509#endif
1510
1511        return 0;
1512}
1513
1514static int dpdk_start_input (libtrace_t *libtrace) {
1515        char err[500];
1516        err[0] = 0;
1517
1518        /* Make sure we don't reserve an extra thread for this */
1519        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1520
1521        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1522                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1523                free(libtrace->format_data);
1524                libtrace->format_data = NULL;
1525                return -1;
1526        }
1527        return 0;
1528}
1529
1530static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1531        struct rte_eth_dev_info dev_info;
1532        rte_eth_dev_info_get(port_id, &dev_info);
1533        return dev_info.max_rx_queues;
1534}
1535
/**
 * Report how many processors are currently online according to
 * sysconf(_SC_NPROCESSORS_ONLN).
 *
 * @return The online processor count, or 1 if sysconf() reports an error
 *         or a non-positive value
 */
static inline size_t dpdk_processor_count () {
        const long online = sysconf(_SC_NPROCESSORS_ONLN);

        return (online > 0) ? (size_t) online : 1;
}
1543
1544static int dpdk_pstart_input (libtrace_t *libtrace) {
1545        char err[500];
1546        int i=0, phys_cores=0;
1547        int tot = libtrace->perpkt_thread_count;
1548        libtrace_list_node_t *n;
1549        err[0] = 0;
1550
1551        if (rte_lcore_id() != rte_get_master_lcore())
1552                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1553                        " from the master DPDK thread!\n");
1554
1555        /* If the master is not on the last thread we move it there */
1556        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1557                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1558                        return -1;
1559        }
1560
1561        /* Don't exceed the number of cores in the system/detected by dpdk
1562         * We don't have to force this but performance wont be good if we don't */
1563        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1564                if (lcore_config[i].detected) {
1565                        if (rte_lcore_is_enabled(i)) {
1566#if DEBUG
1567                                fprintf(stderr, "Found core %d already in use!\n", i);
1568#endif
1569                        } else {
1570                                phys_cores++;
1571                        }
1572                }
1573        }
1574        /* If we are restarting we have already allocated some threads as such
1575         * we add these back to the count for this calculation */
1576        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1577                dpdk_per_stream_t * stream = n->data;
1578                if (stream->lcore != -1)
1579                        phys_cores++;
1580        }
1581
1582        tot = MIN(libtrace->perpkt_thread_count,
1583                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1584        tot = MIN(tot, phys_cores);
1585
1586#if DEBUG
1587        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1588                libtrace->perpkt_thread_count, phys_cores);
1589#endif
1590
1591        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1592                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1593                free(libtrace->format_data);
1594                libtrace->format_data = NULL;
1595                return -1;
1596        }
1597
1598        /* Make sure we only start the number that we should */
1599        libtrace->perpkt_thread_count = tot;
1600        return 0;
1601}
1602
1603/**
1604 * Register a thread with the DPDK system,
1605 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1606 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1607 * gives it.
1608 *
1609 * We then allow a mapper thread to be started on every real core as DPDK would,
1610 * we also bind these to the corresponding CPU cores.
1611 *
1612 * @param libtrace A pointer to the trace
1613 * @param reading True if the thread will be used to read packets, i.e. will
1614 *                call pread_packet(), false if thread used to process packet
1615 *                in any other manner including statistics functions.
1616 */
1617static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1618{
1619#if DEBUG
1620        char name[99];
1621        name[0] = 0;
1622#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1623        pthread_getname_np(pthread_self(),
1624                           name, sizeof(name));
1625#endif
1626#endif
1627        if (reading) {
1628                dpdk_per_stream_t *stream;
1629                /* Attach our thread */
1630                if(t->type == THREAD_PERPKT) {
1631                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1632                        if (t->format_data == NULL) {
1633                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1634                                              "Too many threads registered");
1635                                return -1;
1636                        }
1637                } else {
1638                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1639                }
1640                stream = t->format_data;
1641#if DEBUG
1642                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1643#endif
1644                return dpdk_register_lcore(libtrace, true, stream->lcore);
1645        } else {
1646                int lcore = dpdk_reserve_lcore(reading, 0);
1647                if (lcore == -1) {
1648                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1649                                      " for DPDK");
1650                        return -1;
1651                }
1652#if DEBUG
1653                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1654#endif
1655                return dpdk_register_lcore(libtrace, false, lcore);
1656        }
1657
1658        return 0;
1659}
1660
1661/**
1662 * Unregister a thread with the DPDK system.
1663 *
1664 * Only previously registered threads should be calling this just before
1665 * they are destroyed.
1666 */
1667static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1668{
1669        struct rte_config *cfg = rte_eal_get_configuration();
1670
1671        assert(rte_lcore_id() < RTE_MAX_LCORE);
1672        pthread_mutex_lock(&dpdk_lock);
1673        /* Skip if master */
1674        if (rte_lcore_id() == rte_get_master_lcore()) {
1675                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1676                pthread_mutex_unlock(&dpdk_lock);
1677                return;
1678        }
1679
1680        /* Disable this core in global DPDK structs */
1681        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1682        cfg->lcore_count--;
1683        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1684        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1685        pthread_mutex_unlock(&dpdk_lock);
1686        return;
1687}
1688
1689static int dpdk_start_output(libtrace_out_t *libtrace)
1690{
1691        char err[500];
1692        err[0] = 0;
1693
1694        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1695                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1696                free(libtrace->format_data);
1697                libtrace->format_data = NULL;
1698                return -1;
1699        }
1700        return 0;
1701}
1702
1703static int dpdk_pause_input(libtrace_t * libtrace) {
1704        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1705        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1706        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1707#if DEBUG
1708                fprintf(stderr, "Pausing DPDK port\n");
1709#endif
1710                rte_eth_dev_stop(FORMAT(libtrace)->port);
1711                FORMAT(libtrace)->paused = DPDK_PAUSED;
1712                /* Empty the queue of packets */
1713                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1714                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1715                }
1716                FORMAT(libtrace)->burst_offset = 0;
1717                FORMAT(libtrace)->burst_size = 0;
1718
1719                for (; tmp != NULL; tmp = tmp->next) {
1720                        dpdk_per_stream_t *stream = tmp->data;
1721                        stream->ts_last_sys = 0;
1722#if HAS_HW_TIMESTAMPS_82580
1723                        stream->ts_first_sys = 0;
1724#endif
1725                }
1726
1727        }
1728        return 0;
1729}
1730
1731static int dpdk_write_packet(libtrace_out_t *trace,
1732                             libtrace_packet_t *packet){
1733        struct rte_mbuf* m_buff[1];
1734
1735        int wirelen = trace_get_wire_length(packet);
1736        int caplen = trace_get_capture_length(packet);
1737
1738        /* Check for a checksum and remove it */
1739        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1740            wirelen == caplen)
1741                caplen -= ETHER_CRC_LEN;
1742
1743        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1744        if (m_buff[0] == NULL) {
1745                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1746                return -1;
1747        } else {
1748                int ret;
1749                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1750                do {
1751                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1752                } while (ret != 1);
1753        }
1754
1755        return 0;
1756}
1757
1758static int dpdk_fin_input(libtrace_t * libtrace) {
1759        libtrace_list_node_t * n;
1760        /* Free our memory structures */
1761        if (libtrace->format_data != NULL) {
1762
1763                if (FORMAT(libtrace)->port != 0xFF)
1764                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1765                                                        RTE_ETH_EVENT_INTR_LSC,
1766                                                        dpdk_lsc_callback,
1767                                                        FORMAT(libtrace));
1768                /* Close the device completely, device cannot be restarted */
1769                rte_eth_dev_close(FORMAT(libtrace)->port);
1770
1771                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1772                                 FORMAT(libtrace)->nic_numa_node);
1773
1774                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1775                        dpdk_per_stream_t * stream = n->data;
1776                        if (stream->mempool)
1777                                dpdk_free_memory(stream->mempool,
1778                                                 rte_lcore_to_socket_id(stream->lcore));
1779                }
1780
1781                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1782                /* filter here if we used it */
1783                if (FORMAT(libtrace)->rss_key)
1784                        free(FORMAT(libtrace)->rss_key);
1785                free(libtrace->format_data);
1786        }
1787
1788        return 0;
1789}
1790
1791
1792static int dpdk_fin_output(libtrace_out_t * libtrace) {
1793        /* Free our memory structures */
1794        if (libtrace->format_data != NULL) {
1795                /* Close the device completely, device cannot be restarted */
1796                if (FORMAT(libtrace)->port != 0xFF)
1797                        rte_eth_dev_close(FORMAT(libtrace)->port);
1798                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1799                /* filter here if we used it */
1800                free(libtrace->format_data);
1801        }
1802
1803        return 0;
1804}
1805
1806/**
1807 * Get the start of the additional header that we added to a packet.
1808 */
1809static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1810        assert(packet);
1811        assert(packet->buffer);
1812        /* Our header sits straight after the mbuf header */
1813        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1814}
1815
1816static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1817        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1818        return hdr->cap_len;
1819}
1820
1821static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1822        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1823        if (size > hdr->cap_len) {
1824                /* Cannot make a packet bigger */
1825                return trace_get_capture_length(packet);
1826        }
1827
1828        /* Reset the cached capture length first*/
1829        packet->capture_length = -1;
1830        hdr->cap_len = (uint32_t) size;
1831        return trace_get_capture_length(packet);
1832}
1833
1834static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1835        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1836        int org_cap_size; /* The original capture size */
1837        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1838                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1839                               sizeof(struct hw_timestamp_82580);
1840        } else {
1841                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1842        }
1843        if (hdr->flags & INCLUDES_CHECKSUM) {
1844                return org_cap_size;
1845        } else {
1846                /* DPDK packets are always TRACE_TYPE_ETH packets */
1847                return org_cap_size + ETHER_CRC_LEN;
1848        }
1849}
1850
1851static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1852        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1853        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1854                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1855                                sizeof(struct hw_timestamp_82580);
1856        else
1857                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1858}
1859
1860static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1861                               libtrace_packet_t *packet, void *buffer,
1862                               libtrace_rt_types_t rt_type, uint32_t flags) {
1863        assert(packet);
1864        if (packet->buffer != buffer &&
1865            packet->buf_control == TRACE_CTRL_PACKET) {
1866                free(packet->buffer);
1867        }
1868
1869        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1870                packet->buf_control = TRACE_CTRL_PACKET;
1871        else
1872                packet->buf_control = TRACE_CTRL_EXTERNAL;
1873
1874        packet->buffer = buffer;
1875        packet->header = buffer;
1876
1877        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1878        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1879        packet->type = rt_type;
1880        return 0;
1881}
1882
1883/**
1884 * Given a packet size and a link speed, computes the
1885 * time to transmit in nanoseconds.
1886 *
1887 * @param format_data The dpdk format data from which we get the link speed
1888 *        and if unset updates it in a thread safe manner
1889 * @param pkt_size The size of the packet in bytes
1890 * @return The wire time in nanoseconds
1891 */
1892static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1893        uint32_t wire_time;
1894        /* 20 extra bytes of interframe gap and preamble */
1895# if GET_MAC_CRC_CHECKSUM
1896        wire_time = ((pkt_size + 20) * 8000);
1897# else
1898        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1899# endif
1900
1901        /* Division is really slow and introduces a pipeline stall
1902         * The compiler will optimise this into magical multiplication and shifting
1903         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1904         */
1905retry_calc_wiretime:
1906        switch (format_data->link_speed) {
1907        case ETH_SPEED_NUM_40G:
1908                wire_time /=  ETH_SPEED_NUM_40G;
1909                break;
1910        case ETH_SPEED_NUM_20G:
1911                wire_time /= ETH_SPEED_NUM_20G;
1912                break;
1913        case ETH_SPEED_NUM_10G:
1914                wire_time /= ETH_SPEED_NUM_10G;
1915                break;
1916        case ETH_SPEED_NUM_1G:
1917                wire_time /= ETH_SPEED_NUM_1G;
1918                break;
1919        case 0:
1920                {
1921                /* Maybe the link was down originally, but now it should be up */
1922                struct rte_eth_link link = {0};
1923                rte_eth_link_get_nowait(format_data->port, &link);
1924                if (link.link_status && link.link_speed) {
1925                        format_data->link_speed = link.link_speed;
1926#ifdef DEBUG
1927                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1928#endif
1929                        goto retry_calc_wiretime;
1930                }
1931                /* We don't know the link speed, make sure numbers are counting up */
1932                wire_time = 1;
1933                break;
1934                }
1935        default:
1936                wire_time /= format_data->link_speed;
1937        }
1938        return wire_time;
1939}
1940
1941/**
1942 * Does any extra preperation to all captured packets
1943 * This includes adding our extra header to it with the timestamp,
1944 * and any snapping
1945 *
1946 * @param format_data The DPDK format data
1947 * @param plc The DPDK per lcore format data
1948 * @param pkts An array of size nb_pkts of DPDK packets
1949 */
1950static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1951                                   struct dpdk_per_stream_t *plc,
1952                                   struct rte_mbuf **pkts,
1953                                   size_t nb_pkts) {
1954        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1955        struct dpdk_addt_hdr *hdr;
1956        size_t i;
1957        uint64_t cur_sys_time_ns;
1958#if HAS_HW_TIMESTAMPS_82580
1959        struct hw_timestamp_82580 *hw_ts;
1960        uint64_t estimated_wraps;
1961#else
1962
1963#endif
1964
1965#if USE_CLOCK_GETTIME
1966        struct timespec cur_sys_time = {0};
1967        /* This looks terrible and I feel bad doing it. But it's OK
1968         * on new kernels, because this is a fast vsyscall */
1969        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1970        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1971#else
1972        struct timeval cur_sys_time = {0};
1973        /* Also a fast vsyscall */
1974        gettimeofday(&cur_sys_time, NULL);
1975        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1976#endif
1977
1978        /* The system clock is not perfect so when running
1979         * at linerate we could timestamp a packet in the past.
1980         * To avoid this we munge the timestamp to appear 1ns
1981         * after the previous packet. We should eventually catch up
1982         * to system time since a 64byte packet on a 10G link takes 67ns.
1983         *
1984         * Note with parallel readers timestamping packets
1985         * with duplicate stamps or out of order is unavoidable without
1986         * hardware timestamping from the NIC.
1987         */
1988#if !HAS_HW_TIMESTAMPS_82580
1989        if (plc->ts_last_sys >= cur_sys_time_ns) {
1990                cur_sys_time_ns = plc->ts_last_sys + 1;
1991        }
1992#endif
1993
1994        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1995        for (i = 0 ; i < nb_pkts ; ++i) {
1996
1997                /* We put our header straight after the dpdk header */
1998                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1999                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
2000
2001#if GET_MAC_CRC_CHECKSUM
2002                /* Add back in the CRC sum */
2003                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
2004                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
2005                hdr->flags |= INCLUDES_CHECKSUM;
2006#endif
2007
2008                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
2009
2010#if HAS_HW_TIMESTAMPS_82580
2011                /* The timestamp is sitting before our packet and is included in pkt_len */
2012                hdr->flags |= INCLUDES_HW_TIMESTAMP;
2013                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
2014                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
2015
2016                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
2017                 *
2018                 *        +----------+---+   +--------------+
2019                 *  82580 |    24    | 8 |   |      32      |
2020                 *        +----------+---+   +--------------+
2021                 *          reserved  \______ 40 bits _____/
2022                 *
2023                 * The 40 bit 82580 SYSTIM overflows every
2024                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
2025                 *
2026                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
2027                 * Endian (for the full 64 bits) i.e. picture is mirrored
2028                 */
2029
2030                /* Despite what the documentation says this is in Little
2031                 * Endian byteorder. Mask the reserved section out.
2032                 */
2033                hdr->timestamp = le64toh(hw_ts->timestamp) &
2034                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
2035
2036                if (unlikely(plc->ts_first_sys == 0)) {
2037                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
2038                        plc->ts_last_sys = plc->ts_first_sys;
2039                }
2040
2041                /* This will have serious problems if packets aren't read quickly
2042                 * that is within a couple of seconds because our clock cycles every
2043                 * 18 seconds */
2044                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
2045                                  / (1ull<<TS_NBITS_82580);
2046
2047                /* Estimated_wraps gives the number of times the counter should have
2048                 * wrapped (however depending on value last time it could have wrapped
2049                 * twice more (if hw clock is close to its max value) or once less (allowing
2050                 * for a bit of variance between hw and sys clock). But if the clock
2051                 * shouldn't have wrapped once then don't allow it to go backwards in time */
2052                if (unlikely(estimated_wraps >= 2)) {
2053                        /* 2 or more wrap arounds add all but the very last wrap */
2054                        plc->wrap_count += estimated_wraps - 1;
2055                }
2056
2057                /* Set the timestamp to the lowest possible value we're considering */
2058                hdr->timestamp += plc->ts_first_sys +
2059                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2060
2061                /* In most runs only the first if() will need evaluating - i.e our
2062                 * estimate is correct. */
2063                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2064                                              hdr->timestamp, MAXSKEW_82580))) {
2065                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2066                        plc->wrap_count++;
2067                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2068                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2069                                             hdr->timestamp, MAXSKEW_82580)) {
2070                                /* Failed to match estimated_wraps */
2071                                plc->wrap_count++;
2072                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2073                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2074                                                     hdr->timestamp, MAXSKEW_82580)) {
2075                                        if (estimated_wraps == 0) {
2076                                                /* 0 case Failed to match estimated_wraps+2 */
2077                                                printf("WARNING - Hardware Timestamp failed to"
2078                                                       " match using systemtime!\n");
2079                                                hdr->timestamp = cur_sys_time_ns;
2080                                        } else {
2081                                                /* Failed to match estimated_wraps+1 */
2082                                                plc->wrap_count++;
2083                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2084                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2085                                                                     hdr->timestamp, MAXSKEW_82580)) {
2086                                                        /* Failed to match estimated_wraps+2 */
2087                                                        printf("WARNING - Hardware Timestamp failed to"
2088                                                               " match using systemtime!!\n");
2089                                                }
2090                                        }
2091                                }
2092                        }
2093                }
2094#else
2095
2096                hdr->timestamp = cur_sys_time_ns;
2097                /* Offset the next packet by the wire time of previous */
2098                calculate_wire_time(format_data, hdr->cap_len);
2099
2100#endif
2101        }
2102
2103        plc->ts_last_sys = cur_sys_time_ns;
2104        return;
2105}
2106
2107
2108static void dpdk_fin_packet(libtrace_packet_t *packet)
2109{
2110        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2111                rte_pktmbuf_free(packet->buffer);
2112                packet->buffer = NULL;
2113        }
2114}
2115
2116/** Reads at least one packet or returns an error
2117 */
2118static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2119                                           dpdk_per_stream_t *stream,
2120                                           libtrace_message_queue_t *mesg,
2121                                           struct rte_mbuf* pkts_burst[],
2122                                           size_t nb_packets) {
2123        size_t nb_rx; /* Number of rx packets we've recevied */
2124        while (1) {
2125                /* Poll for a batch of packets */
2126                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2127                                         stream->queue_id, pkts_burst, nb_packets);
2128                if (nb_rx > 0) {
2129                        /* Got some packets - otherwise we keep spining */
2130                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2131                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2132                        return nb_rx;
2133                }
2134                /* Check the message queue this could be less than 0 */
2135                if (mesg && libtrace_message_queue_count(mesg) > 0)
2136                        return READ_MESSAGE;
2137                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
2138                        return nb_rx;
2139                /* Wait a while, polling on memory degrades performance
2140                 * This relieves the pressure on memory allowing the NIC to DMA */
2141                rte_delay_us(10);
2142        }
2143
2144        /* We'll never get here - but if we did it would be bad */
2145        return READ_ERROR;
2146}
2147
2148static int dpdk_pread_packets (libtrace_t *libtrace,
2149                                    libtrace_thread_t *t,
2150                                    libtrace_packet_t **packets,
2151                                    size_t nb_packets) {
2152        int nb_rx; /* Number of rx packets we've recevied */
2153        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2154        int i;
2155        dpdk_per_stream_t *stream = t->format_data;
2156        struct dpdk_addt_hdr * hdr;
2157
2158        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2159                                         pkts_burst, nb_packets);
2160
2161        if (nb_rx > 0) {
2162                for (i = 0; i < nb_rx; ++i) {
2163                        if (packets[i]->buffer != NULL) {
2164                                /* The packet should always be finished */
2165                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2166                                free(packets[i]->buffer);
2167                        }
2168                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2169                        packets[i]->type = TRACE_RT_DATA_DPDK;
2170                        packets[i]->buffer = pkts_burst[i];
2171                        packets[i]->trace = libtrace;
2172                        packets[i]->error = 1;
2173                        hdr = (struct dpdk_addt_hdr *) 
2174                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
2175                        packets[i]->order = hdr->timestamp;
2176                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2177                }
2178        }
2179
2180        return nb_rx;
2181}
2182
2183static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2184        int nb_rx; /* Number of rx packets we've received */
2185        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2186
2187        /* Free the last packet buffer */
2188        if (packet->buffer != NULL) {
2189                /* The packet should always be finished */
2190                assert(packet->buf_control == TRACE_CTRL_PACKET);
2191                free(packet->buffer);
2192                packet->buffer = NULL;
2193        }
2194
2195        packet->buf_control = TRACE_CTRL_EXTERNAL;
2196        packet->type = TRACE_RT_DATA_DPDK;
2197
2198        /* Check if we already have some packets buffered */
2199        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2200                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2201                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2202                return 1; // TODO should be bytes read, which essentially useless anyway
2203        }
2204
2205        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2206                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2207
2208        if (nb_rx > 0) {
2209                FORMAT(libtrace)->burst_size = nb_rx;
2210                FORMAT(libtrace)->burst_offset = 1;
2211                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2212                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2213                return 1;
2214        }
2215        return nb_rx;
2216}
2217
2218static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2219        struct timeval tv;
2220        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2221
2222        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2223        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2224        return tv;
2225}
2226
2227static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2228        struct timespec ts;
2229        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2230
2231        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2232        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2233        return ts;
2234}
2235
2236static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2237        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2238}
2239
2240static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2241        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2242        return (libtrace_direction_t) hdr->direction;
2243}
2244
2245static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2246        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2247        hdr->direction = (uint8_t) direction;
2248        return (libtrace_direction_t) hdr->direction;
2249}
2250
2251static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2252        struct rte_eth_stats dev_stats = {0};
2253
2254        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2255                return;
2256
2257        /* Grab the current stats */
2258        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2259
2260        stats->captured_valid = true;
2261        stats->captured = dev_stats.ipackets;
2262
2263        stats->dropped_valid = true;
2264        stats->dropped = dev_stats.imissed;
2265
2266#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2267        /* DPDK commit 86057c fixes ensures missed does not get counted as
2268         * errors */
2269        stats->errors_valid = true;
2270        stats->errors = dev_stats.ierrors;
2271#else
2272        /* DPDK errors includes drops */
2273        stats->errors_valid = true;
2274        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2275#endif
2276        stats->received_valid = true;
2277        stats->received = dev_stats.ipackets + dev_stats.imissed;
2278
2279}
2280
2281/* Attempts to read a packet in a non-blocking fashion. If one is not
2282 * available a SLEEP event is returned. We do not have the ability to
2283 * create a select()able file descriptor in DPDK.
2284 */
2285static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2286                                            libtrace_packet_t *packet) {
2287        libtrace_eventobj_t event = {0,0,0.0,0};
2288        size_t nb_rx; /* Number of received packets we've read */
2289
2290        do {
2291
2292                /* No packets waiting in our buffer? Try and read some more */
2293                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2294                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2295                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2296                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2297                        if (nb_rx > 0) {
2298                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2299                                                FORMAT(trace)->burst_pkts, nb_rx);
2300                                FORMAT(trace)->burst_size = nb_rx;
2301                                FORMAT(trace)->burst_offset = 0;
2302                        }
2303                }
2304
2305                /* Now do we have packets waiting? */
2306                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2307                        /* Free the last packet buffer */
2308                        if (packet->buffer != NULL) {
2309                                /* The packet should always be finished */
2310                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2311                                free(packet->buffer);
2312                                packet->buffer = NULL;
2313                        }
2314
2315                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2316                        packet->type = TRACE_RT_DATA_DPDK;
2317                        event.type = TRACE_EVENT_PACKET;
2318                        packet->buffer = FORMAT(trace)->burst_pkts[
2319                                             FORMAT(trace)->burst_offset++];
2320                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2321                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2322
2323                        /* XXX - Check this passes the filter trace_read_packet normally
2324                         * does this for us but this wont */
2325                        if (trace->filter) {
2326                                if (!trace_apply_filter(trace->filter, packet)) {
2327                                        /* Failed the filter so we loop for another packet */
2328                                        trace->filtered_packets ++;
2329                                        continue;
2330                                }
2331                        }
2332                        trace->accepted_packets ++;
2333                } else {
2334                        /* We only want to sleep for a very short time - we are non-blocking */
2335                        event.type = TRACE_EVENT_SLEEP;
2336                        event.seconds = 0.0001;
2337                        event.size = 0;
2338                }
2339
2340                /* If we get here we have our event */
2341                break;
2342        } while (1);
2343
2344        return event;
2345}
2346
2347static void dpdk_help(void) {
2348        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2349        printf("Supported input URIs:\n");
2350        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2351        printf("\tThe -<coreid> is optional \n");
2352        printf("\t e.g. dpdk:0000:01:00.1\n");
2353        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2354        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2355        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2356        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2357        printf("\n");
2358        printf("Supported output URIs:\n");
2359        printf("\tSame format as the input URI.\n");
2360        printf("\t e.g. dpdk:0000:01:00.1\n");
2361        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2362        printf("\n");
2363}
2364
2365static struct libtrace_format_t dpdk = {
2366        "dpdk",
2367        "$Id$",
2368        TRACE_FORMAT_DPDK,
2369        NULL,                               /* probe filename */
2370        NULL,                               /* probe magic */
2371        dpdk_init_input,                    /* init_input */
2372        dpdk_config_input,                  /* config_input */
2373        dpdk_start_input,                   /* start_input */
2374        dpdk_pause_input,                   /* pause_input */
2375        dpdk_init_output,                   /* init_output */
2376        NULL,                               /* config_output */
2377        dpdk_start_output,                  /* start_ouput */
2378        dpdk_fin_input,                     /* fin_input */
2379        dpdk_fin_output,                    /* fin_output */
2380        dpdk_read_packet,                   /* read_packet */
2381        dpdk_prepare_packet,                /* prepare_packet */
2382        dpdk_fin_packet,                    /* fin_packet */
2383        dpdk_write_packet,                  /* write_packet */
2384        dpdk_get_link_type,                 /* get_link_type */
2385        dpdk_get_direction,                 /* get_direction */
2386        dpdk_set_direction,                 /* set_direction */
2387        NULL,                               /* get_erf_timestamp */
2388        dpdk_get_timeval,                   /* get_timeval */
2389        dpdk_get_timespec,                  /* get_timespec */
2390        NULL,                               /* get_seconds */
2391        NULL,                               /* seek_erf */
2392        NULL,                               /* seek_timeval */
2393        NULL,                               /* seek_seconds */
2394        dpdk_get_capture_length,            /* get_capture_length */
2395        dpdk_get_wire_length,               /* get_wire_length */
2396        dpdk_get_framing_length,            /* get_framing_length */
2397        dpdk_set_capture_length,            /* set_capture_length */
2398        NULL,                               /* get_received_packets */
2399        NULL,                               /* get_filtered_packets */
2400        NULL,                               /* get_dropped_packets */
2401        dpdk_get_stats,                     /* get_statistics */
2402        NULL,                               /* get_fd */
2403        dpdk_trace_event,                   /* trace_event */
2404        dpdk_help,                          /* help */
2405        NULL,                               /* next pointer */
2406        {true, 8},                          /* Live, NICs typically have 8 threads */
2407        dpdk_pstart_input,                  /* pstart_input */
2408        dpdk_pread_packets,                 /* pread_packets */
2409        dpdk_pause_input,                   /* ppause */
2410        dpdk_fin_input,                     /* p_fin */
2411        dpdk_pregister_thread,              /* pregister_thread */
2412        dpdk_punregister_thread,            /* punregister_thread */
2413        NULL                                /* get thread stats */
2414};
2415
2416void dpdk_constructor(void) {
2417        register_format(&dpdk);
2418}
Note: See TracBrowser for help on using the repository browser.