source: lib/format_dpdk.c @ ee6e802

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, ndag_format, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since ee6e802 was ee6e802, checked in by Shane Alcock <salcock@…>, 4 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 74.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
/* Intel Data Plane Development Kit (DPDK) capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44
45#ifdef HAVE_INTTYPES_H
46#  include <inttypes.h>
47#else
48# error "Can't find inttypes.h"
49#endif
50
51#include <stdlib.h>
52#include <assert.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56
57#if HAVE_LIBNUMA
58#include <numa.h>
59#endif
60
/* We can deal with any minor differences by checking the RTE VERSION.
 * DPDK typically backports some fixes (usually for building against
 * newer kernels) to older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * DPDK v1.7.1 is recommended.
 * However 1.5 to 1.8 are likely supported.
 */
74#include <rte_eal.h>
75#include <rte_version.h>
/* Fallbacks for DPDK releases whose rte_version.h predates these helpers:
 * RTE_VERSION is built from its individual version components. */
#if !defined(RTE_VERSION_NUM)
#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
#endif
#if !defined(RTE_VER_PATCH_RELEASE)
#       define RTE_VER_PATCH_RELEASE 0
#endif
#if !defined(RTE_VERSION)
#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR, RTE_VER_MINOR, \
                                            RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
#endif

/* 1.6.0r2:
 *      rte_eal_pci_set_blacklist() was removed, and device_list was
 *      renamed to pci_device_list.
 *      From the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init,
 *      so we apply the whitelist before calling rte_eal_init; this also
 *      works correctly with DPDK 1.6.0r2.
 *
 * Replaced by:
 *      rte_devargs (we can simply whitelist the device)
 */
#if RTE_VERSION > RTE_VERSION_NUM(1, 6, 0, 1)
#       define DPDK_USE_BLACKLIST 0
#else
#       define DPDK_USE_BLACKLIST 1
#endif

/* 1.7.0:
 *      rte_pmd_init_all was removed.
 *
 * Replaced by:
 *      Nothing, it is no longer needed.
 */
#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 0)
#       define DPDK_USE_PMD_INIT 0
#else
#       define DPDK_USE_PMD_INIT 1
#endif

/* 1.7.0-rc3:
 *      rte_eal_pci_probe is called as part of rte_eal_init. Somewhere
 *      between 1.7 and 1.8 calling it a second time broke, so we must not
 *      call it ourselves.
 */
#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 3)
#       define DPDK_USE_PCI_PROBE 0
#else
#       define DPDK_USE_PCI_PROBE 1
#endif

/* 1.8.0-rc1:
 *      The log level is a command line option that overrides whatever we
 *      previously set it to, so it must also be passed to rte_eal_init.
 */
#if RTE_VERSION < RTE_VERSION_NUM(1, 8, 0, 1)
#       define DPDK_USE_LOG_LEVEL 0
#else
#       define DPDK_USE_LOG_LEVEL 1
#endif

/* 1.8.0-rc2:
 *      The rx/tx_conf thresholds passed to rte_eth_rx/tx_queue_setup may be
 *      NULL, which selects default values that are better tuned per device.
 *      See issue #26.
 */
#if RTE_VERSION < RTE_VERSION_NUM(1, 8, 0, 2)
#       define DPDK_USE_NULL_QUEUE_CONFIG 0
#else
#       define DPDK_USE_NULL_QUEUE_CONFIG 1
#endif
148
149#include <rte_per_lcore.h>
150#include <rte_debug.h>
151#include <rte_errno.h>
152#include <rte_common.h>
153#include <rte_log.h>
154#include <rte_memcpy.h>
155#include <rte_prefetch.h>
156#include <rte_branch_prediction.h>
157#include <rte_pci.h>
158#include <rte_ether.h>
159#include <rte_ethdev.h>
160#include <rte_ring.h>
161#include <rte_mempool.h>
162#include <rte_mbuf.h>
163#include <rte_launch.h>
164#include <rte_lcore.h>
165#include <rte_per_lcore.h>
166#include <rte_cycles.h>
167#include <pthread.h>
168#ifdef __FreeBSD__
169#include <pthread_np.h>
170#endif
171
172
/* The default size of memory buffers to use - this is the maximum size of a
 * standard ethernet packet minus the size of the MAC checksum */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue (tx or rx). Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple of
 * 128. A modification can be made to the driver to remove this limit, in
 * which case it can be increased both in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * The same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (every bus:device.function tuple listed by lspci).
 */
#define BLACK_LIST_SIZE 50

/* The maximum number of characters the mempool name can be */
#define MEMPOOL_NAME_LEN 20

/* For single threaded libtrace we read packets as a batch/burst;
 * this is the maximum size of said burst */
#define BURST_SIZE 50

/* Every macro argument is parenthesised so that expressions such as
 * MBUF(buf + 1) or TS_TO_NS(*tsp) expand correctly. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t *) ((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t *) ((x)->format_data))

#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *) FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec into nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec * 1000000000ull + \
                        (uint64_t) (tv).tv_usec * 1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec * 1000000000ull + \
                        (uint64_t) (ts).tv_nsec)

#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
226
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Understand what each of these does before enabling it: some of them
 * can make traces incompatible with other builds.
 *
 * They also serve as examples of how to do things that are not obvious
 * from the DPDK documentation.
 */

/* Set to 1 to print verbose messages to stderr */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could add a large overhead. gettimeofday() should be a
 * vsyscall as well; if it is not, you should seriously consider updating
 * your kernel.
 */
#if defined(HAVE_CLOCK_GETTIME)
/* Set to 1 to prefer clock_gettime() */
#define USE_CLOCK_GETTIME 1
#else
/* clock_gettime() is unavailable - DON'T CHANGE THIS !!! */
#define USE_CLOCK_GETTIME 0
#endif

/* Fairly safe to turn on. There currently appears to be a 'bug' in DPDK
 * that strips the checksum by making the packet appear 4 bytes smaller
 * than it really is. Most formats don't include the checksum, so writing
 * out to a port such as int:, ring: or dpdk: assumes there is no checksum
 * and would otherwise write the checksum out as part of the packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* Requires a modification of the PMD drivers (inside Intel DPDK).
 * TODO this needs updating (packet sizes are wrong, timestamps most likely too)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
# define TS_NBITS_82580     40
/* Largest skew tolerated on either the +ve or -ve side: half the counter range */
# define MAXSKEW_82580 ((uint64_t) 1 << (TS_NBITS_82580 - 1))
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
274
275static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
276/* Memory pools Per NUMA node */
277static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
278
/* Hardware timestamp record laid out as per the Intel 82580 specification.
 * Beware: the 82580 datasheet says the timestamp is stored big endian,
 * but it is actually little endian. */
struct hw_timestamp_82580 {
        uint64_t reserved;  /* Reserved */
        uint64_t timestamp; /* Little endian; only the lower 40 bits are valid */
};
285
/* Lifecycle state of a DPDK trace, kept in dpdk_format_data_t.paused */
enum paused_state {
        DPDK_NEVER_STARTED = 0, /* Not yet started */
        DPDK_RUNNING = 1,
        DPDK_PAUSED = 2,
};
291
/* Per-stream state. ALIGN_STRUCT(CACHE_LINE_SIZE) presumably aligns each
 * stream to its own cache line to avoid false sharing - confirm. */
struct dpdk_per_stream_t
{
        uint16_t queue_id;
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
        struct rte_mempool *mempool;
        int lcore;
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping is only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);

/* Initialiser for a fresh, unassigned stream.
 * Designated initialisers keep this correct whatever the field order and
 * whichever optional fields are compiled in; every field not named here
 * (including the HW timestamp fields) is zero-initialised.
 * queue_id is unsigned, so -1 is stored as UINT16_MAX. */
#define DPDK_EMPTY_STREAM { .queue_id = -1, .ts_last_sys = 0, .mempool = NULL, .lcore = -1 }

typedef struct dpdk_per_stream_t dpdk_per_stream_t;
312
313/* Used by both input and output however some fields are not used
314 * for output */
315struct dpdk_format_data_t {
316        int8_t promisc; /* promiscuous mode - RX only */
317        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
318        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
319        uint8_t paused; /* See paused_state */
320        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
321        int snaplen; /* The snap length for the capture - RX only */
322        /* We always have to setup both rx and tx queues even if we don't want them */
323        int nb_rx_buf; /* The number of packet buffers in the rx ring */
324        int nb_tx_buf; /* The number of packet buffers in the tx ring */
325        int nic_numa_node; /* The NUMA node that the NIC is attached to */
326        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
327#if DPDK_USE_BLACKLIST
328        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
329        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
330#endif
331        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
332        uint8_t rss_key[40]; // This is the RSS KEY
333        /* To improve single-threaded performance we always batch reading
334         * packets, in a burst, otherwise the parallel library does this for us */
335        struct rte_mbuf* burst_pkts[BURST_SIZE];
336        int burst_size; /* The total number read in the burst */
337        int burst_offset; /* The offset we are into the burst */
338
339        /* Our parallel streams */
340        libtrace_list_t *per_stream;
341};
342
/* Bit flags for the flags field of struct dpdk_addt_hdr */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM = 1 << 0,
        INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with the 82580 driver */
};
347
/**
 * Extra per-packet information stored in front of the packet data.
 *
 * Memory layout of a packet buffer:
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes, optional
 * +--------------------------+
 * |       Packet data        | Variable size
 * |                          |
 */
