source: lib/format_dpdk.c @ c7e547e

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since c7e547e was c7e547e, checked in by Shane Alcock <salcock@…>, 4 years ago

Added a dpdkndag format for faster ndag reading

Instead of joining a multicast group and receiving nDAG packets
via the networking stack, this new format uses DPDK to sniff
the multicast direct from the wire. This should save some effort
shuffling the packets back through the kernel's networking stack.

  • Property mode set to 100644
File size: 69.3 KB
RevLine 
[c04929c]1/*
[d7fd648]2 *
[ee6e802]3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
[c04929c]4 * All rights reserved.
5 *
[ee6e802]6 * This file is part of libtrace.
7 *
[d7fd648]8 * This code has been developed by the University of Waikato WAND
[c04929c]9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
[ee6e802]12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
[c04929c]14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
[ee6e802]19 * GNU Lesser General Public License for more details.
[c04929c]20 *
[ee6e802]21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
[c04929c]23 *
24 *
[b148e3b]25 *
[c04929c]26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
[d7fd648]30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
[c04929c]33 * RT-speaking programs.
34 */
35
[50ce607]36#define _GNU_SOURCE
37
[c04929c]38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
[29bbef0]43#include "hash_toeplitz.h"
[c7e547e]44#include "format_dpdk.h"
[c04929c]45
46#ifdef HAVE_INTTYPES_H
47#  include <inttypes.h>
48#else
49# error "Can't find inttypes.h"
50#endif
51
52#include <stdlib.h>
53#include <assert.h>
54#include <unistd.h>
55#include <endian.h>
[3f58e38]56#include <string.h>
57
[d7fd648]58#if HAVE_LIBNUMA
59#include <numa.h>
60#endif
61
[c04929c]62#define MBUF(x) ((struct rte_mbuf *) x)
63/* Get the original placement of the packet data */
64#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
65#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
[50ce607]66#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
67
[ddfc46d]68#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
69#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
70
[c04929c]71#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
[d7fd648]72                        (uint64_t) tv.tv_usec*1000ull)
[c04929c]73#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
[d7fd648]74                        (uint64_t) ts.tv_nsec)
[c04929c]75
76#if RTE_PKTMBUF_HEADROOM != 128
77#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
[d7fd648]78         "any libtrace instance processing these packet must be have the" \
79         "same RTE_PKTMBUF_HEADROOM set"
[c04929c]80#endif
81
82
[4ce6fca]83static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
84/* Memory pools Per NUMA node */
85static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
86
[c04929c]87/* Used by both input and output however some fields are not used
88 * for output */
89struct dpdk_format_data_t {
[ddfc46d]90        int8_t promisc; /* promiscuous mode - RX only */
91        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
92        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
93        uint8_t paused; /* See paused_state */
94        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
95        int snaplen; /* The snap length for the capture - RX only */
96        /* We always have to setup both rx and tx queues even if we don't want them */
97        int nb_rx_buf; /* The number of packet buffers in the rx ring */
98        int nb_tx_buf; /* The number of packet buffers in the tx ring */
99        int nic_numa_node; /* The NUMA node that the NIC is attached to */
100        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
[3f58e38]101#if DPDK_USE_BLACKLIST
[ddfc46d]102        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
[3f58e38]103        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
104#endif
[ddfc46d]105        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
[e47ab4d]106        enum hasher_types hasher_type;
[6f7cd4b]107        uint8_t *rss_key;
[ddfc46d]108        /* To improve single-threaded performance we always batch reading
109         * packets, in a burst, otherwise the parallel library does this for us */
110        struct rte_mbuf* burst_pkts[BURST_SIZE];
111        int burst_size; /* The total number read in the burst */
112        int burst_offset; /* The offset we are into the burst */
113
114        /* Our parallel streams */
115        libtrace_list_t *per_stream;
[c04929c]116};
117
118enum dpdk_addt_hdr_flags {
[ddfc46d]119        INCLUDES_CHECKSUM = 0x1,
120        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
[c04929c]121};
122
[d7fd648]123/**
[c04929c]124 * A structure placed in front of the packet where we can store
125 * additional information about the given packet.
126 * +--------------------------+
127 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
128 * +--------------------------+
129 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
130 * +--------------------------+
[d7fd648]131 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
132 * +--------------------------+
[c04929c]133 * *   hw_timestamp_82580     * 16 bytes Optional
134 * +--------------------------+
135 * |       Packet data        | Variable Size
136 * |                          |
137 */
138struct dpdk_addt_hdr {
[ddfc46d]139        uint64_t timestamp;
140        uint8_t flags;
141        uint8_t direction;
142        uint8_t reserved1;
143        uint8_t reserved2;
144        uint32_t cap_len; /* The size to say the capture is */
[c04929c]145};
146
147/**
148 * We want to blacklist all devices except those on the whitelist
149 * (I say list, but yes it is only the one).
[d7fd648]150 *
[c04929c]151 * The default behaviour of rte_pci_probe() will map every possible device
152 * to its DPDK driver. The DPDK driver will take the ethernet device
153 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
[d7fd648]154 *
155 * So blacklist all devices except the one that we wish to use so that
[c04929c]156 * the others can still be used as standard ethernet ports.
[3f58e38]157 *
158 * @return 0 if successful, otherwise -1 on error.
[c04929c]159 */
[3f58e38]160#if DPDK_USE_BLACKLIST
161static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
[c04929c]162{
163        struct rte_pci_device *dev = NULL;
164        format_data->nb_blacklist = 0;
165
166        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
167
168        TAILQ_FOREACH(dev, &device_list, next) {
[d7fd648]169        if (whitelist != NULL && whitelist->domain == dev->addr.domain
170            && whitelist->bus == dev->addr.bus
171            && whitelist->devid == dev->addr.devid
172            && whitelist->function == dev->addr.function)
173            continue;
[c04929c]174                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
[d7fd648]175                                / sizeof (format_data->blacklist[0])) {
[75e088f]176                        fprintf(stderr, "Warning: too many devices to blacklist consider"
[d7fd648]177                                        " increasing BLACK_LIST_SIZE");
[c04929c]178                        break;
179                }
180                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
181                ++format_data->nb_blacklist;
182        }
183
184        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
[3f58e38]185        return 0;
[c04929c]186}
[3f58e38]187#else /* DPDK_USE_BLACKLIST */
188#include <rte_devargs.h>
[2badac9]189static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
[3f58e38]190{
191        char pci_str[20] = {0};
192        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
[d7fd648]193                 whitelist->domain,
194                 whitelist->bus,
195                 whitelist->devid,
196                 whitelist->function);
[3f58e38]197        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
198                return -1;
199        }
200        return 0;
201}
202#endif
[c04929c]203
204/**
205 * Parse the URI format as a pci address
206 * Fills in addr, note core is optional and is unchanged if
207 * a value for it is not provided.
[d7fd648]208 *
[c04929c]209 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
[d7fd648]210 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
[c04929c]211 */
212static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
[ddfc46d]213        int matches;
214        assert(str);
215        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
216                         &addr->domain, &addr->bus, &addr->devid,
217                         &addr->function, core);
218        if (matches >= 4) {
219                return 0;
220        } else {
221                return -1;
222        }
[c04929c]223}
224
[d7fd648]225/**
226 * Convert a pci address to the numa node it is
227 * connected to.
228 *
229 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
230 * so we can call it before DPDK
231 *
232 * @return -1 if unknown otherwise a number 0 or higher of the numa node
233 */
234static int pci_to_numa(struct rte_pci_addr * dev_addr) {
235        char path[50] = {0};
236        FILE *file;
237
238        /* Read from the system */
239        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
240                 dev_addr->domain,
241                 dev_addr->bus,
242                 dev_addr->devid,
243                 dev_addr->function);
244
245        if((file = fopen(path, "r")) != NULL) {
246                int numa_node = -1;
247                fscanf(file, "%d", &numa_node);
248                fclose(file);
249                return numa_node;
250        }
251        return -1;
252}
253
[c04929c]254#if DEBUG
255/* For debugging */
256static inline void dump_configuration()
257{
[ddfc46d]258        struct rte_config * global_config;
259        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
[d7fd648]260
[ddfc46d]261        if (nb_cpu <= 0) {
262                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
263                       " Falling back to the first core.");
264                nb_cpu = 1; /* fallback to just 1 core */
[d7fd648]265        }
[ddfc46d]266        if (nb_cpu > RTE_MAX_LCORE)
267                nb_cpu = RTE_MAX_LCORE;
268
269        global_config = rte_eal_get_configuration();
270
271        if (global_config != NULL) {
272                int i;
273                fprintf(stderr, "Intel DPDK setup\n"
274                        "---Version      : %s\n"
275                        "---Master LCore : %"PRIu32"\n"
276                        "---LCore Count  : %"PRIu32"\n",
277                        rte_version(),
278                        global_config->master_lcore, global_config->lcore_count);
279
280                for (i = 0 ; i < nb_cpu; i++) {
281                        fprintf(stderr, "   ---Core %d : %s\n", i,
282                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
283                }
[d7fd648]284
[ddfc46d]285                const char * proc_type;
286                switch (global_config->process_type) {
287                case RTE_PROC_AUTO:
288                        proc_type = "auto";
289                        break;
290                case RTE_PROC_PRIMARY:
291                        proc_type = "primary";
292                        break;
293                case RTE_PROC_SECONDARY:
294                        proc_type = "secondary";
295                        break;
296                case RTE_PROC_INVALID:
297                        proc_type = "invalid";
298                        break;
299                default:
300                        proc_type = "something worse than invalid!!";
301                }
302                fprintf(stderr, "---Process Type : %s\n", proc_type);
[d7fd648]303        }
304
[c04929c]305}
306#endif
307
[50ce607]308/**
309 * Expects to be called from the master lcore and moves it to the given dpdk id
310 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
311 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
312 *               and not already in use.
313 * @return 0 is successful otherwise -1 on error.
314 */
[ddfc46d]315static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
316        struct rte_config *cfg = rte_eal_get_configuration();
317        cpu_set_t cpuset;
318        int i;
[50ce607]319
[ddfc46d]320        assert (core < RTE_MAX_LCORE);
321        assert (rte_get_master_lcore() == rte_lcore_id());
[50ce607]322
[ddfc46d]323        if (core == rte_lcore_id())
324                return 0;
[50ce607]325
[ddfc46d]326        /* Make sure we are not overwriting someone else */
327        assert(!rte_lcore_is_enabled(core));
[50ce607]328
[ddfc46d]329        /* Move the core */
330        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
331        cfg->lcore_role[core] = ROLE_RTE;
332        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
333        rte_eal_get_configuration()->master_lcore = core;
334        RTE_PER_LCORE(_lcore_id) = core;
[50ce607]335
[ddfc46d]336        /* Now change the affinity, either mapped to a single core or all accepted */
337        CPU_ZERO(&cpuset);
[50ce607]338
[ddfc46d]339        if (lcore_config[core].detected) {
340                CPU_SET(core, &cpuset);
341        } else {
342                for (i = 0; i < RTE_MAX_LCORE; ++i) {
343                        if (lcore_config[i].detected)
344                                CPU_SET(i, &cpuset);
345                }
[d7fd648]346        }
[50ce607]347
[ddfc46d]348        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
349        if (i != 0) {
350                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
351                return -1;
352        }
353        return 0;
[50ce607]354}
355
[9e429e8]356/**
[9f43919]357 * XXX This is very bad XXX
358 * But we have to do something to allow getopts nesting
359 * Luckly normally the format is last so it doesn't matter
360 * DPDK only supports modern systems so hopefully this
361 * will continue to work
362 */
363struct saved_getopts {
364        char *optarg;
365        int optind;
366        int opterr;
367        int optopt;
368};
369
370static void save_getopts(struct saved_getopts *opts) {
371        opts->optarg = optarg;
372        opts->optind = optind;
373        opts->opterr = opterr;
374        opts->optopt = optopt;
375}
376
377static void restore_getopts(struct saved_getopts *opts) {
378        optarg = opts->optarg;
379        optind = opts->optind;
380        opterr = opts->opterr;
381        optopt = opts->optopt;
382}
383
[95364aa]384static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
[ddfc46d]385                                        char * err, int errlen) {
386        int ret; /* Returned error codes */
387        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
388        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
389        char mem_map[20] = {0}; /* The memory name */
390        long nb_cpu; /* The number of CPUs in the system */
391        long my_cpu; /* The CPU number we want to bind to */
392        int i;
393        struct rte_config *cfg = rte_eal_get_configuration();
[9f43919]394        struct saved_getopts save_opts;
[d7fd648]395
[ddfc46d]396        /* This initialises the Environment Abstraction Layer (EAL)
397         * If we had slave workers these are put into WAITING state
398         *
399         * Basically binds this thread to a fixed core, which we choose as
400         * the last core on the machine (assuming fewer interrupts mapped here).
401         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
402         * "-n" the number of memory channels into the CPU (hardware specific)
403         *      - Most likely to be half the number of ram slots in your machine.
404         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
405         * Controls where in memory packets are stored such that they are spread
406         * across the channels. We just use 1 to be safe.
407         *
408         * Using unique file prefixes mean separate memory is used, unlinking
409         * the two processes. However be careful we still cannot access a
410         * port that already in use.
411         */
412        char* argv[] = {"libtrace",
413                        "-c", cpu_number,
414                        "-n", "1",
415                        "--proc-type", "auto",
416                        "--file-prefix", mem_map,
417                        "-m", "512",
[b585975]418#if DPDK_USE_LOG_LEVEL
419#       if DEBUG
[ddfc46d]420                        "--log-level", "8", /* RTE_LOG_DEBUG */
[b585975]421#       else
[ddfc46d]422                        "--log-level", "5", /* RTE_LOG_WARNING */
[b585975]423#       endif
424#endif
[ddfc46d]425                        NULL};
426        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
427
428#if DEBUG
429        rte_set_log_level(RTE_LOG_DEBUG);
430#else
431        rte_set_log_level(RTE_LOG_WARNING);
432#endif
433
434        /* Get the number of cpu cores in the system and use the last core
435         * on the correct numa node */
436        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
437        if (nb_cpu <= 0) {
438                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
439                       " Falling back to the first core.");
440                nb_cpu = 1; /* fallback to the first core */
441        }
442        if (nb_cpu > RTE_MAX_LCORE)
443                nb_cpu = RTE_MAX_LCORE;
444
445        my_cpu = -1;
446        /* This allows the user to specify the core - we would try to do this
447         * automatically but it's hard to tell that this is secondary
448         * before running rte_eal_init(...). Currently we are limited to 1
449         * instance per core due to the way memory is allocated. */
450        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
451                snprintf(err, errlen, "Failed to parse URI");
452                return -1;
453        }
[c04929c]454
[d7fd648]455#if HAVE_LIBNUMA
456        format_data->nic_numa_node = pci_to_numa(&use_addr);
457        if (my_cpu < 0) {
[773a2a3]458#if DEBUG
[d7fd648]459                /* If we can assign to a core on the same numa node */
[75e088f]460                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
[773a2a3]461#endif
[d7fd648]462                if(format_data->nic_numa_node >= 0) {
463                        int max_node_cpu = -1;
464                        struct bitmask *mask = numa_allocate_cpumask();
465                        assert(mask);
466                        numa_node_to_cpus(format_data->nic_numa_node, mask);
467                        for (i = 0 ; i < nb_cpu; ++i) {
468                                if (numa_bitmask_isbitset(mask,i))
469                                        max_node_cpu = i+1;
470                        }
471                        my_cpu = max_node_cpu;
472                }
473        }
474#endif
475        if (my_cpu < 0) {
476                my_cpu = nb_cpu;
477        }
478
479
[ddfc46d]480        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
481                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
[c04929c]482
[ddfc46d]483        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
484                snprintf(err, errlen,
485                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
486                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
487                return -1;
488        }
[c04929c]489
[773a2a3]490        /* Make our mask with all cores turned on this is so that DPDK
491         * gets all CPU info in older versions */
[ddfc46d]492        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
493        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
[c04929c]494
[2badac9]495#if !DPDK_USE_BLACKLIST
[ddfc46d]496        /* Black list all ports besides the one that we want to use */
[59e8400]497        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
[2badac9]498                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
[ddfc46d]499                         " are you sure the address is correct?: %s", strerror(-ret));
[2badac9]500                return -1;
501        }
502#endif
[704229c]503
504        /* Give the memory map a unique name */
[9b69b49]505        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
[ddfc46d]506        /* rte_eal_init it makes a call to getopt so we need to reset the
507         * global optind variable of getopt otherwise this fails */
[9f43919]508        save_getopts(&save_opts);
[ddfc46d]509        optind = 1;
510        if ((ret = rte_eal_init(argc, argv)) < 0) {
511                snprintf(err, errlen,
512                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
513                return -1;
514        }
[9f43919]515        restore_getopts(&save_opts);
[ddfc46d]516        // These are still running but will never do anything with DPDK v1.7 we
517        // should remove this XXX in the future
518        for(i = 0; i < RTE_MAX_LCORE; ++i) {
519                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
520                        cfg->lcore_role[i] = ROLE_OFF;
521                        cfg->lcore_count--;
522                }
[d7fd648]523        }
[ddfc46d]524        // Only the master should be running
525        assert(cfg->lcore_count == 1);
[50ce607]526
[ddfc46d]527        // TODO XXX TODO
528        dpdk_move_master_lcore(NULL, my_cpu-1);
[50ce607]529
[c04929c]530#if DEBUG
[ddfc46d]531        dump_configuration();
[c04929c]532#endif
[3f58e38]533
534#if DPDK_USE_PMD_INIT
[ddfc46d]535        /* This registers all available NICs with Intel DPDK
536         * These are not loaded until rte_eal_pci_probe() is called.
537         */
538        if ((ret = rte_pmd_init_all()) < 0) {
539                snprintf(err, errlen,
540                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
541                return -1;
542        }
[3f58e38]543#endif
[c04929c]544
[2badac9]545#if DPDK_USE_BLACKLIST
[ddfc46d]546        /* Blacklist all ports besides the one that we want to use */
[3f58e38]547        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
548                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
[ddfc46d]549                         " are you sure the address is correct?: %s", strerror(-ret));
[3f58e38]550                return -1;
551        }
[2badac9]552#endif
[c04929c]553
[b585975]554#if DPDK_USE_PCI_PROBE
[ddfc46d]555        /* This loads DPDK drivers against all ports that are not blacklisted */
[c04929c]556        if ((ret = rte_eal_pci_probe()) < 0) {
[ddfc46d]557                snprintf(err, errlen,
558                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
559                return -1;
560        }
[b585975]561#endif
[c04929c]562
[ddfc46d]563        format_data->nb_ports = rte_eth_dev_count();
[c04929c]564
[ddfc46d]565        if (format_data->nb_ports != 1) {
566                snprintf(err, errlen,
567                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
568                         format_data->nb_ports);
569                return -1;
570        }
[d7fd648]571
[ddfc46d]572        return 0;
[c04929c]573}
574
[c7e547e]575int dpdk_init_input (libtrace_t *libtrace) {
[e3d534f]576        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
[ddfc46d]577        char err[500];
578        err[0] = 0;
579
580        libtrace->format_data = (struct dpdk_format_data_t *)
581                                malloc(sizeof(struct dpdk_format_data_t));
582        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
583        FORMAT(libtrace)->nb_ports = 0;
584        FORMAT(libtrace)->snaplen = 0; /* Use default */
585        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
586        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
587        FORMAT(libtrace)->nic_numa_node = -1;
588        FORMAT(libtrace)->promisc = -1;
589        FORMAT(libtrace)->pktmbuf_pool = NULL;
[3f58e38]590#if DPDK_USE_BLACKLIST
[ddfc46d]591        FORMAT(libtrace)->nb_blacklist = 0;
[3f58e38]592#endif
[ddfc46d]593        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
594        FORMAT(libtrace)->mempool_name[0] = 0;
595        memset(FORMAT(libtrace)->burst_pkts, 0,
596               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
597        FORMAT(libtrace)->burst_size = 0;
598        FORMAT(libtrace)->burst_offset = 0;
[e47ab4d]599        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
[6f7cd4b]600        FORMAT(libtrace)->rss_key = NULL;
[ddfc46d]601
602        /* Make our first stream */
603        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
604        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
605
606        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
607                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
608                free(libtrace->format_data);
609                libtrace->format_data = NULL;
610                return -1;
611        }
612        return 0;
613}
[c04929c]614
615static int dpdk_init_output(libtrace_out_t *libtrace)
616{
[ddfc46d]617        char err[500];
618        err[0] = 0;
619
620        libtrace->format_data = (struct dpdk_format_data_t *)
621                                malloc(sizeof(struct dpdk_format_data_t));
622        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
623        FORMAT(libtrace)->nb_ports = 0;
624        FORMAT(libtrace)->snaplen = 0; /* Use default */
625        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
626        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
627        FORMAT(libtrace)->nic_numa_node = -1;
628        FORMAT(libtrace)->promisc = -1;
629        FORMAT(libtrace)->pktmbuf_pool = NULL;
[3f58e38]630#if DPDK_USE_BLACKLIST
[ddfc46d]631        FORMAT(libtrace)->nb_blacklist = 0;
[3f58e38]632#endif
[ddfc46d]633        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
634        FORMAT(libtrace)->mempool_name[0] = 0;
635        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
636        FORMAT(libtrace)->burst_size = 0;
637        FORMAT(libtrace)->burst_offset = 0;
638
639        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
640                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
641                free(libtrace->format_data);
642                libtrace->format_data = NULL;
643                return -1;
644        }
645        return 0;
646}
[c04929c]647
648/**
649 * Note here snaplen excludes the MAC checksum. Packets over
650 * the requested snaplen will be dropped. (Excluding MAC checksum)
[d7fd648]651 *
[c04929c]652 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
653 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
654 * is set the maximum size of the returned packet would be 1518 otherwise
655 * 1514 would be the largest size possibly returned.
[d7fd648]656 *
[c04929c]657 */
[c7e547e]658int dpdk_config_input (libtrace_t *libtrace,
[ddfc46d]659                              trace_option_t option,
660                              void *data) {
661        switch (option) {
[d7fd648]662        case TRACE_OPTION_SNAPLEN:
[ddfc46d]663                /* Only support changing snaplen before a call to start is
664                 * made */
665                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
666                        FORMAT(libtrace)->snaplen=*(int*)data;
667                else
668                        return -1;
669                return 0;
670        case TRACE_OPTION_PROMISC:
671                FORMAT(libtrace)->promisc=*(int*)data;
672                return 0;
[6b98325]673        case TRACE_OPTION_HASHER:
[6f7cd4b]674                switch (*((enum hasher_types *) data))
675                {
676                case HASHER_BALANCE:
677                case HASHER_UNIDIRECTIONAL:
678                case HASHER_BIDIRECTIONAL:
679                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
680                        if (FORMAT(libtrace)->rss_key)
681                                free(FORMAT(libtrace)->rss_key);
682                        FORMAT(libtrace)->rss_key = NULL;
683                        return 0;
684                case HASHER_CUSTOM:
685                        // Let libtrace do this
686                        return -1;
687                }
688                break;
[d7fd648]689        case TRACE_OPTION_FILTER:
[ddfc46d]690                /* TODO filtering */
[d7fd648]691        case TRACE_OPTION_META_FREQ:
692        case TRACE_OPTION_EVENT_REALTIME:
[ddfc46d]693                break;
[d7fd648]694        /* Avoid default: so that future options will cause a warning
695         * here to remind us to implement it, or flag it as
696         * unimplementable
697         */
[ddfc46d]698        }
[c04929c]699
700        /* Don't set an error - trace_config will try to deal with the
701         * option and will set an error if it fails */
[ddfc46d]702        return -1;
[c04929c]703}
704
705/* Can set jumbo frames/ or limit the size of a frame by setting both
706 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
[d7fd648]707 *
[c04929c]708 */
709static struct rte_eth_conf port_conf = {
710        .rxmode = {
[29bbef0]711                .mq_mode = ETH_RSS,
[c04929c]712                .split_hdr_size = 0,
713                .header_split   = 0, /**< Header Split disabled */
714                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
715                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
716                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
[d7fd648]717                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
[c04929c]718#if GET_MAC_CRC_CHECKSUM
719/* So it appears that if hw_strip_crc is turned off the driver will still
720 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
721 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
722 * So lets just add it back on when we receive the packet.
723 */
724                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
725#else
726/* By default strip the MAC checksum because it's a bit of a hack to
727 * actually read these. And don't want to rely on disabling this to actualy
728 * always cut off the checksum in the future
729 */
[d7fd648]730                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
[c04929c]731#endif
732        },
733        .txmode = {
734                .mq_mode = ETH_DCB_NONE,
735        },
[29bbef0]736        .rx_adv_conf = {
737                .rss_conf = {
[b148e3b]738                        .rss_hf = RX_RSS_FLAGS,
[29bbef0]739                },
740        },
[1960910]741        .intr_conf = {
742                .lsc = 1
743        }
[c04929c]744};
745
746static const struct rte_eth_rxconf rx_conf = {
747        .rx_thresh = {
748                .pthresh = 8,/* RX_PTHRESH prefetch */
749                .hthresh = 8,/* RX_HTHRESH host */
750                .wthresh = 4,/* RX_WTHRESH writeback */
751        },
[ddfc46d]752        .rx_free_thresh = 0,
753        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
[c04929c]754};
755
756static const struct rte_eth_txconf tx_conf = {
757        .tx_thresh = {
[ddfc46d]758                /*
759                 * TX_PTHRESH prefetch
760                 * Set on the NIC, if the number of unprocessed descriptors to queued on
761                 * the card fall below this try grab at least hthresh more unprocessed
762                 * descriptors.
763                 */
[0c866b0]764                .pthresh = 36,
765
[ddfc46d]766                /* TX_HTHRESH host
767                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
768                 */
[0c866b0]769                .hthresh = 0,
[d7fd648]770
[ddfc46d]771                /* TX_WTHRESH writeback
772                 * Set on the NIC, the number of sent descriptors before writing back
773                 * status to confirm the transmission. This is done more efficiently as
774                 * a bulk DMA-transfer rather than writing one at a time.
775                 * Similar to tx_free_thresh however this is applied to the NIC, where
776                 * as tx_free_thresh is when DPDK will check these. This is extended
777                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
778                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
779                 */
[0c866b0]780                .wthresh = 4,
[c04929c]781        },
[0c866b0]782
[ddfc46d]783        /* Used internally by DPDK rather than passed to the NIC. The number of
784         * packet descriptors to send before checking for any responses written
785         * back (to confirm the transmission). Default = 32 if set to 0)
786         */
[0c866b0]787        .tx_free_thresh = 0,
788
[ddfc46d]789        /* This is the Report Status threshold, used by 10Gbit cards,
790         * This signals the card to only write back status (such as
791         * transmission successful) after this minimum number of transmit
792         * descriptors are seen. The default is 32 (if set to 0) however if set
793         * to greater than 1 TX wthresh must be set to zero, because this is kindof
794         * a replacement. See the dpdk programmers guide for more restrictions.
795         */
[0c866b0]796        .tx_rs_thresh = 1,
[c04929c]797};
798
[1960910]799/**
800 * A callback for a link state change (LSC).
801 *
802 * Packets may be received before this notification. In fact the DPDK IGXBE
803 * driver likes to put a delay upto 5sec before sending this.
804 *
805 * We use this to ensure the link speed is correct for our timestamp
806 * calculations. Because packets might be received before the link up we still
807 * update this when the packet is received.
808 *
809 * @param port The DPDK port
810 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
811 * @param cb_arg The dpdk_format_data_t structure associated with the format
812 */
813static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
814                              void *cb_arg) {
815        struct dpdk_format_data_t * format_data = cb_arg;
816        struct rte_eth_link link_info;
817        assert(event == RTE_ETH_EVENT_INTR_LSC);
818        assert(port == format_data->port);
819
820        rte_eth_link_get_nowait(port, &link_info);
821
822        if (link_info.link_status)
823                format_data->link_speed = link_info.link_speed;
824        else
825                format_data->link_speed = 0;
826
827#if DEBUG
828        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
829                link_info.link_status ? "up" : "down",
830                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
831                                          "full-duplex" : "half-duplex",
832                (int) link_info.link_speed);
833#endif
834
835        /* Turns out DPDK drivers might not come back up if the link speed
836         * changes. So we reset the autoneg procedure. This is very unsafe
837         * we have have threads reading packets and we stop the port. */
838#if 0
839        if (!link_info.link_status) {
840                int ret;
841                rte_eth_dev_stop(port);
842                ret = rte_eth_dev_start(port);
843                if (ret < 0) {
844                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
845                                strerror(-ret));
846                }
847        }
848#endif
849}
850
[4ce6fca]851/** Reserve a DPDK lcore ID for a thread globally.
852 *
853 * @param real If true allocate a real lcore, otherwise allocate a core which
854 * does not exist on the local machine.
855 * @param socket the prefered NUMA socket - only used if a real core is requested
856 * @return a valid core, which can later be used with dpdk_register_lcore() or a
857 * -1 if have run out of cores.
858 *
859 * If any thread is reading or freeing packets we need to register it here
860 * due to TLS caches in the memory pools.
861 */
862static int dpdk_reserve_lcore(bool real, int socket) {
863        int new_id = -1;
864        int i;
865        struct rte_config *cfg = rte_eal_get_configuration();
[b148e3b]866        (void) socket;
[4ce6fca]867
868        pthread_mutex_lock(&dpdk_lock);
869        /* If 'reading packets' fill in cores from 0 up and bind affinity
870         * otherwise start from the MAX core (which is also the master) and work backwards
871         * in this case physical cores on the system will not exist so we don't bind
872         * these to any particular physical core */
873        if (real) {
874#if HAVE_LIBNUMA
875                for (i = 0; i < RTE_MAX_LCORE; ++i) {
876                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
877                                new_id = i;
878                                if (!lcore_config[i].detected)
879                                        new_id = -1;
880                                break;
881                        }
882                }
883#endif
884                /* Retry without the the numa restriction */
885                if (new_id == -1) {
886                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
887                                if (!rte_lcore_is_enabled(i)) {
888                                        new_id = i;
889                                        if (!lcore_config[i].detected)
890                                                fprintf(stderr, "Warning the"
891                                                        " number of 'reading' "
892                                                        "threads exceed cores\n");
893                                        break;
894                                }
895                        }
896                }
897        } else {
898                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
899                        if (!rte_lcore_is_enabled(i)) {
900                                new_id = i;
901                                break;
902                        }
903                }
904        }
905
906        if (new_id != -1) {
907                /* Enable the core in global DPDK structs */
908                cfg->lcore_role[new_id] = ROLE_RTE;
909                cfg->lcore_count++;
910        }
911
912        pthread_mutex_unlock(&dpdk_lock);
913        return new_id;
914}
915
916/** Register a thread as a lcore
917 * @param libtrace any error is set against libtrace on exit
918 * @param real If this is a true lcore we will bind its affinty to the
919 * requested core.
920 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
921 * @return 0, if successful otherwise -1 if an error occured (details are stored
922 * in libtrace)
923 *
924 * @note This must be called from the thread being registered.
925 */
926static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
927        int ret;
928        RTE_PER_LCORE(_lcore_id) = lcore;
929
930        /* Set affinity bind to corresponding core */
931        if (real) {
932                cpu_set_t cpuset;
933                CPU_ZERO(&cpuset);
934                CPU_SET(rte_lcore_id(), &cpuset);
935                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
936                if (ret != 0) {
937                        trace_set_err(libtrace, errno, "Warning "
938                                      "pthread_setaffinity_np failed");
939                        return -1;
940                }
941        }
942
943        return 0;
944}
945
946/** Allocates a new dpdk packet buffer memory pool.
947 *
948 * @param n The number of threads
949 * @param pkt_size The packet size we need ot store
950 * @param socket_id The NUMA socket id
951 * @param A new mempool, if NULL query the DPDK library for the error code
952 * see rte_mempool_create() documentation.
953 *
954 * This allocates a new pool or recycles an existing memory pool.
955 * Call dpdk_free_memory() to free the memory.
956 * We cannot delete memory so instead we store the pools, allowing them to be
957 * re-used.
958 */
959static struct rte_mempool *dpdk_alloc_memory(unsigned n,
960                                             unsigned pkt_size,
961                                             int socket_id) {
962        struct rte_mempool *ret;
963        size_t j,k;
964        char name[MEMPOOL_NAME_LEN];
965
966        /* Add on packet size overheads */
967        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
968
969        pthread_mutex_lock(&dpdk_lock);
970
971        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
972                /* Best guess go for zero */
973                socket_id = 0;
974        }
975
976        /* Find a valid pool */
977        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
978                if (mem_pools[socket_id][j]->size >= n &&
979                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
980                        break;
981                }
982        }
983
984        /* Find the end (+1) of the list */
985        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
986
987        if (mem_pools[socket_id][j]) {
988                ret = mem_pools[socket_id][j];
989                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
990                mem_pools[socket_id][k-1] = NULL;
991                mem_pools[socket_id][j] = NULL;
992        } else {
993                static uint32_t test = 10;
994                test++;
995                snprintf(name, MEMPOOL_NAME_LEN,
996                         "libtrace_pool_%"PRIu32, test);
997
998                ret = rte_mempool_create(name, n, pkt_size,
999                                         128, sizeof(struct rte_pktmbuf_pool_private),
1000                                         rte_pktmbuf_pool_init, NULL,
1001                                         rte_pktmbuf_init, NULL,
1002                                         socket_id, 0);
1003        }
1004
1005        pthread_mutex_unlock(&dpdk_lock);
1006        return ret;
1007}
1008
1009/** Stores the memory against the DPDK library.
1010 *
1011 * @param mempool The mempool to free
1012 * @param socket_id The NUMA socket this mempool was allocated upon.
1013 *
1014 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1015 * store the memory shared globally against the format.
1016 */
1017static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1018        size_t i;
1019        pthread_mutex_lock(&dpdk_lock);
1020
1021        /* We should have all entries back in the mempool */
1022        rte_mempool_audit(mempool);
1023        if (!rte_mempool_full(mempool)) {
1024                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1025                        "free all packets before finishing a trace\n",
[b148e3b]1026                        rte_mempool_avail_count(mempool), mempool->size);
[4ce6fca]1027        }
1028
1029        /* Find the end (+1) of the list */
1030        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1031
1032        if (i >= RTE_MAX_LCORE) {
1033                fprintf(stderr, "Too many memory pools, dropping this one\n");
1034        } else {
1035                mem_pools[socket_id][i] = mempool;
1036        }
1037
1038        pthread_mutex_unlock(&dpdk_lock);
1039}
1040
[ddfc46d]1041/* Attach memory to the port and start (or restart) the port/s.
[c04929c]1042 */
[ddfc46d]1043static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1044                              char *err, int errlen, uint16_t rx_queues) {
1045        int ret, i;
1046        struct rte_eth_link link_info; /* Wait for link */
[e3d534f]1047        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
[ddfc46d]1048
1049        /* Already started */
1050        if (format_data->paused == DPDK_RUNNING)
1051                return 0;
[c04929c]1052
[ddfc46d]1053        /* First time started we need to alloc our memory, doing this here
1054         * rather than in environment setup because we don't have snaplen then */
1055        if (format_data->paused == DPDK_NEVER_STARTED) {
1056                if (format_data->snaplen == 0) {
1057                        format_data->snaplen = RX_MBUF_SIZE;
1058                        port_conf.rxmode.jumbo_frame = 0;
1059                        port_conf.rxmode.max_rx_pkt_len = 0;
1060                } else {
1061                        /* Use jumbo frames */
1062                        port_conf.rxmode.jumbo_frame = 1;
1063                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1064                }
[d7fd648]1065
[c04929c]1066#if GET_MAC_CRC_CHECKSUM
[ddfc46d]1067                /* This is additional overhead so make sure we allow space for this */
1068                format_data->snaplen += ETHER_CRC_LEN;
[c04929c]1069#endif
1070#if HAS_HW_TIMESTAMPS_82580
[ddfc46d]1071                format_data->snaplen += sizeof(struct hw_timestamp_82580);
[c04929c]1072#endif
1073
[ddfc46d]1074                /* Create the mbuf pool, which is the place packets are allocated
1075                 * from - There is no free function (I cannot see one).
1076                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1077                 * allocate however that extra 1 packet is not used.
1078                 * (I assume <= vs < error some where in DPDK code)
1079                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1080                 * so that will fill the new buffer and wait until slots in the
1081                 * ring become available.
1082                 */
[c04929c]1083#if DEBUG
[ddfc46d]1084                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
[c04929c]1085#endif
[4ce6fca]1086                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1087                                                              format_data->snaplen,
1088                                                              format_data->nic_numa_node);
[ddfc46d]1089
1090                if (format_data->pktmbuf_pool == NULL) {
1091                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1092                                 "pool failed: %s", strerror(rte_errno));
1093                        return -1;
1094                }
[d7fd648]1095        }
[29bbef0]1096
[6f7cd4b]1097        /* Generate the hash key, based on the device */
1098        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1099        // In new versions DPDK we can query the size
[e47ab4d]1100#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
[6f7cd4b]1101        struct rte_eth_dev_info dev_info;
1102        rte_eth_dev_info_get(format_data->port, &dev_info);
1103        rss_size = dev_info.hash_key_size;
[e47ab4d]1104#endif
[6f7cd4b]1105        if (rss_size != 0) {
1106                format_data->rss_key = malloc(rss_size);
1107                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1108                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1109                } else {
1110                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1111                }
1112                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1113#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1114                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1115#endif
1116        } else {
1117                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
[e47ab4d]1118        }
1119
[ddfc46d]1120        /* ----------- Now do the setup for the port mapping ------------ */
1121        /* Order of calls must be
1122         * rte_eth_dev_configure()
1123         * rte_eth_tx_queue_setup()
1124         * rte_eth_rx_queue_setup()
1125         * rte_eth_dev_start()
1126         * other rte_eth calls
1127         */
[29bbef0]1128
[ddfc46d]1129        /* This must be called first before another *eth* function
1130         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1131        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1132        if (ret < 0) {
1133                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1134                         " %"PRIu8" : %s", format_data->port,
1135                         strerror(-ret));
1136                return -1;
[d7fd648]1137        }
[29bbef0]1138#if DEBUG
[ddfc46d]1139        fprintf(stderr, "Doing dev configure\n");
[29bbef0]1140#endif
[ddfc46d]1141        /* Initialise the TX queue a minimum value if using this port for
1142         * receiving. Otherwise a larger size if writing packets.
1143         */
1144        ret = rte_eth_tx_queue_setup(format_data->port,
1145                                     0 /* queue XXX */,
1146                                     format_data->nb_tx_buf,
1147                                     SOCKET_ID_ANY,
[a984307]1148                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
[ddfc46d]1149        if (ret < 0) {
1150                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1151                         " on port %"PRIu8" : %s", format_data->port,
1152                         strerror(-ret));
1153                return -1;
[d7fd648]1154        }
1155
[ddfc46d]1156        /* Attach memory to our RX queues */
1157        for (i=0; i < rx_queues; i++) {
[4ce6fca]1158                dpdk_per_stream_t *stream;
[29bbef0]1159#if DEBUG
[4ce6fca]1160                fprintf(stderr, "Configuring queue %d\n", i);
[d7fd648]1161#endif
1162
[ddfc46d]1163                /* Add storage for the stream */
1164                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1165                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1166                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1167                stream->queue_id = i;
[50ce607]1168
[e3d534f]1169                if (stream->lcore == -1)
1170                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
[4ce6fca]1171
1172                if (stream->lcore == -1) {
1173                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1174                                 ". Too many threads?");
1175                        return -1;
1176                }
1177
1178                if (stream->mempool == NULL) {
1179                        stream->mempool = dpdk_alloc_memory(
1180                                                  format_data->nb_rx_buf*2,
1181                                                  format_data->snaplen,
1182                                                  rte_lcore_to_socket_id(stream->lcore));
1183
1184                        if (stream->mempool == NULL) {
1185                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1186                                         "pool failed: %s", strerror(rte_errno));
1187                                return -1;
1188                        }
1189                }
1190
[ddfc46d]1191                /* Initialise the RX queue with some packets from memory */
1192                ret = rte_eth_rx_queue_setup(format_data->port,
1193                                             stream->queue_id,
1194                                             format_data->nb_rx_buf,
1195                                             format_data->nic_numa_node,
[a984307]1196                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
[4ce6fca]1197                                             stream->mempool);
[29bbef0]1198                if (ret < 0) {
[ddfc46d]1199                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1200                                 " RX queue on port %"PRIu8" : %s",
1201                                 format_data->port,
1202                                 strerror(-ret));
[29bbef0]1203                        return -1;
1204                }
1205        }
[d7fd648]1206
[29bbef0]1207#if DEBUG
[ddfc46d]1208        fprintf(stderr, "Doing start device\n");
[d7fd648]1209#endif
[e3d534f]1210        rte_eth_stats_reset(format_data->port);
[ddfc46d]1211        /* Start device */
1212        ret = rte_eth_dev_start(format_data->port);
1213        if (ret < 0) {
1214                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1215                         strerror(-ret));
1216                return -1;
1217        }
[d7fd648]1218
[ddfc46d]1219        /* Default promiscuous to on */
1220        if (format_data->promisc == -1)
1221                format_data->promisc = 1;
[d7fd648]1222
[ddfc46d]1223        if (format_data->promisc == 1)
1224                rte_eth_promiscuous_enable(format_data->port);
1225        else
1226                rte_eth_promiscuous_disable(format_data->port);
[d7fd648]1227
[ddfc46d]1228        /* We have now successfully started/unpased */
1229        format_data->paused = DPDK_RUNNING;
[d7fd648]1230
1231
[ddfc46d]1232        /* Register a callback for link state changes */
1233        ret = rte_eth_dev_callback_register(format_data->port,
1234                                            RTE_ETH_EVENT_INTR_LSC,
1235                                            dpdk_lsc_callback,
1236                                            format_data);
[1960910]1237#if DEBUG
[ddfc46d]1238        if (ret)
1239                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1240                        ret, strerror(-ret));
[1960910]1241#endif
1242
[ddfc46d]1243        /* Get the current link status */
1244        rte_eth_link_get_nowait(format_data->port, &link_info);
1245        format_data->link_speed = link_info.link_speed;
[29bbef0]1246#if DEBUG
[ddfc46d]1247        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1248                (int) link_info.link_duplex, (int) link_info.link_speed);
[29bbef0]1249#endif
1250
[ddfc46d]1251        return 0;
[29bbef0]1252}
[c04929c]1253
[c7e547e]1254int dpdk_start_input (libtrace_t *libtrace) {
[ddfc46d]1255        char err[500];
1256        err[0] = 0;
[c04929c]1257
[e3d534f]1258        /* Make sure we don't reserve an extra thread for this */
1259        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1260
[ddfc46d]1261        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1262                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1263                free(libtrace->format_data);
1264                libtrace->format_data = NULL;
1265                return -1;
1266        }
1267        return 0;
[c04929c]1268}
1269
[50ce607]1270static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
[ddfc46d]1271        struct rte_eth_dev_info dev_info;
1272        rte_eth_dev_info_get(port_id, &dev_info);
1273        return dev_info.max_rx_queues;
[8af0d01]1274}
1275
[50ce607]1276static inline size_t dpdk_processor_count () {
[ddfc46d]1277        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1278        if (nb_cpu <= 0)
1279                return 1;
1280        else
1281                return (size_t) nb_cpu;
[50ce607]1282}
1283
[c7e547e]1284int dpdk_pstart_input (libtrace_t *libtrace) {
[ddfc46d]1285        char err[500];
1286        int i=0, phys_cores=0;
1287        int tot = libtrace->perpkt_thread_count;
[e3d534f]1288        libtrace_list_node_t *n;
[ddfc46d]1289        err[0] = 0;
1290
1291        if (rte_lcore_id() != rte_get_master_lcore())
1292                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1293                        " from the master DPDK thread!\n");
1294
1295        /* If the master is not on the last thread we move it there */
1296        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1297                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1298                        return -1;
[d7fd648]1299        }
[50ce607]1300
[ddfc46d]1301        /* Don't exceed the number of cores in the system/detected by dpdk
1302         * We don't have to force this but performance wont be good if we don't */
1303        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1304                if (lcore_config[i].detected) {
[e3d534f]1305                        if (rte_lcore_is_enabled(i)) {
1306#if DEBUG
[ddfc46d]1307                                fprintf(stderr, "Found core %d already in use!\n", i);
[e3d534f]1308#endif
1309                        } else {
[ddfc46d]1310                                phys_cores++;
[e3d534f]1311                        }
[ddfc46d]1312                }
1313        }
[e3d534f]1314        /* If we are restarting we have already allocated some threads as such
1315         * we add these back to the count for this calculation */
1316        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1317                dpdk_per_stream_t * stream = n->data;
1318                if (stream->lcore != -1)
1319                        phys_cores++;
1320        }
[ddfc46d]1321
1322        tot = MIN(libtrace->perpkt_thread_count,
1323                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
[d7fd648]1324        tot = MIN(tot, phys_cores);
[50ce607]1325
[ddfc46d]1326#if DEBUG
1327        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1328                libtrace->perpkt_thread_count, phys_cores);
1329#endif
[d7fd648]1330
[ddfc46d]1331        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1332                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1333                free(libtrace->format_data);
1334                libtrace->format_data = NULL;
1335                return -1;
1336        }
[50ce607]1337
[ddfc46d]1338        /* Make sure we only start the number that we should */
1339        libtrace->perpkt_thread_count = tot;
1340        return 0;
[50ce607]1341}
1342
1343/**
1344 * Register a thread with the DPDK system,
1345 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1346 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1347 * gives it.
1348 *
[12ae766]1349 * We then allow a mapper thread to be started on every real core as DPDK would,
[50ce607]1350 * we also bind these to the corresponding CPU cores.
[d7fd648]1351 *
[50ce607]1352 * @param libtrace A pointer to the trace
1353 * @param reading True if the thread will be used to read packets, i.e. will
1354 *                call pread_packet(), false if thread used to process packet
1355 *                in any other manner including statistics functions.
1356 */
[c7e547e]1357int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
[50ce607]1358{
[4ce6fca]1359#if DEBUG
[d7fd648]1360        char name[99];
[5e43b8b]1361        name[0] = 0;
[915b93c]1362#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
[d7fd648]1363        pthread_getname_np(pthread_self(),
[ddfc46d]1364                           name, sizeof(name));
[4ce6fca]1365#endif
[5e43b8b]1366#endif
[ddfc46d]1367        if (reading) {
[4ce6fca]1368                dpdk_per_stream_t *stream;
1369                /* Attach our thread */
[ddfc46d]1370                if(t->type == THREAD_PERPKT) {
1371                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1372                        if (t->format_data == NULL) {
1373                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1374                                              "Too many threads registered");
1375                                return -1;
1376                        }
1377                } else {
1378                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1379                }
[4ce6fca]1380                stream = t->format_data;
1381#if DEBUG
1382                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1383#endif
1384                return dpdk_register_lcore(libtrace, true, stream->lcore);
1385        } else {
1386                int lcore = dpdk_reserve_lcore(reading, 0);
1387                if (lcore == -1) {
1388                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1389                                      " for DPDK");
1390                        return -1;
1391                }
1392#if DEBUG
1393                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1394#endif
1395                return dpdk_register_lcore(libtrace, false, lcore);
[d7fd648]1396        }
[4ce6fca]1397
[ddfc46d]1398        return 0;
[50ce607]1399}
1400
1401/**
1402 * Unregister a thread with the DPDK system.
[d7fd648]1403 *
[50ce607]1404 * Only previously registered threads should be calling this just before
1405 * they are destroyed.
1406 */
[c7e547e]1407void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
[50ce607]1408{
[ddfc46d]1409        struct rte_config *cfg = rte_eal_get_configuration();
1410
1411        assert(rte_lcore_id() < RTE_MAX_LCORE);
[4ce6fca]1412        pthread_mutex_lock(&dpdk_lock);
[ddfc46d]1413        /* Skip if master */
1414        if (rte_lcore_id() == rte_get_master_lcore()) {
1415                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
[4ce6fca]1416                pthread_mutex_unlock(&dpdk_lock);
[ddfc46d]1417                return;
1418        }
[50ce607]1419
[ddfc46d]1420        /* Disable this core in global DPDK structs */
1421        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1422        cfg->lcore_count--;
1423        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1424        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
[4ce6fca]1425        pthread_mutex_unlock(&dpdk_lock);
[d7fd648]1426        return;
[29bbef0]1427}
1428
[c04929c]1429static int dpdk_start_output(libtrace_out_t *libtrace)
1430{
[ddfc46d]1431        char err[500];
1432        err[0] = 0;
[d7fd648]1433
[ddfc46d]1434        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1435                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1436                free(libtrace->format_data);
1437                libtrace->format_data = NULL;
1438                return -1;
1439        }
1440        return 0;
[c04929c]1441}
1442
[c7e547e]1443int dpdk_pause_input(libtrace_t * libtrace) {
[ddfc46d]1444        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1445        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1446        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
[d7fd648]1447#if DEBUG
[ddfc46d]1448                fprintf(stderr, "Pausing DPDK port\n");
[c04929c]1449#endif
[ddfc46d]1450                rte_eth_dev_stop(FORMAT(libtrace)->port);
1451                FORMAT(libtrace)->paused = DPDK_PAUSED;
1452                /* Empty the queue of packets */
1453                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1454                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1455                }
1456                FORMAT(libtrace)->burst_offset = 0;
1457                FORMAT(libtrace)->burst_size = 0;
[d7fd648]1458
[ddfc46d]1459                for (; tmp != NULL; tmp = tmp->next) {
1460                        dpdk_per_stream_t *stream = tmp->data;
1461                        stream->ts_last_sys = 0;
[c04929c]1462#if HAS_HW_TIMESTAMPS_82580
[ddfc46d]1463                        stream->ts_first_sys = 0;
[c04929c]1464#endif
[ddfc46d]1465                }
1466
1467        }
1468        return 0;
[c04929c]1469}
1470
[d7fd648]1471static int dpdk_write_packet(libtrace_out_t *trace,
[ddfc46d]1472                             libtrace_packet_t *packet){
1473        struct rte_mbuf* m_buff[1];
[d7fd648]1474
[ddfc46d]1475        int wirelen = trace_get_wire_length(packet);
1476        int caplen = trace_get_capture_length(packet);
[d7fd648]1477
[ddfc46d]1478        /* Check for a checksum and remove it */
1479        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1480            wirelen == caplen)
1481                caplen -= ETHER_CRC_LEN;
[c04929c]1482
[ddfc46d]1483        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1484        if (m_buff[0] == NULL) {
1485                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1486                return -1;
1487        } else {
1488                int ret;
1489                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1490                do {
1491                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1492                } while (ret != 1);
1493        }
[c04929c]1494
[ddfc46d]1495        return 0;
[c04929c]1496}
1497
[c7e547e]1498int dpdk_fin_input(libtrace_t * libtrace) {
[4ce6fca]1499        libtrace_list_node_t * n;
[ddfc46d]1500        /* Free our memory structures */
1501        if (libtrace->format_data != NULL) {
[4ce6fca]1502
[ddfc46d]1503                if (FORMAT(libtrace)->port != 0xFF)
1504                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1505                                                        RTE_ETH_EVENT_INTR_LSC,
1506                                                        dpdk_lsc_callback,
1507                                                        FORMAT(libtrace));
[4ce6fca]1508                /* Close the device completely, device cannot be restarted */
[1960910]1509                rte_eth_dev_close(FORMAT(libtrace)->port);
[4ce6fca]1510
1511                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1512                                 FORMAT(libtrace)->nic_numa_node);
1513
1514                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1515                        dpdk_per_stream_t * stream = n->data;
1516                        if (stream->mempool)
1517                                dpdk_free_memory(stream->mempool,
1518                                                 rte_lcore_to_socket_id(stream->lcore));
1519                }
1520
[ddfc46d]1521                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
[1960910]1522                /* filter here if we used it */
[6f7cd4b]1523                if (FORMAT(libtrace)->rss_key)
1524                        free(FORMAT(libtrace)->rss_key);
[c04929c]1525                free(libtrace->format_data);
1526        }
1527
[ddfc46d]1528        return 0;
[c04929c]1529}
1530
1531
1532static int dpdk_fin_output(libtrace_out_t * libtrace) {
[ddfc46d]1533        /* Free our memory structures */
1534        if (libtrace->format_data != NULL) {
1535                /* Close the device completely, device cannot be restarted */
1536                if (FORMAT(libtrace)->port != 0xFF)
1537                        rte_eth_dev_close(FORMAT(libtrace)->port);
1538                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1539                /* filter here if we used it */
[c04929c]1540                free(libtrace->format_data);
1541        }
1542
[ddfc46d]1543        return 0;
[c04929c]1544}
1545
[d7fd648]1546/**
1547 * Get the start of the additional header that we added to a packet.
[c04929c]1548 */
1549static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
[ddfc46d]1550        assert(packet);
1551        assert(packet->buffer);
1552        /* Our header sits straight after the mbuf header */
1553        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
[c04929c]1554}
1555
1556static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
[ddfc46d]1557        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1558        return hdr->cap_len;
[c04929c]1559}
1560
1561static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
[ddfc46d]1562        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1563        if (size > hdr->cap_len) {
1564                /* Cannot make a packet bigger */
[c04929c]1565                return trace_get_capture_length(packet);
1566        }
1567
[ddfc46d]1568        /* Reset the cached capture length first*/
1569        packet->capture_length = -1;
1570        hdr->cap_len = (uint32_t) size;
[c04929c]1571        return trace_get_capture_length(packet);
1572}
1573
1574static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
[ddfc46d]1575        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1576        int org_cap_size; /* The original capture size */
1577        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1578                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1579                               sizeof(struct hw_timestamp_82580);
1580        } else {
1581                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1582        }
1583        if (hdr->flags & INCLUDES_CHECKSUM) {
1584                return org_cap_size;
1585        } else {
1586                /* DPDK packets are always TRACE_TYPE_ETH packets */
1587                return org_cap_size + ETHER_CRC_LEN;
1588        }
[c04929c]1589}
[ddfc46d]1590
[c04929c]1591static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
[ddfc46d]1592        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1593        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1594                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1595                                sizeof(struct hw_timestamp_82580);
1596        else
1597                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
[c04929c]1598}
1599
[c7e547e]1600int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
[ddfc46d]1601                               libtrace_packet_t *packet, void *buffer,
1602                               libtrace_rt_types_t rt_type, uint32_t flags) {
1603        assert(packet);
1604        if (packet->buffer != buffer &&
1605            packet->buf_control == TRACE_CTRL_PACKET) {
1606                free(packet->buffer);
1607        }
[c04929c]1608
[ddfc46d]1609        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1610                packet->buf_control = TRACE_CTRL_PACKET;
1611        else
1612                packet->buf_control = TRACE_CTRL_EXTERNAL;
[c04929c]1613
[ddfc46d]1614        packet->buffer = buffer;
1615        packet->header = buffer;
[c04929c]1616
[ddfc46d]1617        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1618        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1619        packet->type = rt_type;
1620        return 0;
1621}
[d7fd648]1622
1623/**
1624 * Given a packet size and a link speed, computes the
1625 * time to transmit in nanoseconds.
1626 *
1627 * @param format_data The dpdk format data from which we get the link speed
1628 *        and if unset updates it in a thread safe manner
1629 * @param pkt_size The size of the packet in bytes
1630 * @return The wire time in nanoseconds
1631 */
1632static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1633        uint32_t wire_time;
1634        /* 20 extra bytes of interframe gap and preamble */
1635# if GET_MAC_CRC_CHECKSUM
1636        wire_time = ((pkt_size + 20) * 8000);
1637# else
1638        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1639# endif
1640
1641        /* Division is really slow and introduces a pipeline stall
1642         * The compiler will optimise this into magical multiplication and shifting
1643         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1644         */
1645retry_calc_wiretime:
1646        switch (format_data->link_speed) {
[b148e3b]1647        case ETH_SPEED_NUM_40G:
1648                wire_time /=  ETH_SPEED_NUM_40G;
[d7fd648]1649                break;
[b148e3b]1650        case ETH_SPEED_NUM_20G:
1651                wire_time /= ETH_SPEED_NUM_20G;
[d7fd648]1652                break;
[b148e3b]1653        case ETH_SPEED_NUM_10G:
1654                wire_time /= ETH_SPEED_NUM_10G;
[d7fd648]1655                break;
[b148e3b]1656        case ETH_SPEED_NUM_1G:
1657                wire_time /= ETH_SPEED_NUM_1G;
[d7fd648]1658                break;
1659        case 0:
1660                {
1661                /* Maybe the link was down originally, but now it should be up */
1662                struct rte_eth_link link = {0};
1663                rte_eth_link_get_nowait(format_data->port, &link);
1664                if (link.link_status && link.link_speed) {
1665                        format_data->link_speed = link.link_speed;
1666#ifdef DEBUG
1667                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1668#endif
1669                        goto retry_calc_wiretime;
1670                }
1671                /* We don't know the link speed, make sure numbers are counting up */
1672                wire_time = 1;
1673                break;
1674                }
1675        default:
1676                wire_time /= format_data->link_speed;
1677        }
1678        return wire_time;
1679}
1680
[ddfc46d]1681/**
[d7fd648]1682 * Does any extra preperation to all captured packets
1683 * This includes adding our extra header to it with the timestamp,
1684 * and any snapping
1685 *
1686 * @param format_data The DPDK format data
1687 * @param plc The DPDK per lcore format data
1688 * @param pkts An array of size nb_pkts of DPDK packets
[c04929c]1689 */
[ddfc46d]1690static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1691                                   struct dpdk_per_stream_t *plc,
1692                                   struct rte_mbuf **pkts,
1693                                   size_t nb_pkts) {
[10c47a0]1694        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
[d7fd648]1695        struct dpdk_addt_hdr *hdr;
1696        size_t i;
1697        uint64_t cur_sys_time_ns;
[c04929c]1698#if HAS_HW_TIMESTAMPS_82580
[d7fd648]1699        struct hw_timestamp_82580 *hw_ts;
1700        uint64_t estimated_wraps;
[c04929c]1701#else
[d7fd648]1702
1703#endif
1704
1705#if USE_CLOCK_GETTIME
1706        struct timespec cur_sys_time = {0};
1707        /* This looks terrible and I feel bad doing it. But it's OK
1708         * on new kernels, because this is a fast vsyscall */
1709        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1710        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1711#else
1712        struct timeval cur_sys_time = {0};
1713        /* Also a fast vsyscall */
1714        gettimeofday(&cur_sys_time, NULL);
1715        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
[c04929c]1716#endif
1717
[d7fd648]1718        /* The system clock is not perfect so when running
1719         * at linerate we could timestamp a packet in the past.
1720         * To avoid this we munge the timestamp to appear 1ns
1721         * after the previous packet. We should eventually catch up
1722         * to system time since a 64byte packet on a 10G link takes 67ns.
1723         *
1724         * Note with parallel readers timestamping packets
1725         * with duplicate stamps or out of order is unavoidable without
1726         * hardware timestamping from the NIC.
1727         */
1728#if !HAS_HW_TIMESTAMPS_82580
1729        if (plc->ts_last_sys >= cur_sys_time_ns) {
1730                cur_sys_time_ns = plc->ts_last_sys + 1;
1731        }
1732#endif
1733
[10c47a0]1734        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
[d7fd648]1735        for (i = 0 ; i < nb_pkts ; ++i) {
1736
1737                /* We put our header straight after the dpdk header */
1738                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1739                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1740
[c04929c]1741#if GET_MAC_CRC_CHECKSUM
[d7fd648]1742                /* Add back in the CRC sum */
[10c47a0]1743                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1744                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
[d7fd648]1745                hdr->flags |= INCLUDES_CHECKSUM;
[c04929c]1746#endif
1747
[d7fd648]1748                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1749
[c04929c]1750#if HAS_HW_TIMESTAMPS_82580
[d7fd648]1751                /* The timestamp is sitting before our packet and is included in pkt_len */
1752                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1753                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1754                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1755
1756                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1757                 *
1758                 *        +----------+---+   +--------------+
1759                 *  82580 |    24    | 8 |   |      32      |
1760                 *        +----------+---+   +--------------+
1761                 *          reserved  \______ 40 bits _____/
1762                 *
1763                 * The 40 bit 82580 SYSTIM overflows every
1764                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1765                 *
1766                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1767                 * Endian (for the full 64 bits) i.e. picture is mirrored
1768                 */
1769
1770                /* Despite what the documentation says this is in Little
1771                 * Endian byteorder. Mask the reserved section out.
1772                 */
1773                hdr->timestamp = le64toh(hw_ts->timestamp) &
1774                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1775
1776                if (unlikely(plc->ts_first_sys == 0)) {
1777                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1778                        plc->ts_last_sys = plc->ts_first_sys;
1779                }
[c04929c]1780
[d7fd648]1781                /* This will have serious problems if packets aren't read quickly
1782                 * that is within a couple of seconds because our clock cycles every
1783                 * 18 seconds */
1784                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1785                                  / (1ull<<TS_NBITS_82580);
1786
1787                /* Estimated_wraps gives the number of times the counter should have
1788                 * wrapped (however depending on value last time it could have wrapped
1789                 * twice more (if hw clock is close to its max value) or once less (allowing
1790                 * for a bit of variance between hw and sys clock). But if the clock
1791                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1792                if (unlikely(estimated_wraps >= 2)) {
1793                        /* 2 or more wrap arounds add all but the very last wrap */
1794                        plc->wrap_count += estimated_wraps - 1;
1795                }
[c04929c]1796
[d7fd648]1797                /* Set the timestamp to the lowest possible value we're considering */
1798                hdr->timestamp += plc->ts_first_sys +
1799                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1800
1801                /* In most runs only the first if() will need evaluating - i.e our
1802                 * estimate is correct. */
1803                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1804                                              hdr->timestamp, MAXSKEW_82580))) {
1805                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1806                        plc->wrap_count++;
1807                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1808                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1809                                             hdr->timestamp, MAXSKEW_82580)) {
1810                                /* Failed to match estimated_wraps */
1811                                plc->wrap_count++;
1812                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1813                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1814                                                     hdr->timestamp, MAXSKEW_82580)) {
1815                                        if (estimated_wraps == 0) {
1816                                                /* 0 case Failed to match estimated_wraps+2 */
1817                                                printf("WARNING - Hardware Timestamp failed to"
1818                                                       " match using systemtime!\n");
1819                                                hdr->timestamp = cur_sys_time_ns;
1820                                        } else {
1821                                                /* Failed to match estimated_wraps+1 */
1822                                                plc->wrap_count++;
1823                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1824                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1825                                                                     hdr->timestamp, MAXSKEW_82580)) {
1826                                                        /* Failed to match estimated_wraps+2 */
1827                                                        printf("WARNING - Hardware Timestamp failed to"
1828                                                               " match using systemtime!!\n");
1829                                                }
1830                                        }
1831                                }
1832                        }
1833                }
[c04929c]1834#else
1835
[d7fd648]1836                hdr->timestamp = cur_sys_time_ns;
1837                /* Offset the next packet by the wire time of previous */
1838                calculate_wire_time(format_data, hdr->cap_len);
[c04929c]1839
[d7fd648]1840#endif
1841        }
1842
1843        plc->ts_last_sys = cur_sys_time_ns;
1844        return;
[c04929c]1845}
1846
[29bbef0]1847
1848static void dpdk_fin_packet(libtrace_packet_t *packet)
1849{
1850        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1851                rte_pktmbuf_free(packet->buffer);
1852                packet->buffer = NULL;
1853        }
1854}
1855
[ddfc46d]1856/** Reads at least one packet or returns an error
1857 */
[c7e547e]1858int dpdk_read_packet_stream (libtrace_t *libtrace,
[ddfc46d]1859                                           dpdk_per_stream_t *stream,
1860                                           libtrace_message_queue_t *mesg,
1861                                           struct rte_mbuf* pkts_burst[],
1862                                           size_t nb_packets) {
1863        size_t nb_rx; /* Number of rx packets we've recevied */
1864        while (1) {
1865                /* Poll for a batch of packets */
1866                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1867                                         stream->queue_id, pkts_burst, nb_packets);
1868                if (nb_rx > 0) {
1869                        /* Got some packets - otherwise we keep spining */
1870                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1871                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1872                        return nb_rx;
1873                }
1874                /* Check the message queue this could be less than 0 */
1875                if (mesg && libtrace_message_queue_count(mesg) > 0)
1876                        return READ_MESSAGE;
[174b3c7]1877                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
[5e3f16c]1878                        return nb_rx;
[ddfc46d]1879                /* Wait a while, polling on memory degrades performance
1880                 * This relieves the pressure on memory allowing the NIC to DMA */
1881                rte_delay_us(10);
1882        }
[d7fd648]1883
[ddfc46d]1884        /* We'll never get here - but if we did it would be bad */
1885        return READ_ERROR;
1886}
[c04929c]1887
[ddfc46d]1888static int dpdk_pread_packets (libtrace_t *libtrace,
1889                                    libtrace_thread_t *t,
1890                                    libtrace_packet_t **packets,
1891                                    size_t nb_packets) {
1892        int nb_rx; /* Number of rx packets we've recevied */
1893        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
[4ce6fca]1894        int i;
[ddfc46d]1895        dpdk_per_stream_t *stream = t->format_data;
[db84bb2]1896        struct dpdk_addt_hdr * hdr;
[ddfc46d]1897
1898        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
1899                                         pkts_burst, nb_packets);
1900
1901        if (nb_rx > 0) {
1902                for (i = 0; i < nb_rx; ++i) {
1903                        if (packets[i]->buffer != NULL) {
1904                                /* The packet should always be finished */
1905                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
1906                                free(packets[i]->buffer);
1907                        }
1908                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
1909                        packets[i]->type = TRACE_RT_DATA_DPDK;
1910                        packets[i]->buffer = pkts_burst[i];
1911                        packets[i]->trace = libtrace;
1912                        packets[i]->error = 1;
[db84bb2]1913                        hdr = (struct dpdk_addt_hdr *) 
1914                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
1915                        packets[i]->order = hdr->timestamp;
[ddfc46d]1916                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
1917                }
[d7fd648]1918        }
[ddfc46d]1919
1920        return nb_rx;
1921}
1922
[c7e547e]1923int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
[ddfc46d]1924        int nb_rx; /* Number of rx packets we've received */
1925        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
1926
1927        /* Free the last packet buffer */
1928        if (packet->buffer != NULL) {
1929                /* The packet should always be finished */
1930                assert(packet->buf_control == TRACE_CTRL_PACKET);
1931                free(packet->buffer);
1932                packet->buffer = NULL;
1933        }
1934
1935        packet->buf_control = TRACE_CTRL_EXTERNAL;
1936        packet->type = TRACE_RT_DATA_DPDK;
1937
1938        /* Check if we already have some packets buffered */
1939        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
1940                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
[c7e547e]1941                packet->trace = libtrace;
[d7fd648]1942                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1943                return 1; // TODO should be bytes read, which essentially useless anyway
1944        }
1945
[ddfc46d]1946        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
1947                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
[50ce607]1948
[d7fd648]1949        if (nb_rx > 0) {
[ddfc46d]1950                FORMAT(libtrace)->burst_size = nb_rx;
1951                FORMAT(libtrace)->burst_offset = 1;
1952                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
[c7e547e]1953                packet->trace = libtrace;
[ddfc46d]1954                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1955                return 1;
[d7fd648]1956        }
[ddfc46d]1957        return nb_rx;
[29bbef0]1958}
[c04929c]1959
1960static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
[ddfc46d]1961        struct timeval tv;
1962        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
[d7fd648]1963
[ddfc46d]1964        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1965        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1966        return tv;
[c04929c]1967}
1968
1969static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
[ddfc46d]1970        struct timespec ts;
1971        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
[d7fd648]1972
[ddfc46d]1973        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1974        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1975        return ts;
[c04929c]1976}
1977
1978static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
[ddfc46d]1979        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
[c04929c]1980}
1981
1982static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
[ddfc46d]1983        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1984        return (libtrace_direction_t) hdr->direction;
[c04929c]1985}
1986
1987static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
[ddfc46d]1988        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1989        hdr->direction = (uint8_t) direction;
1990        return (libtrace_direction_t) hdr->direction;
[c04929c]1991}
1992
[b81f0a1]1993static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
1994        struct rte_eth_stats dev_stats = {0};
[d7fd648]1995
[ddfc46d]1996        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
[b81f0a1]1997                return;
1998
[ddfc46d]1999        /* Grab the current stats */
[b81f0a1]2000        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
[d7fd648]2001
[b81f0a1]2002        stats->captured_valid = true;
2003        stats->captured = dev_stats.ipackets;
[c04929c]2004
[b81f0a1]2005        stats->dropped_valid = true;
2006        stats->dropped = dev_stats.imissed;
2007
[b148e3b]2008#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2009        /* DPDK commit 86057c fixes ensures missed does not get counted as
2010         * errors */
2011        stats->errors_valid = true;
2012        stats->errors = dev_stats.ierrors;
2013#else
[b81f0a1]2014        /* DPDK errors includes drops */
2015        stats->errors_valid = true;
2016        stats->errors = dev_stats.ierrors - dev_stats.imissed;
[b148e3b]2017#endif
[b81f0a1]2018        stats->received_valid = true;
2019        stats->received = dev_stats.ipackets + dev_stats.imissed;
[d7fd648]2020
[c04929c]2021}
2022
2023/* Attempts to read a packet in a non-blocking fashion. If one is not
2024 * available a SLEEP event is returned. We do not have the ability to
2025 * create a select()able file descriptor in DPDK.
2026 */
[c7e547e]2027libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
[ddfc46d]2028                                            libtrace_packet_t *packet) {
2029        libtrace_eventobj_t event = {0,0,0.0,0};
[b15cc48]2030        size_t nb_rx; /* Number of received packets we've read */
[d7fd648]2031
[ddfc46d]2032        do {
[d7fd648]2033
[b15cc48]2034                /* No packets waiting in our buffer? Try and read some more */
2035                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2036                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2037                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2038                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2039                        if (nb_rx > 0) {
2040                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2041                                                FORMAT(trace)->burst_pkts, nb_rx);
2042                                FORMAT(trace)->burst_size = nb_rx;
2043                                FORMAT(trace)->burst_offset = 0;
2044                        }
2045                }
[ddfc46d]2046
[b15cc48]2047                /* Now do we have packets waiting? */
2048                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
[ddfc46d]2049                        /* Free the last packet buffer */
2050                        if (packet->buffer != NULL) {
2051                                /* The packet should always be finished */
2052                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2053                                free(packet->buffer);
2054                                packet->buffer = NULL;
2055                        }
[d7fd648]2056
[ddfc46d]2057                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2058                        packet->type = TRACE_RT_DATA_DPDK;
2059                        event.type = TRACE_EVENT_PACKET;
[b15cc48]2060                        packet->buffer = FORMAT(trace)->burst_pkts[
2061                                             FORMAT(trace)->burst_offset++];
[ddfc46d]2062                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2063                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2064
2065                        /* XXX - Check this passes the filter trace_read_packet normally
2066                         * does this for us but this wont */
2067                        if (trace->filter) {
2068                                if (!trace_apply_filter(trace->filter, packet)) {
2069                                        /* Failed the filter so we loop for another packet */
2070                                        trace->filtered_packets ++;
2071                                        continue;
2072                                }
2073                        }
2074                        trace->accepted_packets ++;
2075                } else {
2076                        /* We only want to sleep for a very short time - we are non-blocking */
2077                        event.type = TRACE_EVENT_SLEEP;
2078                        event.seconds = 0.0001;
2079                        event.size = 0;
[d7fd648]2080                }
2081
[ddfc46d]2082                /* If we get here we have our event */
2083                break;
2084        } while (1);
[c04929c]2085
[ddfc46d]2086        return event;
[c04929c]2087}
2088
2089static void dpdk_help(void) {
[c94f107]2090        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
[ddfc46d]2091        printf("Supported input URIs:\n");
2092        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2093        printf("\tThe -<coreid> is optional \n");
2094        printf("\t e.g. dpdk:0000:01:00.1\n");
2095        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2096        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2097        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2098        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2099        printf("\n");
2100        printf("Supported output URIs:\n");
2101        printf("\tSame format as the input URI.\n");
2102        printf("\t e.g. dpdk:0000:01:00.1\n");
2103        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2104        printf("\n");
[c04929c]2105}
2106
[b13b939]2107static struct libtrace_format_t dpdk = {
[c04929c]2108        "dpdk",
[b81f0a1]2109        "$Id$",
[c04929c]2110        TRACE_FORMAT_DPDK,
[b81f0a1]2111        NULL,                               /* probe filename */
2112        NULL,                               /* probe magic */
2113        dpdk_init_input,                    /* init_input */
2114        dpdk_config_input,                  /* config_input */
2115        dpdk_start_input,                   /* start_input */
2116        dpdk_pause_input,                   /* pause_input */
2117        dpdk_init_output,                   /* init_output */
2118        NULL,                               /* config_output */
2119        dpdk_start_output,                  /* start_ouput */
2120        dpdk_fin_input,                     /* fin_input */
2121        dpdk_fin_output,                    /* fin_output */
2122        dpdk_read_packet,                   /* read_packet */
2123        dpdk_prepare_packet,                /* prepare_packet */
2124        dpdk_fin_packet,                    /* fin_packet */
2125        dpdk_write_packet,                  /* write_packet */
2126        dpdk_get_link_type,                 /* get_link_type */
2127        dpdk_get_direction,                 /* get_direction */
2128        dpdk_set_direction,                 /* set_direction */
2129        NULL,                               /* get_erf_timestamp */
2130        dpdk_get_timeval,                   /* get_timeval */
2131        dpdk_get_timespec,                  /* get_timespec */
2132        NULL,                               /* get_seconds */
2133        NULL,                               /* seek_erf */
2134        NULL,                               /* seek_timeval */
2135        NULL,                               /* seek_seconds */
2136        dpdk_get_capture_length,            /* get_capture_length */
2137        dpdk_get_wire_length,               /* get_wire_length */
2138        dpdk_get_framing_length,            /* get_framing_length */
2139        dpdk_set_capture_length,            /* set_capture_length */
2140        NULL,                               /* get_received_packets */
2141        NULL,                               /* get_filtered_packets */
2142        NULL,                               /* get_dropped_packets */
2143        dpdk_get_stats,                     /* get_statistics */
2144        NULL,                               /* get_fd */
2145        dpdk_trace_event,                   /* trace_event */
2146        dpdk_help,                          /* help */
2147        NULL,                               /* next pointer */
2148        {true, 8},                          /* Live, NICs typically have 8 threads */
2149        dpdk_pstart_input,                  /* pstart_input */
2150        dpdk_pread_packets,                 /* pread_packets */
2151        dpdk_pause_input,                   /* ppause */
2152        dpdk_fin_input,                     /* p_fin */
2153        dpdk_pregister_thread,              /* pregister_thread */
2154        dpdk_punregister_thread,            /* punregister_thread */
2155        NULL                                /* get thread stats */
[c04929c]2156};
2157
2158void dpdk_constructor(void) {
2159        register_format(&dpdk);
2160}
Note: See TracBrowser for help on using the repository browser.