struct dpdk_addt_hdr {
        uint64_t timestamp;
        uint8_t flags;      /* Combination of enum dpdk_addt_hdr_flags bits */
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        uint32_t cap_len;   /* The size to report as the capture length */
};
371
372/**
373 * We want to blacklist all devices except those on the whitelist
374 * (I say list, but yes it is only the one).
375 *
376 * The default behaviour of rte_pci_probe() will map every possible device
377 * to its DPDK driver. The DPDK driver will take the ethernet device
378 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
379 *
380 * So blacklist all devices except the one that we wish to use so that
381 * the others can still be used as standard ethernet ports.
382 *
383 * @return 0 if successful, otherwise -1 on error.
384 */
385#if DPDK_USE_BLACKLIST
386static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
387{
388        struct rte_pci_device *dev = NULL;
389        format_data->nb_blacklist = 0;
390
391        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
392
393        TAILQ_FOREACH(dev, &device_list, next) {
394        if (whitelist != NULL && whitelist->domain == dev->addr.domain
395            && whitelist->bus == dev->addr.bus
396            && whitelist->devid == dev->addr.devid
397            && whitelist->function == dev->addr.function)
398            continue;
399                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
400                                / sizeof (format_data->blacklist[0])) {
401                        fprintf(stderr, "Warning: too many devices to blacklist consider"
402                                        " increasing BLACK_LIST_SIZE");
403                        break;
404                }
405                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
406                ++format_data->nb_blacklist;
407        }
408
409        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
410        return 0;
411}
412#else /* DPDK_USE_BLACKLIST */
413#include <rte_devargs.h>
414static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
415{
416        char pci_str[20] = {0};
417        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
418                 whitelist->domain,
419                 whitelist->bus,
420                 whitelist->devid,
421                 whitelist->function);
422        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
423                return -1;
424        }
425        return 0;
426}
427#endif
428
429/**
430 * Parse the URI format as a pci address
431 * Fills in addr, note core is optional and is unchanged if
432 * a value for it is not provided.
433 *
434 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
435 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
436 */
437static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
438        int matches;
439        assert(str);
440        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
441                         &addr->domain, &addr->bus, &addr->devid,
442                         &addr->function, core);
443        if (matches >= 4) {
444                return 0;
445        } else {
446                return -1;
447        }
448}
449
450/**
451 * Convert a pci address to the numa node it is
452 * connected to.
453 *
454 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
455 * so we can call it before DPDK
456 *
457 * @return -1 if unknown otherwise a number 0 or higher of the numa node
458 */
459static int pci_to_numa(struct rte_pci_addr * dev_addr) {
460        char path[50] = {0};
461        FILE *file;
462
463        /* Read from the system */
464        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
465                 dev_addr->domain,
466                 dev_addr->bus,
467                 dev_addr->devid,
468                 dev_addr->function);
469
470        if((file = fopen(path, "r")) != NULL) {
471                int numa_node = -1;
472                fscanf(file, "%d", &numa_node);
473                fclose(file);
474                return numa_node;
475        }
476        return -1;
477}
478
#if DEBUG
/**
 * Debugging aid: print the DPDK version, the master lcore, the lcore count,
 * whether each online CPU (capped at RTE_MAX_LCORE) has the RTE role, and
 * the process type to stderr. Prints nothing if no EAL configuration is
 * available.
 */
static inline void dump_configuration(void)
{
        struct rte_config *global_config;
        const char *proc_type;
        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
        int i;

        if (nb_cpu <= 0) {
                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
                       " Falling back to the first core.");
                nb_cpu = 1; /* fallback to just 1 core */
        }
        if (nb_cpu > RTE_MAX_LCORE)
                nb_cpu = RTE_MAX_LCORE;

        global_config = rte_eal_get_configuration();
        if (global_config == NULL)
                return;

        fprintf(stderr, "Intel DPDK setup\n"
                "---Version      : %s\n"
                "---Master LCore : %"PRIu32"\n"
                "---LCore Count  : %"PRIu32"\n",
                rte_version(),
                global_config->master_lcore, global_config->lcore_count);

        for (i = 0 ; i < nb_cpu; i++) {
                fprintf(stderr, "   ---Core %d : %s\n", i,
                        global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
        }

        switch (global_config->process_type) {
        case RTE_PROC_AUTO:
                proc_type = "auto";
                break;
        case RTE_PROC_PRIMARY:
                proc_type = "primary";
                break;
        case RTE_PROC_SECONDARY:
                proc_type = "secondary";
                break;
        case RTE_PROC_INVALID:
                proc_type = "invalid";
                break;
        default:
                proc_type = "something worse than invalid!!";
                break;
        }
        fprintf(stderr, "---Process Type : %s\n", proc_type);
}
#endif
532
533/**
534 * Expects to be called from the master lcore and moves it to the given dpdk id
535 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
536 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
537 *               and not already in use.
538 * @return 0 is successful otherwise -1 on error.
539 */
540static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
541        struct rte_config *cfg = rte_eal_get_configuration();
542        cpu_set_t cpuset;
543        int i;
544
545        assert (core < RTE_MAX_LCORE);
546        assert (rte_get_master_lcore() == rte_lcore_id());
547
548        if (core == rte_lcore_id())
549                return 0;
550
551        /* Make sure we are not overwriting someone else */
552        assert(!rte_lcore_is_enabled(core));
553
554        /* Move the core */
555        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
556        cfg->lcore_role[core] = ROLE_RTE;
557        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
558        rte_eal_get_configuration()->master_lcore = core;
559        RTE_PER_LCORE(_lcore_id) = core;
560
561        /* Now change the affinity, either mapped to a single core or all accepted */
562        CPU_ZERO(&cpuset);
563
564        if (lcore_config[core].detected) {
565                CPU_SET(core, &cpuset);
566        } else {
567                for (i = 0; i < RTE_MAX_LCORE; ++i) {
568                        if (lcore_config[i].detected)
569                                CPU_SET(i, &cpuset);
570                }
571        }
572
573        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
574        if (i != 0) {
575                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
576                return -1;
577        }
578        return 0;
579}
580
/**
 * XXX This is very bad XXX
 * rte_eal_init() runs getopt() over its own argv, which clobbers the global
 * getopt state of a caller that may be part way through its own option
 * parsing. We snapshot that state before the call and put it back after,
 * which allows getopt use to nest. Luckily the format is normally parsed
 * last, so it doesn't usually matter. DPDK only supports modern systems,
 * so hopefully this will continue to work.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Record getopt's current global state in opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Reinstate the getopt global state previously recorded by save_getopts() */
static void restore_getopts(struct saved_getopts *opts) {
        optopt = opts->optopt;
        opterr = opts->opterr;
        optind = opts->optind;
        optarg = opts->optarg;
}
608
609static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
610                                        char * err, int errlen) {
611        int ret; /* Returned error codes */
612        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
613        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
614        char mem_map[20] = {0}; /* The memory name */
615        long nb_cpu; /* The number of CPUs in the system */
616        long my_cpu; /* The CPU number we want to bind to */
617        int i;
618        struct rte_config *cfg = rte_eal_get_configuration();
619        struct saved_getopts save_opts;
620
621        /* This initialises the Environment Abstraction Layer (EAL)
622         * If we had slave workers these are put into WAITING state
623         *
624         * Basically binds this thread to a fixed core, which we choose as
625         * the last core on the machine (assuming fewer interrupts mapped here).
626         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
627         * "-n" the number of memory channels into the CPU (hardware specific)
628         *      - Most likely to be half the number of ram slots in your machine.
629         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
630         * Controls where in memory packets are stored such that they are spread
631         * across the channels. We just use 1 to be safe.
632         *
633         * Using unique file prefixes mean separate memory is used, unlinking
634         * the two processes. However be careful we still cannot access a
635         * port that already in use.
636         */
637        char* argv[] = {"libtrace",
638                        "-c", cpu_number,
639                        "-n", "1",
640                        "--proc-type", "auto",
641                        "--file-prefix", mem_map,
642                        "-m", "512",
643#if DPDK_USE_LOG_LEVEL
644#       if DEBUG
645                        "--log-level", "8", /* RTE_LOG_DEBUG */
646#       else
647                        "--log-level", "5", /* RTE_LOG_WARNING */
648#       endif
649#endif
650                        NULL};
651        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
652
653#if DEBUG
654        rte_set_log_level(RTE_LOG_DEBUG);
655#else
656        rte_set_log_level(RTE_LOG_WARNING);
657#endif
658
659        /* Get the number of cpu cores in the system and use the last core
660         * on the correct numa node */
661        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
662        if (nb_cpu <= 0) {
663                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
664                       " Falling back to the first core.");
665                nb_cpu = 1; /* fallback to the first core */
666        }
667        if (nb_cpu > RTE_MAX_LCORE)
668                nb_cpu = RTE_MAX_LCORE;
669
670        my_cpu = -1;
671        /* This allows the user to specify the core - we would try to do this
672         * automatically but it's hard to tell that this is secondary
673         * before running rte_eal_init(...). Currently we are limited to 1
674         * instance per core due to the way memory is allocated. */
675        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
676                snprintf(err, errlen, "Failed to parse URI");
677                return -1;
678        }
679
680#if HAVE_LIBNUMA
681        format_data->nic_numa_node = pci_to_numa(&use_addr);
682        if (my_cpu < 0) {
683#if DEBUG
684                /* If we can assign to a core on the same numa node */
685                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
686#endif
687                if(format_data->nic_numa_node >= 0) {
688                        int max_node_cpu = -1;
689                        struct bitmask *mask = numa_allocate_cpumask();
690                        assert(mask);
691                        numa_node_to_cpus(format_data->nic_numa_node, mask);
692                        for (i = 0 ; i < nb_cpu; ++i) {
693                                if (numa_bitmask_isbitset(mask,i))
694                                        max_node_cpu = i+1;
695                        }
696                        my_cpu = max_node_cpu;
697                }
698        }
699#endif
700        if (my_cpu < 0) {
701                my_cpu = nb_cpu;
702        }
703
704
705        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
706                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
707
708        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
709                snprintf(err, errlen,
710                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
711                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
712                return -1;
713        }
714
715        /* Make our mask with all cores turned on this is so that DPDK
716         * gets all CPU info in older versions */
717        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
718        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
719
720#if !DPDK_USE_BLACKLIST
721        /* Black list all ports besides the one that we want to use */
722        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
723                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
724                         " are you sure the address is correct?: %s", strerror(-ret));
725                return -1;
726        }
727#endif
728
729        /* Give the memory map a unique name */
730        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
731        /* rte_eal_init it makes a call to getopt so we need to reset the
732         * global optind variable of getopt otherwise this fails */
733        save_getopts(&save_opts);
734        optind = 1;
735        if ((ret = rte_eal_init(argc, argv)) < 0) {
736                snprintf(err, errlen,
737                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
738                return -1;
739        }
740        restore_getopts(&save_opts);
741        // These are still running but will never do anything with DPDK v1.7 we
742        // should remove this XXX in the future
743        for(i = 0; i < RTE_MAX_LCORE; ++i) {
744                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
745                        cfg->lcore_role[i] = ROLE_OFF;
746                        cfg->lcore_count--;
747                }
748        }
749        // Only the master should be running
750        assert(cfg->lcore_count == 1);
751
752        // TODO XXX TODO
753        dpdk_move_master_lcore(NULL, my_cpu-1);
754
755#if DEBUG
756        dump_configuration();
757#endif
758
759#if DPDK_USE_PMD_INIT
760        /* This registers all available NICs with Intel DPDK
761         * These are not loaded until rte_eal_pci_probe() is called.
762         */
763        if ((ret = rte_pmd_init_all()) < 0) {
764                snprintf(err, errlen,
765                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
766                return -1;
767        }
768#endif
769
770#if DPDK_USE_BLACKLIST
771        /* Blacklist all ports besides the one that we want to use */
772        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
773                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
774                         " are you sure the address is correct?: %s", strerror(-ret));
775                return -1;
776        }
777#endif
778
779#if DPDK_USE_PCI_PROBE
780        /* This loads DPDK drivers against all ports that are not blacklisted */
781        if ((ret = rte_eal_pci_probe()) < 0) {
782                snprintf(err, errlen,
783                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
784                return -1;
785        }
786#endif
787
788        format_data->nb_ports = rte_eth_dev_count();
789
790        if (format_data->nb_ports != 1) {
791                snprintf(err, errlen,
792                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
793                         format_data->nb_ports);
794                return -1;
795        }
796
797        return 0;
798}
799
800static int dpdk_init_input (libtrace_t *libtrace) {
801        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
802        char err[500];
803        err[0] = 0;
804
805        libtrace->format_data = (struct dpdk_format_data_t *)
806                                malloc(sizeof(struct dpdk_format_data_t));
807        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
808        FORMAT(libtrace)->nb_ports = 0;
809        FORMAT(libtrace)->snaplen = 0; /* Use default */
810        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
811        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
812        FORMAT(libtrace)->nic_numa_node = -1;
813        FORMAT(libtrace)->promisc = -1;
814        FORMAT(libtrace)->pktmbuf_pool = NULL;
815#if DPDK_USE_BLACKLIST
816        FORMAT(libtrace)->nb_blacklist = 0;
817#endif
818        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
819        FORMAT(libtrace)->mempool_name[0] = 0;
820        memset(FORMAT(libtrace)->burst_pkts, 0,
821               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
822        FORMAT(libtrace)->burst_size = 0;
823        FORMAT(libtrace)->burst_offset = 0;
824
825        /* Make our first stream */
826        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
827        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
828
829        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
830                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
831                free(libtrace->format_data);
832                libtrace->format_data = NULL;
833                return -1;
834        }
835        return 0;
836}
837
838static int dpdk_init_output(libtrace_out_t *libtrace)
839{
840        char err[500];
841        err[0] = 0;
842
843        libtrace->format_data = (struct dpdk_format_data_t *)
844                                malloc(sizeof(struct dpdk_format_data_t));
845        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
846        FORMAT(libtrace)->nb_ports = 0;
847        FORMAT(libtrace)->snaplen = 0; /* Use default */
848        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
849        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
850        FORMAT(libtrace)->nic_numa_node = -1;
851        FORMAT(libtrace)->promisc = -1;
852        FORMAT(libtrace)->pktmbuf_pool = NULL;
853#if DPDK_USE_BLACKLIST
854        FORMAT(libtrace)->nb_blacklist = 0;
855#endif
856        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
857        FORMAT(libtrace)->mempool_name[0] = 0;
858        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
859        FORMAT(libtrace)->burst_size = 0;
860        FORMAT(libtrace)->burst_offset = 0;
861
862        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
863                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
864                free(libtrace->format_data);
865                libtrace->format_data = NULL;
866                return -1;
867        }
868        return 0;
869}
870
871/**
872 * Note here snaplen excludes the MAC checksum. Packets over
873 * the requested snaplen will be dropped. (Excluding MAC checksum)
874 *
875 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
876 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
877 * is set the maximum size of the returned packet would be 1518 otherwise
878 * 1514 would be the largest size possibly returned.
879 *
880 */
881static int dpdk_config_input (libtrace_t *libtrace,
882                              trace_option_t option,
883                              void *data) {
884        switch (option) {
885        case TRACE_OPTION_SNAPLEN:
886                /* Only support changing snaplen before a call to start is
887                 * made */
888                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
889                        FORMAT(libtrace)->snaplen=*(int*)data;
890                else
891                        return -1;
892                return 0;
893        case TRACE_OPTION_PROMISC:
894                FORMAT(libtrace)->promisc=*(int*)data;
895                return 0;
896        case TRACE_OPTION_HASHER:
897                switch (*((enum hasher_types *) data))
898                {
899                case HASHER_BALANCE:
900                case HASHER_UNIDIRECTIONAL:
901                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
902                        return 0;
903                case HASHER_BIDIRECTIONAL:
904                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
905                        return 0;
906                case HASHER_CUSTOM:
907                        // We don't support these
908                        return -1;
909                }
910                break;
911        case TRACE_OPTION_FILTER:
912                /* TODO filtering */
913        case TRACE_OPTION_META_FREQ:
914        case TRACE_OPTION_EVENT_REALTIME:
915                break;
916        /* Avoid default: so that future options will cause a warning
917         * here to remind us to implement it, or flag it as
918         * unimplementable
919         */
920        }
921
922        /* Don't set an error - trace_config will try to deal with the
923         * option and will set an error if it fails */
924        return -1;
925}
926
/* Ethernet port configuration passed to rte_eth_dev_configure() by
 * dpdk_start_streams().
 *
 * Setting both rxmode.max_rx_pkt_len and rxmode.jumbo_frame allows jumbo
 * frames to be received. It can also limit the largest frame accepted to the
 * requested snaplen.
 *
 * This is deliberately not const. On a trace's first start,
 * dpdk_start_streams() rewrites rxmode.jumbo_frame and rxmode.max_rx_pkt_len.
 * NOTE(review): this is a single global shared by every DPDK trace in the
 * process. A restarted (previously paused) trace configures its port with
 * whatever values the most recent first start of any trace wrote here.
 */
static struct rte_eth_conf port_conf = {
	.rxmode = {
		.mq_mode = ETH_RSS,  /**< Spread packets over the RX queues using RSS */
		.split_hdr_size = 0,
		.header_split   = 0, /**< Header Split disabled */
		.hw_ip_checksum = 0, /**< IP checksum offload disabled */
		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled (overwritten at start) */
		.max_rx_pkt_len = 0, /**< Max frame size if Jumbo enabled (overwritten at start) */
#if GET_MAC_CRC_CHECKSUM
/* It appears that even with hw_strip_crc turned off the driver still
 * takes the CRC off (see line 955ish in lib/librte_pmd_e1000/igb_rxtx.c).
 * With .hw_strip_crc=0, however, a valid CRC still exists in the 4 bytes
 * after the end of the packet data, so we add it back on when we receive
 * the packet.
 */
		.hw_strip_crc   = 0, /**< CRC left in place by hardware */
#else
/* By default strip the MAC checksum, because actually reading it is a bit of
 * a hack, and we don't want to rely on disabling this continuing to leave
 * the checksum in place in future driver versions.
 */
		.hw_strip_crc   = 1, /**< CRC stripped by hardware */
#endif
	},
	.txmode = {
		.mq_mode = ETH_DCB_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			// .rss_key = &rss_key, // We set this per format
			.rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
		},
	},
	.intr_conf = {
		.lsc = 1 /* Enable the link state change interrupt used by dpdk_lsc_callback() */
	}
};
968
/* RX queue configuration passed to rte_eth_rx_queue_setup() by
 * dpdk_start_streams(). If DPDK_USE_NULL_QUEUE_CONFIG is set, NULL is passed
 * instead. */
static const struct rte_eth_rxconf rx_conf = {
	.rx_thresh = {
		.pthresh = 8,/* RX_PTHRESH prefetch */
		.hthresh = 8,/* RX_HTHRESH host */
		.wthresh = 4,/* RX_WTHRESH writeback */
	},
	.rx_free_thresh = 0, /* Left at 0; presumably the driver picks its own value - confirm per PMD */
	.rx_drop_en = 0, /* Drop-when-no-RX-descriptors-are-free is NOT enabled */
};
978
/* TX queue configuration passed to rte_eth_tx_queue_setup() by
 * dpdk_start_streams(). If DPDK_USE_NULL_QUEUE_CONFIG is set, NULL is passed
 * instead. */
static const struct rte_eth_txconf tx_conf = {
	.tx_thresh = {
		/*
		 * TX_PTHRESH prefetch
		 * Set on the NIC. If the number of unprocessed descriptors
		 * queued on the card falls below this, try to grab at least
		 * hthresh more unprocessed descriptors.
		 */
		.pthresh = 36,

		/* TX_HTHRESH host
		 * Set on the NIC. The batch size used to prefetch unprocessed
		 * TX descriptors.
		 */
		.hthresh = 0,

		/* TX_WTHRESH writeback
		 * Set on the NIC. The number of sent descriptors before status is
		 * written back to confirm the transmission. This is more
		 * efficient as a bulk DMA transfer than writing one at a time.
		 * It is similar to tx_free_thresh, except that it is applied on
		 * the NIC, whereas tx_free_thresh controls when DPDK checks for
		 * completed descriptors. On 10Gbit cards tx_rs_thresh extends
		 * this by writing back only every n'th descriptor, reducing DMA
		 * memory bandwidth.
		 */
		.wthresh = 4,
	},

	/* Used internally by DPDK rather than passed to the NIC. The number of
	 * packet descriptors to send before checking for any responses written
	 * back to confirm the transmission (defaults to 32 if set to 0).
	 */
	.tx_free_thresh = 0,

	/* The Report Status threshold, used by 10Gbit cards.
	 * This tells the card to write back status (such as transmission
	 * successful) only after this minimum number of transmit descriptors
	 * has been seen. The default is 32 (if set to 0). If it is set greater
	 * than 1, TX wthresh must be set to zero, because this is effectively a
	 * replacement for it. See the DPDK programmer's guide for further
	 * restrictions.
	 */
	.tx_rs_thresh = 1,
};
1021
1022/**
1023 * A callback for a link state change (LSC).
1024 *
1025 * Packets may be received before this notification. In fact the DPDK IGXBE
1026 * driver likes to put a delay upto 5sec before sending this.
1027 *
1028 * We use this to ensure the link speed is correct for our timestamp
1029 * calculations. Because packets might be received before the link up we still
1030 * update this when the packet is received.
1031 *
1032 * @param port The DPDK port
1033 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1034 * @param cb_arg The dpdk_format_data_t structure associated with the format
1035 */
1036static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1037                              void *cb_arg) {
1038        struct dpdk_format_data_t * format_data = cb_arg;
1039        struct rte_eth_link link_info;
1040        assert(event == RTE_ETH_EVENT_INTR_LSC);
1041        assert(port == format_data->port);
1042
1043        rte_eth_link_get_nowait(port, &link_info);
1044
1045        if (link_info.link_status)
1046                format_data->link_speed = link_info.link_speed;
1047        else
1048                format_data->link_speed = 0;
1049
1050#if DEBUG
1051        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1052                link_info.link_status ? "up" : "down",
1053                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1054                                          "full-duplex" : "half-duplex",
1055                (int) link_info.link_speed);
1056#endif
1057
1058        /* Turns out DPDK drivers might not come back up if the link speed
1059         * changes. So we reset the autoneg procedure. This is very unsafe
1060         * we have have threads reading packets and we stop the port. */
1061#if 0
1062        if (!link_info.link_status) {
1063                int ret;
1064                rte_eth_dev_stop(port);
1065                ret = rte_eth_dev_start(port);
1066                if (ret < 0) {
1067                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1068                                strerror(-ret));
1069                }
1070        }
1071#endif
1072}
1073
1074/** Reserve a DPDK lcore ID for a thread globally.
1075 *
1076 * @param real If true allocate a real lcore, otherwise allocate a core which
1077 * does not exist on the local machine.
1078 * @param socket the prefered NUMA socket - only used if a real core is requested
1079 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1080 * -1 if have run out of cores.
1081 *
1082 * If any thread is reading or freeing packets we need to register it here
1083 * due to TLS caches in the memory pools.
1084 */
1085static int dpdk_reserve_lcore(bool real, int socket) {
1086        int new_id = -1;
1087        int i;
1088        struct rte_config *cfg = rte_eal_get_configuration();
1089
1090        pthread_mutex_lock(&dpdk_lock);
1091        /* If 'reading packets' fill in cores from 0 up and bind affinity
1092         * otherwise start from the MAX core (which is also the master) and work backwards
1093         * in this case physical cores on the system will not exist so we don't bind
1094         * these to any particular physical core */
1095        if (real) {
1096#if HAVE_LIBNUMA
1097                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1098                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1099                                new_id = i;
1100                                if (!lcore_config[i].detected)
1101                                        new_id = -1;
1102                                break;
1103                        }
1104                }
1105#endif
1106                /* Retry without the the numa restriction */
1107                if (new_id == -1) {
1108                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1109                                if (!rte_lcore_is_enabled(i)) {
1110                                        new_id = i;
1111                                        if (!lcore_config[i].detected)
1112                                                fprintf(stderr, "Warning the"
1113                                                        " number of 'reading' "
1114                                                        "threads exceed cores\n");
1115                                        break;
1116                                }
1117                        }
1118                }
1119        } else {
1120                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1121                        if (!rte_lcore_is_enabled(i)) {
1122                                new_id = i;
1123                                break;
1124                        }
1125                }
1126        }
1127
1128        if (new_id != -1) {
1129                /* Enable the core in global DPDK structs */
1130                cfg->lcore_role[new_id] = ROLE_RTE;
1131                cfg->lcore_count++;
1132        }
1133
1134        pthread_mutex_unlock(&dpdk_lock);
1135        return new_id;
1136}
1137
1138/** Register a thread as a lcore
1139 * @param libtrace any error is set against libtrace on exit
1140 * @param real If this is a true lcore we will bind its affinty to the
1141 * requested core.
1142 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1143 * @return 0, if successful otherwise -1 if an error occured (details are stored
1144 * in libtrace)
1145 *
1146 * @note This must be called from the thread being registered.
1147 */
1148static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1149        int ret;
1150        RTE_PER_LCORE(_lcore_id) = lcore;
1151
1152        /* Set affinity bind to corresponding core */
1153        if (real) {
1154                cpu_set_t cpuset;
1155                CPU_ZERO(&cpuset);
1156                CPU_SET(rte_lcore_id(), &cpuset);
1157                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1158                if (ret != 0) {
1159                        trace_set_err(libtrace, errno, "Warning "
1160                                      "pthread_setaffinity_np failed");
1161                        return -1;
1162                }
1163        }
1164
1165        return 0;
1166}
1167
1168/** Allocates a new dpdk packet buffer memory pool.
1169 *
1170 * @param n The number of threads
1171 * @param pkt_size The packet size we need ot store
1172 * @param socket_id The NUMA socket id
1173 * @param A new mempool, if NULL query the DPDK library for the error code
1174 * see rte_mempool_create() documentation.
1175 *
1176 * This allocates a new pool or recycles an existing memory pool.
1177 * Call dpdk_free_memory() to free the memory.
1178 * We cannot delete memory so instead we store the pools, allowing them to be
1179 * re-used.
1180 */
1181static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1182                                             unsigned pkt_size,
1183                                             int socket_id) {
1184        struct rte_mempool *ret;
1185        size_t j,k;
1186        char name[MEMPOOL_NAME_LEN];
1187
1188        /* Add on packet size overheads */
1189        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1190
1191        pthread_mutex_lock(&dpdk_lock);
1192
1193        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1194                /* Best guess go for zero */
1195                socket_id = 0;
1196        }
1197
1198        /* Find a valid pool */
1199        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1200                if (mem_pools[socket_id][j]->size >= n &&
1201                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1202                        break;
1203                }
1204        }
1205
1206        /* Find the end (+1) of the list */
1207        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1208
1209        if (mem_pools[socket_id][j]) {
1210                ret = mem_pools[socket_id][j];
1211                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1212                mem_pools[socket_id][k-1] = NULL;
1213                mem_pools[socket_id][j] = NULL;
1214        } else {
1215                static uint32_t test = 10;
1216                test++;
1217                snprintf(name, MEMPOOL_NAME_LEN,
1218                         "libtrace_pool_%"PRIu32, test);
1219
1220                ret = rte_mempool_create(name, n, pkt_size,
1221                                         128, sizeof(struct rte_pktmbuf_pool_private),
1222                                         rte_pktmbuf_pool_init, NULL,
1223                                         rte_pktmbuf_init, NULL,
1224                                         socket_id, 0);
1225        }
1226
1227        pthread_mutex_unlock(&dpdk_lock);
1228        return ret;
1229}
1230
1231/** Stores the memory against the DPDK library.
1232 *
1233 * @param mempool The mempool to free
1234 * @param socket_id The NUMA socket this mempool was allocated upon.
1235 *
1236 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1237 * store the memory shared globally against the format.
1238 */
1239static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1240        size_t i;
1241        pthread_mutex_lock(&dpdk_lock);
1242
1243        /* We should have all entries back in the mempool */
1244        rte_mempool_audit(mempool);
1245        if (!rte_mempool_full(mempool)) {
1246                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1247                        "free all packets before finishing a trace\n",
1248                        rte_mempool_count(mempool), mempool->size);
1249        }
1250
1251        /* Find the end (+1) of the list */
1252        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1253
1254        if (i >= RTE_MAX_LCORE) {
1255                fprintf(stderr, "Too many memory pools, dropping this one\n");
1256        } else {
1257                mem_pools[socket_id][i] = mempool;
1258        }
1259
1260        pthread_mutex_unlock(&dpdk_lock);
1261}
1262
/* Attach memory to the port and start (or restart) the port.
 *
 * The first start also settles the final snaplen and creates pktmbuf_pool.
 * This happens here rather than in environment setup because the snaplen is
 * not known then. The function then:
 *  - configures the port with rx_queues RX queues and one TX queue;
 *  - makes sure each RX queue has a stream in format_data->per_stream, with
 *    an lcore and a mempool of its own;
 *  - starts the device;
 *  - applies the promiscuous setting (default on);
 *  - registers the link-state-change callback and records the link speed.
 *
 * @param format_data The format data of the port to start
 * @param err Buffer that receives a description of any failure
 * @param errlen Size of err in bytes
 * @param rx_queues Number of RX queues (and therefore streams) to set up
 * @return 0 on success, or immediately if the port is already running; -1 on
 * failure with err filled in. On failure, lcores and mempools reserved
 * earlier in the call stay recorded in the streams and are not released here.
 */
static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
                              char *err, int errlen, uint16_t rx_queues) {
	int ret, i;
	struct rte_eth_link link_info; /* Filled in by the non-blocking link query below */
	dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;

	/* Already started */
	if (format_data->paused == DPDK_RUNNING)
		return 0;

	/* First time started we need to alloc our memory, doing this here
	 * rather than in environment setup because we don't have snaplen then */
	if (format_data->paused == DPDK_NEVER_STARTED) {
		/* This writes the shared global port_conf used by
		 * rte_eth_dev_configure() below */
		if (format_data->snaplen == 0) {
			format_data->snaplen = RX_MBUF_SIZE;
			port_conf.rxmode.jumbo_frame = 0;
			port_conf.rxmode.max_rx_pkt_len = 0;
		} else {
			/* Use jumbo frames */
			port_conf.rxmode.jumbo_frame = 1;
			port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
		}

#if GET_MAC_CRC_CHECKSUM
		/* This is additional overhead so make sure we allow space for this */
		format_data->snaplen += ETHER_CRC_LEN;
#endif
#if HAS_HW_TIMESTAMPS_82580
		format_data->snaplen += sizeof(struct hw_timestamp_82580);
#endif

		/* Create the mbuf pool, which is the place packets are allocated
		 * from. DPDK has no function to free a pool, so
		 * dpdk_alloc_memory() recycles previously stored pools.
		 * NOTE: the RX queue requires nb_packets + 1, otherwise it fails
		 * to allocate; however, that extra packet is not used
		 * (presumably a <= vs < error somewhere in the DPDK code).
		 * TX requires nb_tx_buffers + 1 for the case where the queue is
		 * full: the new buffer is filled and then waits until slots in
		 * the ring become available.
		 */
#if DEBUG
		fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
#endif
		format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
		                                              format_data->snaplen,
		                                              format_data->nic_numa_node);

		if (format_data->pktmbuf_pool == NULL) {
			snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
			         "pool failed: %s", strerror(rte_errno));
			return -1;
		}
	}

	/* ----------- Now do the setup for the port mapping ------------ */
	/* Order of calls must be
	 * rte_eth_dev_configure()
	 * rte_eth_tx_queue_setup()
	 * rte_eth_rx_queue_setup()
	 * rte_eth_dev_start()
	 * other rte_eth calls
	 */

	/* This must be called first before another *eth* function
	 * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
	ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
	if (ret < 0) {
		snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
		         " %"PRIu8" : %s", format_data->port,
		         strerror(-ret));
		return -1;
	}
#if DEBUG
	fprintf(stderr, "Doing dev configure\n");
#endif
	/* Initialise the TX queue. nb_tx_buf is a minimum value when the port
	 * is used for receiving, and a larger size when writing packets.
	 */
	ret = rte_eth_tx_queue_setup(format_data->port,
	                             0 /* queue XXX */,
	                             format_data->nb_tx_buf,
	                             SOCKET_ID_ANY,
	                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
	if (ret < 0) {
		snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
		         " on port %"PRIu8" : %s", format_data->port,
		         strerror(-ret));
		return -1;
	}

	/* Attach memory to our RX queues */
	for (i=0; i < rx_queues; i++) {
		dpdk_per_stream_t *stream;
#if DEBUG
		fprintf(stderr, "Configuring queue %d\n", i);
#endif

		/* Add storage for the stream */
		if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
			libtrace_list_push_back(format_data->per_stream, &empty_stream);
		stream = libtrace_list_get_index(format_data->per_stream, i)->data;
		stream->queue_id = i;

		/* Streams kept from a previous start keep their lcore */
		if (stream->lcore == -1)
			stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);

		if (stream->lcore == -1) {
			snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
			         ". Too many threads?");
			return -1;
		}

		/* Each stream gets a mempool on the NUMA socket of its lcore */
		if (stream->mempool == NULL) {
			stream->mempool = dpdk_alloc_memory(
			                          format_data->nb_rx_buf*2,
			                          format_data->snaplen,
			                          rte_lcore_to_socket_id(stream->lcore));

			if (stream->mempool == NULL) {
				snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
				         "pool failed: %s", strerror(rte_errno));
				return -1;
			}
		}

		/* Initialise the RX queue with some packets from memory */
		ret = rte_eth_rx_queue_setup(format_data->port,
		                             stream->queue_id,
		                             format_data->nb_rx_buf,
		                             format_data->nic_numa_node,
		                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
		                             stream->mempool);
		if (ret < 0) {
			snprintf(err, errlen, "Intel DPDK - Cannot configure"
			         " RX queue on port %"PRIu8" : %s",
			         format_data->port,
			         strerror(-ret));
			return -1;
		}
	}

#if DEBUG
	fprintf(stderr, "Doing start device\n");
#endif
	rte_eth_stats_reset(format_data->port);
	/* Start device */
	ret = rte_eth_dev_start(format_data->port);
	if (ret < 0) {
		snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
		         strerror(-ret));
		return -1;
	}

	/* Default promiscuous to on */
	if (format_data->promisc == -1)
		format_data->promisc = 1;

	if (format_data->promisc == 1)
		rte_eth_promiscuous_enable(format_data->port);
	else
		rte_eth_promiscuous_disable(format_data->port);

	/* We have now successfully started/unpaused */
	format_data->paused = DPDK_RUNNING;


	/* Register a callback for link state changes. A failure is only
	 * reported in DEBUG builds; starting still succeeds. */
	ret = rte_eth_dev_callback_register(format_data->port,
	                                    RTE_ETH_EVENT_INTR_LSC,
	                                    dpdk_lsc_callback,
	                                    format_data);
#if DEBUG
	if (ret)
		fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
		        ret, strerror(-ret));
#endif

	/* Get the current link status without waiting for the link to come up.
	 * NOTE(review): unlike dpdk_lsc_callback(), this stores link_speed
	 * without checking link_status. */
	rte_eth_link_get_nowait(format_data->port, &link_info);
	format_data->link_speed = link_info.link_speed;
#if DEBUG
	fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
	        (int) link_info.link_duplex, (int) link_info.link_speed);
#endif

	return 0;
}
1452
1453static int dpdk_start_input (libtrace_t *libtrace) {
1454        char err[500];
1455        err[0] = 0;
1456
1457        /* Make sure we don't reserve an extra thread for this */
1458        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1459
1460        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1461                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1462                free(libtrace->format_data);
1463                libtrace->format_data = NULL;
1464                return -1;
1465        }
1466        return 0;
1467}
1468
1469static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1470        struct rte_eth_dev_info dev_info;
1471        rte_eth_dev_info_get(port_id, &dev_info);
1472        return dev_info.max_rx_queues;
1473}
1474
/** Returns the number of online processors, or 1 if it cannot be
 * determined (sysconf() reports an error or a non-positive count).
 *
 * Declared with (void): an empty parameter list in C is an old-style
 * declaration that disables argument checking at call sites.
 */
static inline size_t dpdk_processor_count (void) {
	long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
	if (nb_cpu <= 0)
		return 1;
	else
		return (size_t) nb_cpu;
}
1482
1483static int dpdk_pstart_input (libtrace_t *libtrace) {
1484        char err[500];
1485        int i=0, phys_cores=0;
1486        int tot = libtrace->perpkt_thread_count;
1487        libtrace_list_node_t *n;
1488        err[0] = 0;
1489
1490        if (rte_lcore_id() != rte_get_master_lcore())
1491                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1492                        " from the master DPDK thread!\n");
1493
1494        /* If the master is not on the last thread we move it there */
1495        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1496                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1497                        return -1;
1498        }
1499
1500        /* Don't exceed the number of cores in the system/detected by dpdk
1501         * We don't have to force this but performance wont be good if we don't */
1502        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1503                if (lcore_config[i].detected) {
1504                        if (rte_lcore_is_enabled(i)) {
1505#if DEBUG
1506                                fprintf(stderr, "Found core %d already in use!\n", i);
1507#endif
1508                        } else {
1509                                phys_cores++;
1510                        }
1511                }
1512        }
1513        /* If we are restarting we have already allocated some threads as such
1514         * we add these back to the count for this calculation */
1515        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1516                dpdk_per_stream_t * stream = n->data;
1517                if (stream->lcore != -1)
1518                        phys_cores++;
1519        }
1520
1521        tot = MIN(libtrace->perpkt_thread_count,
1522                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1523        tot = MIN(tot, phys_cores);
1524
1525#if DEBUG
1526        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1527                libtrace->perpkt_thread_count, phys_cores);
1528#endif
1529
1530        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1531                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1532                free(libtrace->format_data);
1533                libtrace->format_data = NULL;
1534                return -1;
1535        }
1536
1537        /* Make sure we only start the number that we should */
1538        libtrace->perpkt_thread_count = tot;
1539        return 0;
1540}
1541
1542/**
1543 * Register a thread with the DPDK system,
1544 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1545 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1546 * gives it.
1547 *
1548 * We then allow a mapper thread to be started on every real core as DPDK would,
1549 * we also bind these to the corresponding CPU cores.
1550 *
1551 * @param libtrace A pointer to the trace
1552 * @param reading True if the thread will be used to read packets, i.e. will
1553 *                call pread_packet(), false if thread used to process packet
1554 *                in any other manner including statistics functions.
1555 */
1556static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1557{
1558#if DEBUG
1559        char name[99];
1560        name[0] = 0;
1561#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1562        pthread_getname_np(pthread_self(),
1563                           name, sizeof(name));
1564#endif
1565#endif
1566        if (reading) {
1567                dpdk_per_stream_t *stream;
1568                /* Attach our thread */
1569                if(t->type == THREAD_PERPKT) {
1570                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1571                        if (t->format_data == NULL) {
1572                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1573                                              "Too many threads registered");
1574                                return -1;
1575                        }
1576                } else {
1577                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1578                }
1579                stream = t->format_data;
1580#if DEBUG
1581                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1582#endif
1583                return dpdk_register_lcore(libtrace, true, stream->lcore);
1584        } else {
1585                int lcore = dpdk_reserve_lcore(reading, 0);
1586                if (lcore == -1) {
1587                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1588                                      " for DPDK");
1589                        return -1;
1590                }
1591#if DEBUG
1592                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1593#endif
1594                return dpdk_register_lcore(libtrace, false, lcore);
1595        }
1596
1597        return 0;
1598}
1599
1600/**
1601 * Unregister a thread with the DPDK system.
1602 *
1603 * Only previously registered threads should be calling this just before
1604 * they are destroyed.
1605 */
1606static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1607{
1608        struct rte_config *cfg = rte_eal_get_configuration();
1609
1610        assert(rte_lcore_id() < RTE_MAX_LCORE);
1611        pthread_mutex_lock(&dpdk_lock);
1612        /* Skip if master */
1613        if (rte_lcore_id() == rte_get_master_lcore()) {
1614                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1615                pthread_mutex_unlock(&dpdk_lock);
1616                return;
1617        }
1618
1619        /* Disable this core in global DPDK structs */
1620        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1621        cfg->lcore_count--;
1622        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1623        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1624        pthread_mutex_unlock(&dpdk_lock);
1625        return;
1626}
1627
1628static int dpdk_start_output(libtrace_out_t *libtrace)
1629{
1630        char err[500];
1631        err[0] = 0;
1632
1633        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1634                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1635                free(libtrace->format_data);
1636                libtrace->format_data = NULL;
1637                return -1;
1638        }
1639        return 0;
1640}
1641
1642static int dpdk_pause_input(libtrace_t * libtrace) {
1643        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1644        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1645        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1646#if DEBUG
1647                fprintf(stderr, "Pausing DPDK port\n");
1648#endif
1649                rte_eth_dev_stop(FORMAT(libtrace)->port);
1650                FORMAT(libtrace)->paused = DPDK_PAUSED;
1651                /* Empty the queue of packets */
1652                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1653                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1654                }
1655                FORMAT(libtrace)->burst_offset = 0;
1656                FORMAT(libtrace)->burst_size = 0;
1657
1658                for (; tmp != NULL; tmp = tmp->next) {
1659                        dpdk_per_stream_t *stream = tmp->data;
1660                        stream->ts_last_sys = 0;
1661#if HAS_HW_TIMESTAMPS_82580
1662                        stream->ts_first_sys = 0;
1663#endif
1664                }
1665
1666        }
1667        return 0;
1668}
1669
1670static int dpdk_write_packet(libtrace_out_t *trace,
1671                             libtrace_packet_t *packet){
1672        struct rte_mbuf* m_buff[1];
1673
1674        int wirelen = trace_get_wire_length(packet);
1675        int caplen = trace_get_capture_length(packet);
1676
1677        /* Check for a checksum and remove it */
1678        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1679            wirelen == caplen)
1680                caplen -= ETHER_CRC_LEN;
1681
1682        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1683        if (m_buff[0] == NULL) {
1684                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1685                return -1;
1686        } else {
1687                int ret;
1688                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1689                do {
1690                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1691                } while (ret != 1);
1692        }
1693
1694        return 0;
1695}
1696
1697static int dpdk_fin_input(libtrace_t * libtrace) {
1698        libtrace_list_node_t * n;
1699        /* Free our memory structures */
1700        if (libtrace->format_data != NULL) {
1701
1702                if (FORMAT(libtrace)->port != 0xFF)
1703                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1704                                                        RTE_ETH_EVENT_INTR_LSC,
1705                                                        dpdk_lsc_callback,
1706                                                        FORMAT(libtrace));
1707                /* Close the device completely, device cannot be restarted */
1708                rte_eth_dev_close(FORMAT(libtrace)->port);
1709
1710                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1711                                 FORMAT(libtrace)->nic_numa_node);
1712
1713                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1714                        dpdk_per_stream_t * stream = n->data;
1715                        if (stream->mempool)
1716                                dpdk_free_memory(stream->mempool,
1717                                                 rte_lcore_to_socket_id(stream->lcore));
1718                }
1719
1720                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1721                /* filter here if we used it */
1722                free(libtrace->format_data);
1723        }
1724
1725        return 0;
1726}
1727
1728
1729static int dpdk_fin_output(libtrace_out_t * libtrace) {
1730        /* Free our memory structures */
1731        if (libtrace->format_data != NULL) {
1732                /* Close the device completely, device cannot be restarted */
1733                if (FORMAT(libtrace)->port != 0xFF)
1734                        rte_eth_dev_close(FORMAT(libtrace)->port);
1735                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1736                /* filter here if we used it */
1737                free(libtrace->format_data);
1738        }
1739
1740        return 0;
1741}
1742
1743/**
1744 * Get the start of the additional header that we added to a packet.
1745 */
1746static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1747        assert(packet);
1748        assert(packet->buffer);
1749        /* Our header sits straight after the mbuf header */
1750        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1751}
1752
1753static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1754        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1755        return hdr->cap_len;
1756}
1757
1758static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1759        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1760        if (size > hdr->cap_len) {
1761                /* Cannot make a packet bigger */
1762                return trace_get_capture_length(packet);
1763        }
1764
1765        /* Reset the cached capture length first*/
1766        packet->capture_length = -1;
1767        hdr->cap_len = (uint32_t) size;
1768        return trace_get_capture_length(packet);
1769}
1770
1771static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1772        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1773        int org_cap_size; /* The original capture size */
1774        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1775                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1776                               sizeof(struct hw_timestamp_82580);
1777        } else {
1778                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1779        }
1780        if (hdr->flags & INCLUDES_CHECKSUM) {
1781                return org_cap_size;
1782        } else {
1783                /* DPDK packets are always TRACE_TYPE_ETH packets */
1784                return org_cap_size + ETHER_CRC_LEN;
1785        }
1786}
1787
1788static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1789        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1790        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1791                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1792                                sizeof(struct hw_timestamp_82580);
1793        else
1794                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1795}
1796
1797static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1798                               libtrace_packet_t *packet, void *buffer,
1799                               libtrace_rt_types_t rt_type, uint32_t flags) {
1800        assert(packet);
1801        if (packet->buffer != buffer &&
1802            packet->buf_control == TRACE_CTRL_PACKET) {
1803                free(packet->buffer);
1804        }
1805
1806        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1807                packet->buf_control = TRACE_CTRL_PACKET;
1808        else
1809                packet->buf_control = TRACE_CTRL_EXTERNAL;
1810
1811        packet->buffer = buffer;
1812        packet->header = buffer;
1813
1814        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1815        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1816        packet->type = rt_type;
1817        return 0;
1818}
1819
1820/**
1821 * Given a packet size and a link speed, computes the
1822 * time to transmit in nanoseconds.
1823 *
1824 * @param format_data The dpdk format data from which we get the link speed
1825 *        and if unset updates it in a thread safe manner
1826 * @param pkt_size The size of the packet in bytes
1827 * @return The wire time in nanoseconds
1828 */
1829static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1830        uint32_t wire_time;
1831        /* 20 extra bytes of interframe gap and preamble */
1832# if GET_MAC_CRC_CHECKSUM
1833        wire_time = ((pkt_size + 20) * 8000);
1834# else
1835        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1836# endif
1837
1838        /* Division is really slow and introduces a pipeline stall
1839         * The compiler will optimise this into magical multiplication and shifting
1840         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1841         */
1842retry_calc_wiretime:
1843        switch (format_data->link_speed) {
1844        case ETH_LINK_SPEED_40G:
1845                wire_time /=  ETH_LINK_SPEED_40G;
1846                break;
1847        case ETH_LINK_SPEED_20G:
1848                wire_time /= ETH_LINK_SPEED_20G;
1849                break;
1850        case ETH_LINK_SPEED_10G:
1851                wire_time /= ETH_LINK_SPEED_10G;
1852                break;
1853        case ETH_LINK_SPEED_1000:
1854                wire_time /= ETH_LINK_SPEED_1000;
1855                break;
1856        case 0:
1857                {
1858                /* Maybe the link was down originally, but now it should be up */
1859                struct rte_eth_link link = {0};
1860                rte_eth_link_get_nowait(format_data->port, &link);
1861                if (link.link_status && link.link_speed) {
1862                        format_data->link_speed = link.link_speed;
1863#ifdef DEBUG
1864                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1865#endif
1866                        goto retry_calc_wiretime;
1867                }
1868                /* We don't know the link speed, make sure numbers are counting up */
1869                wire_time = 1;
1870                break;
1871                }
1872        default:
1873                wire_time /= format_data->link_speed;
1874        }
1875        return wire_time;
1876}
1877
1878/**
1879 * Does any extra preperation to all captured packets
1880 * This includes adding our extra header to it with the timestamp,
1881 * and any snapping
1882 *
1883 * @param format_data The DPDK format data
1884 * @param plc The DPDK per lcore format data
1885 * @param pkts An array of size nb_pkts of DPDK packets
1886 */
1887static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1888                                   struct dpdk_per_stream_t *plc,
1889                                   struct rte_mbuf **pkts,
1890                                   size_t nb_pkts) {
1891        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1892        struct dpdk_addt_hdr *hdr;
1893        size_t i;
1894        uint64_t cur_sys_time_ns;
1895#if HAS_HW_TIMESTAMPS_82580
1896        struct hw_timestamp_82580 *hw_ts;
1897        uint64_t estimated_wraps;
1898#else
1899
1900#endif
1901
1902#if USE_CLOCK_GETTIME
1903        struct timespec cur_sys_time = {0};
1904        /* This looks terrible and I feel bad doing it. But it's OK
1905         * on new kernels, because this is a fast vsyscall */
1906        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1907        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1908#else
1909        struct timeval cur_sys_time = {0};
1910        /* Also a fast vsyscall */
1911        gettimeofday(&cur_sys_time, NULL);
1912        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1913#endif
1914
1915        /* The system clock is not perfect so when running
1916         * at linerate we could timestamp a packet in the past.
1917         * To avoid this we munge the timestamp to appear 1ns
1918         * after the previous packet. We should eventually catch up
1919         * to system time since a 64byte packet on a 10G link takes 67ns.
1920         *
1921         * Note with parallel readers timestamping packets
1922         * with duplicate stamps or out of order is unavoidable without
1923         * hardware timestamping from the NIC.
1924         */
1925#if !HAS_HW_TIMESTAMPS_82580
1926        if (plc->ts_last_sys >= cur_sys_time_ns) {
1927                cur_sys_time_ns = plc->ts_last_sys + 1;
1928        }
1929#endif
1930
1931        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1932        for (i = 0 ; i < nb_pkts ; ++i) {
1933
1934                /* We put our header straight after the dpdk header */
1935                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1936                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1937
1938#if GET_MAC_CRC_CHECKSUM
1939                /* Add back in the CRC sum */
1940                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1941                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1942                hdr->flags |= INCLUDES_CHECKSUM;
1943#endif
1944
1945                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1946
1947#if HAS_HW_TIMESTAMPS_82580
1948                /* The timestamp is sitting before our packet and is included in pkt_len */
1949                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1950                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1951                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1952
1953                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1954                 *
1955                 *        +----------+---+   +--------------+
1956                 *  82580 |    24    | 8 |   |      32      |
1957                 *        +----------+---+   +--------------+
1958                 *          reserved  \______ 40 bits _____/
1959                 *
1960                 * The 40 bit 82580 SYSTIM overflows every
1961                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1962                 *
1963                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1964                 * Endian (for the full 64 bits) i.e. picture is mirrored
1965                 */
1966
1967                /* Despite what the documentation says this is in Little
1968                 * Endian byteorder. Mask the reserved section out.
1969                 */
1970                hdr->timestamp = le64toh(hw_ts->timestamp) &
1971                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1972
1973                if (unlikely(plc->ts_first_sys == 0)) {
1974                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1975                        plc->ts_last_sys = plc->ts_first_sys;
1976                }
1977
1978                /* This will have serious problems if packets aren't read quickly
1979                 * that is within a couple of seconds because our clock cycles every
1980                 * 18 seconds */
1981                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1982                                  / (1ull<<TS_NBITS_82580);
1983
1984                /* Estimated_wraps gives the number of times the counter should have
1985                 * wrapped (however depending on value last time it could have wrapped
1986                 * twice more (if hw clock is close to its max value) or once less (allowing
1987                 * for a bit of variance between hw and sys clock). But if the clock
1988                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1989                if (unlikely(estimated_wraps >= 2)) {
1990                        /* 2 or more wrap arounds add all but the very last wrap */
1991                        plc->wrap_count += estimated_wraps - 1;
1992                }
1993
1994                /* Set the timestamp to the lowest possible value we're considering */
1995                hdr->timestamp += plc->ts_first_sys +
1996                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1997
1998                /* In most runs only the first if() will need evaluating - i.e our
1999                 * estimate is correct. */
2000                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2001                                              hdr->timestamp, MAXSKEW_82580))) {
2002                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2003                        plc->wrap_count++;
2004                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2005                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2006                                             hdr->timestamp, MAXSKEW_82580)) {
2007                                /* Failed to match estimated_wraps */
2008                                plc->wrap_count++;
2009                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2010                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2011                                                     hdr->timestamp, MAXSKEW_82580)) {
2012                                        if (estimated_wraps == 0) {
2013                                                /* 0 case Failed to match estimated_wraps+2 */
2014                                                printf("WARNING - Hardware Timestamp failed to"
2015                                                       " match using systemtime!\n");
2016                                                hdr->timestamp = cur_sys_time_ns;
2017                                        } else {
2018                                                /* Failed to match estimated_wraps+1 */
2019                                                plc->wrap_count++;
2020                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2021                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2022                                                                     hdr->timestamp, MAXSKEW_82580)) {
2023                                                        /* Failed to match estimated_wraps+2 */
2024                                                        printf("WARNING - Hardware Timestamp failed to"
2025                                                               " match using systemtime!!\n");
2026                                                }
2027                                        }
2028                                }
2029                        }
2030                }
2031#else
2032
2033                hdr->timestamp = cur_sys_time_ns;
2034                /* Offset the next packet by the wire time of previous */
2035                calculate_wire_time(format_data, hdr->cap_len);
2036
2037#endif
2038        }
2039
2040        plc->ts_last_sys = cur_sys_time_ns;
2041        return;
2042}
2043
2044
2045static void dpdk_fin_packet(libtrace_packet_t *packet)
2046{
2047        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2048                rte_pktmbuf_free(packet->buffer);
2049                packet->buffer = NULL;
2050        }
2051}
2052
2053/** Reads at least one packet or returns an error
2054 */
2055static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2056                                           dpdk_per_stream_t *stream,
2057                                           libtrace_message_queue_t *mesg,
2058                                           struct rte_mbuf* pkts_burst[],
2059                                           size_t nb_packets) {
2060        size_t nb_rx; /* Number of rx packets we've recevied */
2061        while (1) {
2062                /* Poll for a batch of packets */
2063                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2064                                         stream->queue_id, pkts_burst, nb_packets);
2065                if (nb_rx > 0) {
2066                        /* Got some packets - otherwise we keep spining */
2067                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2068                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2069                        return nb_rx;
2070                }
2071                /* Check the message queue this could be less than 0 */
2072                if (mesg && libtrace_message_queue_count(mesg) > 0)
2073                        return READ_MESSAGE;
2074                if (libtrace_halt)
2075                        return READ_EOF;
2076                /* Wait a while, polling on memory degrades performance
2077                 * This relieves the pressure on memory allowing the NIC to DMA */
2078                rte_delay_us(10);
2079        }
2080
2081        /* We'll never get here - but if we did it would be bad */
2082        return READ_ERROR;
2083}
2084
2085static int dpdk_pread_packets (libtrace_t *libtrace,
2086                                    libtrace_thread_t *t,
2087                                    libtrace_packet_t **packets,
2088                                    size_t nb_packets) {
2089        int nb_rx; /* Number of rx packets we've recevied */
2090        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2091        int i;
2092        dpdk_per_stream_t *stream = t->format_data;
2093
2094        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2095                                         pkts_burst, nb_packets);
2096
2097        if (nb_rx > 0) {
2098                for (i = 0; i < nb_rx; ++i) {
2099                        if (packets[i]->buffer != NULL) {
2100                                /* The packet should always be finished */
2101                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2102                                free(packets[i]->buffer);
2103                        }
2104                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2105                        packets[i]->type = TRACE_RT_DATA_DPDK;
2106                        packets[i]->buffer = pkts_burst[i];
2107                        packets[i]->trace = libtrace;
2108                        packets[i]->error = 1;
2109                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2110                }
2111        }
2112
2113        return nb_rx;
2114}
2115
2116static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2117        int nb_rx; /* Number of rx packets we've received */
2118        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2119
2120        /* Free the last packet buffer */
2121        if (packet->buffer != NULL) {
2122                /* The packet should always be finished */
2123                assert(packet->buf_control == TRACE_CTRL_PACKET);
2124                free(packet->buffer);
2125                packet->buffer = NULL;
2126        }
2127
2128        packet->buf_control = TRACE_CTRL_EXTERNAL;
2129        packet->type = TRACE_RT_DATA_DPDK;
2130
2131        /* Check if we already have some packets buffered */
2132        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2133                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2134                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2135                return 1; // TODO should be bytes read, which essentially useless anyway
2136        }
2137
2138        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2139                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2140
2141        if (nb_rx > 0) {
2142                FORMAT(libtrace)->burst_size = nb_rx;
2143                FORMAT(libtrace)->burst_offset = 1;
2144                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2145                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2146                return 1;
2147        }
2148        return nb_rx;
2149}
2150
2151static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2152        struct timeval tv;
2153        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2154
2155        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2156        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2157        return tv;
2158}
2159
2160static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2161        struct timespec ts;
2162        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2163
2164        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2165        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2166        return ts;
2167}
2168
2169static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2170        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2171}
2172
2173static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2174        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2175        return (libtrace_direction_t) hdr->direction;
2176}
2177
2178static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2179        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2180        hdr->direction = (uint8_t) direction;
2181        return (libtrace_direction_t) hdr->direction;
2182}
2183
2184static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2185        struct rte_eth_stats dev_stats = {0};
2186
2187        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2188                return;
2189
2190        /* Grab the current stats */
2191        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2192
2193        stats->captured_valid = true;
2194        stats->captured = dev_stats.ipackets;
2195
2196        /* Not that we support adding filters but if we did this
2197         * would work */
2198        stats->filtered += dev_stats.fdirmiss;
2199
2200        stats->dropped_valid = true;
2201        stats->dropped = dev_stats.imissed;
2202
2203        /* DPDK errors includes drops */
2204        stats->errors_valid = true;
2205        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2206
2207        stats->received_valid = true;
2208        stats->received = dev_stats.ipackets + dev_stats.imissed;
2209
2210}
2211
2212/* Attempts to read a packet in a non-blocking fashion. If one is not
2213 * available a SLEEP event is returned. We do not have the ability to
2214 * create a select()able file descriptor in DPDK.
2215 */
2216static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2217                                            libtrace_packet_t *packet) {
2218        libtrace_eventobj_t event = {0,0,0.0,0};
2219        int nb_rx; /* Number of receive packets we've read */
2220        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2221
2222        do {
2223
2224                /* See if we already have a packet waiting */
2225                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2226                                         FORMAT_DATA_FIRST(trace)->queue_id,
2227                                         pkts_burst, 1);
2228
2229                if (nb_rx > 0) {
2230                        /* Free the last packet buffer */
2231                        if (packet->buffer != NULL) {
2232                                /* The packet should always be finished */
2233                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2234                                free(packet->buffer);
2235                                packet->buffer = NULL;
2236                        }
2237
2238                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2239                        packet->type = TRACE_RT_DATA_DPDK;
2240                        event.type = TRACE_EVENT_PACKET;
2241                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
2242                        packet->buffer = FORMAT(trace)->burst_pkts[0];
2243                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2244                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2245
2246                        /* XXX - Check this passes the filter trace_read_packet normally
2247                         * does this for us but this wont */
2248                        if (trace->filter) {
2249                                if (!trace_apply_filter(trace->filter, packet)) {
2250                                        /* Failed the filter so we loop for another packet */
2251                                        trace->filtered_packets ++;
2252                                        continue;
2253                                }
2254                        }
2255                        trace->accepted_packets ++;
2256                } else {
2257                        /* We only want to sleep for a very short time - we are non-blocking */
2258                        event.type = TRACE_EVENT_SLEEP;
2259                        event.seconds = 0.0001;
2260                        event.size = 0;
2261                }
2262
2263                /* If we get here we have our event */
2264                break;
2265        } while (1);
2266
2267        return event;
2268}
2269
/* Writes the dpdk format's usage text to stdout: the input URI syntax
 * (PCI address plus optional CPU core suffix), core selection notes and
 * the output URI syntax. */
static void dpdk_help(void)
{
        static const char usage_text[] =
                "dpdk format module: $Revision: 1752 $\n"
                "Supported input URIs:\n"
                "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
                "\tThe -<coreid> is optional \n"
                "\t e.g. dpdk:0000:01:00.1\n"
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
                "\t By default the last CPU core is used if not otherwise specified.\n"
                "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
                "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
                "\n"
                "Supported output URIs:\n"
                "\tSame format as the input URI.\n"
                "\t e.g. dpdk:0000:01:00.1\n"
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
                "\n";

        /* The text has no conversion specifiers, so a plain write suffices. */
        fputs(usage_text, stdout);
}
2287
/* Format descriptor for the DPDK live capture format, registered with
 * libtrace by dpdk_constructor().
 *
 * This is a positional initializer: every entry must stay in the same
 * order as the fields of struct libtrace_format_t. The trailing comment
 * on each line names the slot it fills. NULL entries are slots this
 * format does not provide (NOTE(review): presumably libtrace treats these
 * as unsupported or falls back to a generic path - confirm in
 * libtrace_int.h). The parallel ppause/p_fin slots reuse the
 * single-threaded dpdk_pause_input and dpdk_fin_input callbacks.
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id$",
        TRACE_FORMAT_DPDK,
        NULL,                               /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,                    /* init_input */
        dpdk_config_input,                  /* config_input */
        dpdk_start_input,                   /* start_input */
        dpdk_pause_input,                   /* pause_input */
        dpdk_init_output,                   /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,                  /* start_output */
        dpdk_fin_input,                     /* fin_input */
        dpdk_fin_output,                    /* fin_output */
        dpdk_read_packet,                   /* read_packet */
        dpdk_prepare_packet,                /* prepare_packet */
        dpdk_fin_packet,                    /* fin_packet */
        dpdk_write_packet,                  /* write_packet */
        dpdk_get_link_type,                 /* get_link_type */
        dpdk_get_direction,                 /* get_direction */
        dpdk_set_direction,                 /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,                   /* get_timeval */
        dpdk_get_timespec,                  /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,            /* get_capture_length */
        dpdk_get_wire_length,               /* get_wire_length */
        dpdk_get_framing_length,            /* get_framing_length */
        dpdk_set_capture_length,            /* set_capture_length */
        NULL,                               /* get_received_packets */
        NULL,                               /* get_filtered_packets */
        NULL,                               /* get_dropped_packets */
        dpdk_get_stats,                     /* get_statistics */
        NULL,                               /* get_fd */
        dpdk_trace_event,                   /* trace_event */
        dpdk_help,                          /* help */
        NULL,                               /* next pointer */
        {true, 8},                          /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,                  /* pstart_input */
        dpdk_pread_packets,                 /* pread_packets */
        dpdk_pause_input,                   /* ppause */
        dpdk_fin_input,                     /* p_fin */
        dpdk_pregister_thread,              /* pregister_thread */
        dpdk_punregister_thread,            /* punregister_thread */
        NULL                                /* get thread stats */
};
2338
2339void dpdk_constructor(void) {
2340        register_format(&dpdk);
2341}
Note: See TracBrowser for help on using the repository browser.