source: lib/format_dpdk.c @ 10fd24b

develop
Last change on this file since 10fd24b was 10fd24b, checked in by Shane Alcock <salcock@…>, 23 months ago

Update format_dpdk.c to handle new cached packet data structure.

  • Property mode set to 100644
File size: 72.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44#include "format_dpdk.h"
45
46#ifdef HAVE_INTTYPES_H
47#  include <inttypes.h>
48#else
49# error "Can't find inttypes.h"
50#endif
51
52#include <stdlib.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56#include <math.h>
57
58#ifdef HAVE_LIBNUMA
59#include <numa.h>
60#endif
61
62#define MBUF(x) ((struct rte_mbuf *) x)
63/* Get the original placement of the packet data */
64#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
65#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
66#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
67
68#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
69#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
70
71#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
72                        (uint64_t) tv.tv_usec*1000ull)
73#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
74                        (uint64_t) ts.tv_nsec)
75
76#if RTE_PKTMBUF_HEADROOM != 128
77#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
78         "any libtrace instance processing these packet must be have the" \
79         "same RTE_PKTMBUF_HEADROOM set"
80#endif
81
82
83static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
84/* Memory pools Per NUMA node */
85static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
86
87/* Used by both input and output however some fields are not used
88 * for output */
89struct dpdk_format_data_t {
90        int8_t promisc; /* promiscuous mode - RX only */
91        uint8_t paused; /* See paused_state */
92        portid_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
93        portid_t nb_ports; /* Total number of usable ports on system should be 1 */
94        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
95        int snaplen; /* The snap length for the capture - RX only */
96        /* We always have to setup both rx and tx queues even if we don't want them */
97        int nb_rx_buf; /* The number of packet buffers in the rx ring */
98        int nb_tx_buf; /* The number of packet buffers in the tx ring */
99        int nic_numa_node; /* The NUMA node that the NIC is attached to */
100        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
101#if DPDK_USE_BLACKLIST
102        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
103        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
104#endif
105        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
106        enum hasher_types hasher_type;
107        uint8_t *rss_key;
108        /* To improve single-threaded performance we always batch reading
109         * packets, in a burst, otherwise the parallel library does this for us */
110        struct rte_mbuf* burst_pkts[BURST_SIZE];
111        int burst_size; /* The total number read in the burst */
112        int burst_offset; /* The offset we are into the burst */
113
114        /* Our parallel streams */
115        libtrace_list_t *per_stream;
116};
117
118enum dpdk_addt_hdr_flags {
119        INCLUDES_CHECKSUM = 0x1,
120        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
121};
122
123/**
124 * A structure placed in front of the packet where we can store
125 * additional information about the given packet.
126 * +--------------------------+
127 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
128 * +--------------------------+
129 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
130 * +--------------------------+
131 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
132 * +--------------------------+
133 * *   hw_timestamp_82580     * 16 bytes Optional
134 * +--------------------------+
135 * |       Packet data        | Variable Size
136 * |                          |
137 */
138struct dpdk_addt_hdr {
139        uint64_t timestamp;
140        uint8_t flags;
141        uint8_t direction;
142        uint8_t reserved1;
143        uint8_t reserved2;
144        uint32_t cap_len; /* The size to say the capture is */
145};
146
147/**
148 * We want to blacklist all devices except those on the whitelist
149 * (I say list, but yes it is only the one).
150 *
151 * The default behaviour of rte_pci_probe() will map every possible device
152 * to its DPDK driver. The DPDK driver will take the ethernet device
153 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
154 *
155 * So blacklist all devices except the one that we wish to use so that
156 * the others can still be used as standard ethernet ports.
157 *
158 * @return 0 if successful, otherwise -1 on error.
159 */
160#if DPDK_USE_BLACKLIST
161static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
162{
163        struct rte_pci_device *dev = NULL;
164        format_data->nb_blacklist = 0;
165
166        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
167
168        TAILQ_FOREACH(dev, &device_list, next) {
169        if (whitelist != NULL && whitelist->domain == dev->addr.domain
170            && whitelist->bus == dev->addr.bus
171            && whitelist->devid == dev->addr.devid
172            && whitelist->function == dev->addr.function)
173            continue;
174                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
175                                / sizeof (format_data->blacklist[0])) {
176                        fprintf(stderr, "Warning: too many devices to blacklist consider"
177                                        " increasing BLACK_LIST_SIZE");
178                        break;
179                }
180                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
181                ++format_data->nb_blacklist;
182        }
183
184        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
185        return 0;
186}
187#else /* DPDK_USE_BLACKLIST */
188#include <rte_devargs.h>
189static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
190{
191        char pci_str[20] = {0};
192        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
193                 whitelist->domain,
194                 whitelist->bus,
195                 whitelist->devid,
196                 whitelist->function);
197        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
198                return -1;
199        }
200        return 0;
201}
202#endif
203
204/**
205 * Parse the URI format as a pci address
206 * Fills in addr, note core is optional and is unchanged if
207 * a value for it is not provided.
208 *
209 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
210 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
211 */
212static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
213        int matches;
214
215        if (!str) {
216                fprintf(stderr, "NULL str passed into parse_pciaddr()\n");
217                return -1;
218        }
219#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
220        matches = sscanf(str, "%8"SCNx32":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
221                         &addr->domain, &addr->bus, &addr->devid,
222                         &addr->function, core);
223#else
224        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
225                         &addr->domain, &addr->bus, &addr->devid,
226                         &addr->function, core);
227#endif
228        if (matches >= 4) {
229                return 0;
230        } else {
231                return -1;
232        }
233}
234
235/**
236 * Convert a pci address to the numa node it is
237 * connected to.
238 *
239 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
240 * so we can call it before DPDK
241 *
242 * @return -1 if unknown otherwise a number 0 or higher of the numa node
243 */
244static int pci_to_numa(struct rte_pci_addr * dev_addr) {
245        char path[50] = {0};
246        FILE *file;
247
248        /* Read from the system */
249        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
250                 dev_addr->domain,
251                 dev_addr->bus,
252                 dev_addr->devid,
253                 dev_addr->function);
254
255        if((file = fopen(path, "r")) != NULL) {
256                int numa_node = -1;
257                fscanf(file, "%d", &numa_node);
258                fclose(file);
259                return numa_node;
260        }
261        return -1;
262}
263
264#if DEBUG
265/* For debugging */
266static inline void dump_configuration()
267{
268        struct rte_config * global_config;
269        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
270
271        if (nb_cpu <= 0) {
272                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
273                       " Falling back to the first core.");
274                nb_cpu = 1; /* fallback to just 1 core */
275        }
276        if (nb_cpu > RTE_MAX_LCORE)
277                nb_cpu = RTE_MAX_LCORE;
278
279        global_config = rte_eal_get_configuration();
280
281        if (global_config != NULL) {
282                int i;
283                fprintf(stderr, "Intel DPDK setup\n"
284                        "---Version      : %s\n"
285                        "---Master LCore : %"PRIu32"\n"
286                        "---LCore Count  : %"PRIu32"\n",
287                        rte_version(),
288                        global_config->master_lcore, global_config->lcore_count);
289
290                for (i = 0 ; i < nb_cpu; i++) {
291                        fprintf(stderr, "   ---Core %d : %s\n", i,
292                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
293                }
294
295                const char * proc_type;
296                switch (global_config->process_type) {
297                case RTE_PROC_AUTO:
298                        proc_type = "auto";
299                        break;
300                case RTE_PROC_PRIMARY:
301                        proc_type = "primary";
302                        break;
303                case RTE_PROC_SECONDARY:
304                        proc_type = "secondary";
305                        break;
306                case RTE_PROC_INVALID:
307                        proc_type = "invalid";
308                        break;
309                default:
310                        proc_type = "something worse than invalid!!";
311                }
312                fprintf(stderr, "---Process Type : %s\n", proc_type);
313        }
314
315}
316#endif
317
318/**
319 * Expects to be called from the master lcore and moves it to the given dpdk id
320 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
321 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
322 *               and not already in use.
323 * @return 0 is successful otherwise -1 on error.
324 */
325static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
326        struct rte_config *cfg = rte_eal_get_configuration();
327        cpu_set_t cpuset;
328        int i;
329
330        if (core >= RTE_MAX_LCORE) {
331                fprintf(stderr, "Core must be a value less than the number of cores "
332                        "in dpdk_move_master_lcore()\n");
333                return -1;
334        }
335        if (rte_get_master_lcore() != rte_lcore_id()) {
336                fprintf(stderr, "Master core not equal to core id in dpdk_move_master_lcore()\n");
337                return -1;
338        }
339
340        if (core == rte_lcore_id())
341                return 0;
342
343        /* Make sure we are not overwriting someone else */
344        if (rte_lcore_is_enabled(core)) {
345                fprintf(stderr, "Cannot override another core in dpdk_move_master_lcore()\n");
346                return -1;
347        }
348
349        /* Move the core */
350        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
351        cfg->lcore_role[core] = ROLE_RTE;
352        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
353        rte_eal_get_configuration()->master_lcore = core;
354        RTE_PER_LCORE(_lcore_id) = core;
355
356        /* Now change the affinity, either mapped to a single core or all accepted */
357        CPU_ZERO(&cpuset);
358
359        if (lcore_config[core].detected) {
360                CPU_SET(core, &cpuset);
361        } else {
362                for (i = 0; i < RTE_MAX_LCORE; ++i) {
363                        if (lcore_config[i].detected)
364                                CPU_SET(i, &cpuset);
365                }
366        }
367
368        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
369        if (i != 0) {
370                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
371                return -1;
372        }
373        return 0;
374}
375
376/**
377 * XXX This is very bad XXX
378 * But we have to do something to allow getopts nesting
379 * Luckly normally the format is last so it doesn't matter
380 * DPDK only supports modern systems so hopefully this
381 * will continue to work
382 */
383struct saved_getopts {
384        char *optarg;
385        int optind;
386        int opterr;
387        int optopt;
388};
389
390static void save_getopts(struct saved_getopts *opts) {
391        opts->optarg = optarg;
392        opts->optind = optind;
393        opts->opterr = opterr;
394        opts->optopt = optopt;
395}
396
397static void restore_getopts(struct saved_getopts *opts) {
398        optarg = opts->optarg;
399        optind = opts->optind;
400        opterr = opts->opterr;
401        optopt = opts->optopt;
402}
403
404static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
405                                        char * err, int errlen) {
406        int ret; /* Returned error codes */
407        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
408        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
409        char mem_map[20] = {0}; /* The memory name */
410        long nb_cpu; /* The number of CPUs in the system */
411        long my_cpu; /* The CPU number we want to bind to */
412        int i;
413        struct rte_config *cfg = rte_eal_get_configuration();
414        struct saved_getopts save_opts;
415
416        /* This initialises the Environment Abstraction Layer (EAL)
417         * If we had slave workers these are put into WAITING state
418         *
419         * Basically binds this thread to a fixed core, which we choose as
420         * the last core on the machine (assuming fewer interrupts mapped here).
421         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
422         * "-n" the number of memory channels into the CPU (hardware specific)
423         *      - Most likely to be half the number of ram slots in your machine.
424         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
425         * Controls where in memory packets are stored such that they are spread
426         * across the channels. We just use 1 to be safe.
427         *
428         * Using unique file prefixes mean separate memory is used, unlinking
429         * the two processes. However be careful we still cannot access a
430         * port that already in use.
431         */
432        char* argv[] = {"libtrace",
433                        "-c", cpu_number,
434                        "-n", "1",
435                        "--proc-type", "auto",
436                        "--file-prefix", mem_map,
437                        "-m", "512",
438#if DPDK_USE_LOG_LEVEL
439#       if DEBUG
440                        "--log-level", "8", /* RTE_LOG_DEBUG */
441#       else
442                        "--log-level", "5", /* RTE_LOG_WARNING */
443#       endif
444#endif
445                        NULL};
446        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
447
448#if DEBUG
449        rte_log_set_global_level(RTE_LOG_DEBUG);
450#else
451        rte_log_set_global_level(RTE_LOG_WARNING);
452#endif
453
454        /* Get the number of cpu cores in the system and use the last core
455         * on the correct numa node */
456        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
457        if (nb_cpu <= 0) {
458                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
459                       " Falling back to the first core.");
460                nb_cpu = 1; /* fallback to the first core */
461        }
462        if (nb_cpu > RTE_MAX_LCORE)
463                nb_cpu = RTE_MAX_LCORE;
464
465        my_cpu = -1;
466        /* This allows the user to specify the core - we would try to do this
467         * automatically but it's hard to tell that this is secondary
468         * before running rte_eal_init(...). Currently we are limited to 1
469         * instance per core due to the way memory is allocated. */
470        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
471                snprintf(err, errlen, "Failed to parse URI");
472                return -1;
473        }
474
475#ifdef HAVE_LIBNUMA
476        format_data->nic_numa_node = pci_to_numa(&use_addr);
477        if (my_cpu < 0) {
478#if DEBUG
479                /* If we can assign to a core on the same numa node */
480                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
481#endif
482                if(format_data->nic_numa_node >= 0) {
483                        int max_node_cpu = -1;
484                        struct bitmask *mask = numa_allocate_cpumask();
485                        if (!mask) {
486                                fprintf(stderr, "Unable to allocate cpu mask in dpdk_init_environment()\n");
487                                return -1;
488                        }
489                        numa_node_to_cpus(format_data->nic_numa_node, mask);
490                        for (i = 0 ; i < nb_cpu; ++i) {
491                                if (numa_bitmask_isbitset(mask,i))
492                                        max_node_cpu = i+1;
493                        }
494                        my_cpu = max_node_cpu;
495                }
496        }
497#endif
498        if (my_cpu < 0) {
499                my_cpu = nb_cpu;
500        }
501
502
503        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
504                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
505
506        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
507                snprintf(err, errlen,
508                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
509                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
510                return -1;
511        }
512
513        /* Make our mask with all cores turned on this is so that DPDK
514         * gets all CPU info in older versions */
515        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
516        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
517
518#if !DPDK_USE_BLACKLIST
519        /* Black list all ports besides the one that we want to use */
520        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
521                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
522                         " are you sure the address is correct?: %s", strerror(-ret));
523                return -1;
524        }
525#endif
526
527        /* Give the memory map a unique name */
528        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
529        /* rte_eal_init it makes a call to getopt so we need to reset the
530         * global optind variable of getopt otherwise this fails */
531        save_getopts(&save_opts);
532        optind = 1;
533        if ((ret = rte_eal_init(argc, argv)) < 0) {
534                snprintf(err, errlen,
535                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
536                return -1;
537        }
538        restore_getopts(&save_opts);
539        // These are still running but will never do anything with DPDK v1.7 we
540        // should remove this XXX in the future
541        for(i = 0; i < RTE_MAX_LCORE; ++i) {
542                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
543                        cfg->lcore_role[i] = ROLE_OFF;
544                        cfg->lcore_count--;
545                }
546        }
547        // Only the master should be running
548        if (cfg->lcore_count != 1) {
549                fprintf(stderr, "Expected only the master core to be running in dpdk_init_environment()\n");
550                return -1;
551        }
552
553        // TODO XXX TODO
554        dpdk_move_master_lcore(NULL, my_cpu-1);
555
556#if DEBUG
557        dump_configuration();
558#endif
559
560#if DPDK_USE_PMD_INIT
561        /* This registers all available NICs with Intel DPDK
562         * These are not loaded until rte_eal_pci_probe() is called.
563         */
564        if ((ret = rte_pmd_init_all()) < 0) {
565                snprintf(err, errlen,
566                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
567                return -1;
568        }
569#endif
570
571#if DPDK_USE_BLACKLIST
572        /* Blacklist all ports besides the one that we want to use */
573        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
574                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
575                         " are you sure the address is correct?: %s", strerror(-ret));
576                return -1;
577        }
578#endif
579
580#if DPDK_USE_PCI_PROBE
581        /* This loads DPDK drivers against all ports that are not blacklisted */
582        if ((ret = rte_eal_pci_probe()) < 0) {
583                snprintf(err, errlen,
584                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
585                return -1;
586        }
587#endif
588
589        format_data->nb_ports = rte_eth_dev_count();
590
591        if (format_data->nb_ports != 1) {
592                snprintf(err, errlen,
593                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
594                         format_data->nb_ports);
595                return -1;
596        }
597
598        return 0;
599}
600
601int dpdk_init_input (libtrace_t *libtrace) {
602        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
603        char err[500];
604        err[0] = 0;
605
606        libtrace->format_data = (struct dpdk_format_data_t *)
607                                malloc(sizeof(struct dpdk_format_data_t));
608
609        if (!libtrace->format_data) {
610                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
611                        "format data inside dpdk_init_input()");
612                return 1;
613        }
614
615        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
616        FORMAT(libtrace)->nb_ports = 0;
617        FORMAT(libtrace)->snaplen = 0; /* Use default */
618        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
619        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
620        FORMAT(libtrace)->nic_numa_node = -1;
621        FORMAT(libtrace)->promisc = -1;
622        FORMAT(libtrace)->pktmbuf_pool = NULL;
623#if DPDK_USE_BLACKLIST
624        FORMAT(libtrace)->nb_blacklist = 0;
625#endif
626        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
627        FORMAT(libtrace)->mempool_name[0] = 0;
628        memset(FORMAT(libtrace)->burst_pkts, 0,
629               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
630        FORMAT(libtrace)->burst_size = 0;
631        FORMAT(libtrace)->burst_offset = 0;
632        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
633        FORMAT(libtrace)->rss_key = NULL;
634
635        /* Make our first stream */
636        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
637        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
638
639        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
640                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
641                free(libtrace->format_data);
642                libtrace->format_data = NULL;
643                return -1;
644        }
645        return 0;
646}
647
648static int dpdk_init_output(libtrace_out_t *libtrace)
649{
650        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
651        char err[500];
652        err[0] = 0;
653
654        libtrace->format_data = (struct dpdk_format_data_t *)
655                                malloc(sizeof(struct dpdk_format_data_t));
656
657        if (!libtrace->format_data) {
658                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
659                        "format data inside dpdk_init_output()");
660                return -1;
661        }
662        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
663        FORMAT(libtrace)->nb_ports = 0;
664        FORMAT(libtrace)->snaplen = 0; /* Use default */
665        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
666        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
667        FORMAT(libtrace)->nic_numa_node = -1;
668        FORMAT(libtrace)->promisc = -1;
669        FORMAT(libtrace)->pktmbuf_pool = NULL;
670#if DPDK_USE_BLACKLIST
671        FORMAT(libtrace)->nb_blacklist = 0;
672#endif
673        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
674        FORMAT(libtrace)->mempool_name[0] = 0;
675        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
676        FORMAT(libtrace)->burst_size = 0;
677        FORMAT(libtrace)->burst_offset = 0;
678
679        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
680        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
681
682        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
683                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
684                free(libtrace->format_data);
685                libtrace->format_data = NULL;
686                return -1;
687        }
688        return 0;
689}
690
691/**
692 * Note here snaplen excludes the MAC checksum. Packets over
693 * the requested snaplen will be dropped. (Excluding MAC checksum)
694 *
695 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
696 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
697 * is set the maximum size of the returned packet would be 1518 otherwise
698 * 1514 would be the largest size possibly returned.
699 *
700 */
701int dpdk_config_input (libtrace_t *libtrace,
702                              trace_option_t option,
703                              void *data) {
704        switch (option) {
705        case TRACE_OPTION_SNAPLEN:
706                /* Only support changing snaplen before a call to start is
707                 * made */
708                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
709                        FORMAT(libtrace)->snaplen=*(int*)data;
710                else
711                        return -1;
712                return 0;
713        case TRACE_OPTION_PROMISC:
714                FORMAT(libtrace)->promisc=*(int*)data;
715                return 0;
716        case TRACE_OPTION_HASHER:
717                switch (*((enum hasher_types *) data))
718                {
719                case HASHER_BALANCE:
720                case HASHER_UNIDIRECTIONAL:
721                case HASHER_BIDIRECTIONAL:
722                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
723                        if (FORMAT(libtrace)->rss_key)
724                                free(FORMAT(libtrace)->rss_key);
725                        FORMAT(libtrace)->rss_key = NULL;
726                        return 0;
727                case HASHER_CUSTOM:
728                        // Let libtrace do this
729                        return -1;
730                }
731                break;
732        case TRACE_OPTION_FILTER:
733                /* TODO filtering */
734        case TRACE_OPTION_META_FREQ:
735        case TRACE_OPTION_EVENT_REALTIME:
736        case TRACE_OPTION_REPLAY_SPEEDUP:
737        case TRACE_OPTION_CONSTANT_ERF_FRAMING:
738                break;
739        /* Avoid default: so that future options will cause a warning
740         * here to remind us to implement it, or flag it as
741         * unimplementable
742         */
743        }
744
745        /* Don't set an error - trace_config will try to deal with the
746         * option and will set an error if it fails */
747        return -1;
748}
749
750/* Can set jumbo frames/ or limit the size of a frame by setting both
751 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
752 *
753 */
754static struct rte_eth_conf port_conf = {
755        .rxmode = {
756                .mq_mode = ETH_RSS,
757                .split_hdr_size = 0,
758                .header_split   = 0, /**< Header Split disabled */
759                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
760                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
761                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
762                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
763#if GET_MAC_CRC_CHECKSUM
764/* So it appears that if hw_strip_crc is turned off the driver will still
765 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
766 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
767 * So lets just add it back on when we receive the packet.
768 */
769                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
770#else
771/* By default strip the MAC checksum because it's a bit of a hack to
772 * actually read these. And don't want to rely on disabling this to actualy
773 * always cut off the checksum in the future
774 */
775                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
776#endif
777        },
778        .txmode = {
779                .mq_mode = ETH_DCB_NONE,
780        },
781        .rx_adv_conf = {
782                .rss_conf = {
783                        .rss_hf = RX_RSS_FLAGS,
784                },
785        },
786        .intr_conf = {
787                .lsc = 1
788        }
789};
790
791static const struct rte_eth_rxconf rx_conf = {
792        .rx_thresh = {
793                .pthresh = 8,/* RX_PTHRESH prefetch */
794                .hthresh = 8,/* RX_HTHRESH host */
795                .wthresh = 4,/* RX_WTHRESH writeback */
796        },
797        .rx_free_thresh = 0,
798        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
799};
800
801static const struct rte_eth_txconf tx_conf = {
802        .tx_thresh = {
803                /*
804                 * TX_PTHRESH prefetch
805                 * Set on the NIC, if the number of unprocessed descriptors to queued on
806                 * the card fall below this try grab at least hthresh more unprocessed
807                 * descriptors.
808                 */
809                .pthresh = 36,
810
811                /* TX_HTHRESH host
812                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
813                 */
814                .hthresh = 0,
815
816                /* TX_WTHRESH writeback
817                 * Set on the NIC, the number of sent descriptors before writing back
818                 * status to confirm the transmission. This is done more efficiently as
819                 * a bulk DMA-transfer rather than writing one at a time.
820                 * Similar to tx_free_thresh however this is applied to the NIC, where
821                 * as tx_free_thresh is when DPDK will check these. This is extended
822                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
823                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
824                 */
825                .wthresh = 4,
826        },
827
828        /* Used internally by DPDK rather than passed to the NIC. The number of
829         * packet descriptors to send before checking for any responses written
830         * back (to confirm the transmission). Default = 32 if set to 0)
831         */
832        .tx_free_thresh = 0,
833
834        /* This is the Report Status threshold, used by 10Gbit cards,
835         * This signals the card to only write back status (such as
836         * transmission successful) after this minimum number of transmit
837         * descriptors are seen. The default is 32 (if set to 0) however if set
838         * to greater than 1 TX wthresh must be set to zero, because this is kindof
839         * a replacement. See the dpdk programmers guide for more restrictions.
840         */
841        .tx_rs_thresh = 1,
842};
843
844/**
845 * A callback for a link state change (LSC).
846 *
847 * Packets may be received before this notification. In fact the DPDK IGXBE
848 * driver likes to put a delay upto 5sec before sending this.
849 *
850 * We use this to ensure the link speed is correct for our timestamp
851 * calculations. Because packets might be received before the link up we still
852 * update this when the packet is received.
853 *
854 * @param port The DPDK port
855 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
856 * @param cb_arg The dpdk_format_data_t structure associated with the format
857 */
858#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
859static int dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
860                              void *cb_arg, void *retparam UNUSED) {
861#else
862static void dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
863                              void *cb_arg) {
864#endif
865        struct dpdk_format_data_t * format_data = cb_arg;
866        struct rte_eth_link link_info;
867        if (event != RTE_ETH_EVENT_INTR_LSC) {
868                fprintf(stderr, "Received unexpected event in dpdk_lsc_callback()\n");
869                #if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
870                return -1;
871                #else
872                return;
873                #endif
874        }
875        if (port != format_data->port) {
876                fprintf(stderr, "Port does not match port in format data in dpdk_lsc_callback()\n");
877                return -1;
878        }
879
880        rte_eth_link_get_nowait(port, &link_info);
881
882        if (link_info.link_status)
883                format_data->link_speed = link_info.link_speed;
884        else
885                format_data->link_speed = 0;
886
887#if DEBUG
888        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
889                link_info.link_status ? "up" : "down",
890                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
891                                          "full-duplex" : "half-duplex",
892                (int) link_info.link_speed);
893#endif
894
895        /* Turns out DPDK drivers might not come back up if the link speed
896         * changes. So we reset the autoneg procedure. This is very unsafe
897         * we have have threads reading packets and we stop the port. */
898#if 0
899        if (!link_info.link_status) {
900                int ret;
901                rte_eth_dev_stop(port);
902                ret = rte_eth_dev_start(port);
903                if (ret < 0) {
904                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
905                                strerror(-ret));
906                }
907        }
908#endif
909#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
910        return 0;
911#endif
912}
913
914/** Reserve a DPDK lcore ID for a thread globally.
915 *
916 * @param real If true allocate a real lcore, otherwise allocate a core which
917 * does not exist on the local machine.
918 * @param socket the prefered NUMA socket - only used if a real core is requested
919 * @return a valid core, which can later be used with dpdk_register_lcore() or a
920 * -1 if have run out of cores.
921 *
922 * If any thread is reading or freeing packets we need to register it here
923 * due to TLS caches in the memory pools.
924 */
925static int dpdk_reserve_lcore(bool real, int socket) {
926        int new_id = -1;
927        int i;
928        struct rte_config *cfg = rte_eal_get_configuration();
929        (void) socket;
930
931        pthread_mutex_lock(&dpdk_lock);
932        /* If 'reading packets' fill in cores from 0 up and bind affinity
933         * otherwise start from the MAX core (which is also the master) and work backwards
934         * in this case physical cores on the system will not exist so we don't bind
935         * these to any particular physical core */
936        if (real) {
937#ifdef HAVE_LIBNUMA
938                for (i = 0; i < RTE_MAX_LCORE; ++i) {
939                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
940                                new_id = i;
941                                if (!lcore_config[i].detected)
942                                        new_id = -1;
943                                break;
944                        }
945                }
946#endif
947                /* Retry without the the numa restriction */
948                if (new_id == -1) {
949                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
950                                if (!rte_lcore_is_enabled(i)) {
951                                        new_id = i;
952                                        if (!lcore_config[i].detected)
953                                                fprintf(stderr, "Warning the"
954                                                        " number of 'reading' "
955                                                        "threads exceed cores\n");
956                                        break;
957                                }
958                        }
959                }
960        } else {
961                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
962                        if (!rte_lcore_is_enabled(i)) {
963                                new_id = i;
964                                break;
965                        }
966                }
967        }
968
969        if (new_id != -1) {
970                /* Enable the core in global DPDK structs */
971                cfg->lcore_role[new_id] = ROLE_RTE;
972                cfg->lcore_count++;
973        }
974
975        pthread_mutex_unlock(&dpdk_lock);
976        return new_id;
977}
978
979/** Register a thread as a lcore
980 * @param libtrace any error is set against libtrace on exit
981 * @param real If this is a true lcore we will bind its affinty to the
982 * requested core.
983 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
984 * @return 0, if successful otherwise -1 if an error occured (details are stored
985 * in libtrace)
986 *
987 * @note This must be called from the thread being registered.
988 */
989static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
990        int ret;
991        RTE_PER_LCORE(_lcore_id) = lcore;
992
993        /* Set affinity bind to corresponding core */
994        if (real) {
995                cpu_set_t cpuset;
996                CPU_ZERO(&cpuset);
997                CPU_SET(rte_lcore_id(), &cpuset);
998                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
999                if (ret != 0) {
1000                        trace_set_err(libtrace, errno, "Warning "
1001                                      "pthread_setaffinity_np failed");
1002                        return -1;
1003                }
1004        }
1005
1006        return 0;
1007}
1008
1009/** Allocates a new dpdk packet buffer memory pool.
1010 *
1011 * @param n The number of threads
1012 * @param pkt_size The packet size we need ot store
1013 * @param socket_id The NUMA socket id
1014 * @param A new mempool, if NULL query the DPDK library for the error code
1015 * see rte_mempool_create() documentation.
1016 *
1017 * This allocates a new pool or recycles an existing memory pool.
1018 * Call dpdk_free_memory() to free the memory.
1019 * We cannot delete memory so instead we store the pools, allowing them to be
1020 * re-used.
1021 */
1022static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1023                                             unsigned pkt_size,
1024                                             int socket_id) {
1025        struct rte_mempool *ret;
1026        size_t j,k;
1027        char name[MEMPOOL_NAME_LEN];
1028
1029        /* Add on packet size overheads */
1030        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1031
1032        pthread_mutex_lock(&dpdk_lock);
1033
1034        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1035                /* Best guess go for zero */
1036                socket_id = 0;
1037        }
1038
1039        /* Find a valid pool */
1040        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1041                if (mem_pools[socket_id][j]->size >= n &&
1042                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1043                        break;
1044                }
1045        }
1046
1047        /* Find the end (+1) of the list */
1048        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1049
1050        if (mem_pools[socket_id][j]) {
1051                ret = mem_pools[socket_id][j];
1052                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1053                mem_pools[socket_id][k-1] = NULL;
1054                mem_pools[socket_id][j] = NULL;
1055        } else {
1056                static uint32_t test = 10;
1057                test++;
1058                snprintf(name, MEMPOOL_NAME_LEN,
1059                         "libtrace_pool_%"PRIu32, test);
1060
1061                ret = rte_mempool_create(name, n, pkt_size,
1062                                         128, sizeof(struct rte_pktmbuf_pool_private),
1063                                         rte_pktmbuf_pool_init, NULL,
1064                                         rte_pktmbuf_init, NULL,
1065                                         socket_id, 0);
1066        }
1067
1068        pthread_mutex_unlock(&dpdk_lock);
1069        return ret;
1070}
1071
1072/** Stores the memory against the DPDK library.
1073 *
1074 * @param mempool The mempool to free
1075 * @param socket_id The NUMA socket this mempool was allocated upon.
1076 *
1077 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1078 * store the memory shared globally against the format.
1079 */
1080static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1081        size_t i;
1082        pthread_mutex_lock(&dpdk_lock);
1083
1084        /* We should have all entries back in the mempool */
1085        rte_mempool_audit(mempool);
1086        if (!rte_mempool_full(mempool)) {
1087                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1088                        "free all packets before finishing a trace\n",
1089                        rte_mempool_avail_count(mempool), mempool->size);
1090        }
1091
1092        /* Find the end (+1) of the list */
1093        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1094
1095        if (i >= RTE_MAX_LCORE) {
1096                fprintf(stderr, "Too many memory pools, dropping this one\n");
1097        } else {
1098                mem_pools[socket_id][i] = mempool;
1099        }
1100
1101        pthread_mutex_unlock(&dpdk_lock);
1102}
1103
1104/* Attach memory to the port and start (or restart) the port/s.
1105 */
1106static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1107                              char *err, int errlen, uint16_t rx_queues) {
1108        int ret, i;
1109        struct rte_eth_link link_info; /* Wait for link */
1110        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1111
1112        /* Already started */
1113        if (format_data->paused == DPDK_RUNNING)
1114                return 0;
1115
1116        /* First time started we need to alloc our memory, doing this here
1117         * rather than in environment setup because we don't have snaplen then */
1118        if (format_data->paused == DPDK_NEVER_STARTED) {
1119                if (format_data->snaplen == 0) {
1120                        format_data->snaplen = RX_MBUF_SIZE;
1121                        port_conf.rxmode.jumbo_frame = 0;
1122                        port_conf.rxmode.max_rx_pkt_len = 0;
1123                } else {
1124                        double expn;
1125
1126                        /* Use jumbo frames */
1127                        port_conf.rxmode.jumbo_frame = 1;
1128                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1129
1130                        /* Use less buffers if we're supporting jumbo frames
1131                         * otherwise we won't be able to allocate memory.
1132                         */
1133                        if (format_data->snaplen > 1500) {
1134                                format_data->nb_rx_buf /= 2;
1135                        }
1136
1137                        /* snaplen should be rounded up to next power of two
1138                         * to ensure enough memory is allocated for each
1139                         * mbuf :(
1140                         */
1141                        expn = ceil(log2((double)(format_data->snaplen)));
1142                        format_data->snaplen = pow(2, (int)expn);
1143                }
1144
1145#if GET_MAC_CRC_CHECKSUM
1146                /* This is additional overhead so make sure we allow space for this */
1147                format_data->snaplen += ETHER_CRC_LEN;
1148#endif
1149#if HAS_HW_TIMESTAMPS_82580
1150                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1151#endif
1152
1153                /* Create the mbuf pool, which is the place packets are allocated
1154                 * from - There is no free function (I cannot see one).
1155                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1156                 * allocate however that extra 1 packet is not used.
1157                 * (I assume <= vs < error some where in DPDK code)
1158                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1159                 * so that will fill the new buffer and wait until slots in the
1160                 * ring become available.
1161                 */
1162#if DEBUG
1163                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1164#endif
1165                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1166                                                              format_data->snaplen,
1167                                                              format_data->nic_numa_node);
1168
1169                if (format_data->pktmbuf_pool == NULL) {
1170                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1171                                 "pool failed: %s", strerror(rte_errno));
1172                        return -1;
1173                }
1174        }
1175
1176        /* Generate the hash key, based on the device */
1177        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1178        // In new versions DPDK we can query the size
1179#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1180        struct rte_eth_dev_info dev_info;
1181        rte_eth_dev_info_get(format_data->port, &dev_info);
1182        rss_size = dev_info.hash_key_size;
1183#endif
1184        if (rss_size != 0) {
1185                format_data->rss_key = malloc(rss_size);
1186                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1187                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1188                } else {
1189                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1190                }
1191                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1192#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1193                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1194#endif
1195        } else {
1196                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
1197        }
1198
1199        /* ----------- Now do the setup for the port mapping ------------ */
1200        /* Order of calls must be
1201         * rte_eth_dev_configure()
1202         * rte_eth_tx_queue_setup()
1203         * rte_eth_rx_queue_setup()
1204         * rte_eth_dev_start()
1205         * other rte_eth calls
1206         */
1207
1208        /* This must be called first before another *eth* function
1209         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1210        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1211        if (ret < 0) {
1212                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1213                         " %"PRIu8" : %s", format_data->port,
1214                         strerror(-ret));
1215                return -1;
1216        }
1217#if DEBUG
1218        fprintf(stderr, "Doing dev configure\n");
1219#endif
1220        /* Initialise the TX queue a minimum value if using this port for
1221         * receiving. Otherwise a larger size if writing packets.
1222         */
1223        ret = rte_eth_tx_queue_setup(format_data->port,
1224                                     0 /* queue XXX */,
1225                                     format_data->nb_tx_buf,
1226                                     SOCKET_ID_ANY,
1227                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1228        if (ret < 0) {
1229                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1230                         " on port %d : %s", (int)format_data->port,
1231                         strerror(-ret));
1232                return -1;
1233        }
1234
1235        /* Attach memory to our RX queues */
1236        for (i=0; i < rx_queues; i++) {
1237                dpdk_per_stream_t *stream;
1238#if DEBUG
1239                fprintf(stderr, "Configuring queue %d\n", i);
1240#endif
1241
1242                /* Add storage for the stream */
1243                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1244                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1245                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1246                stream->queue_id = i;
1247
1248                if (stream->lcore == -1)
1249                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1250
1251                if (stream->lcore == -1) {
1252                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1253                                 ". Too many threads?");
1254                        return -1;
1255                }
1256
1257                if (stream->mempool == NULL) {
1258                        stream->mempool = dpdk_alloc_memory(
1259                                                  format_data->nb_rx_buf*2,
1260                                                  format_data->snaplen,
1261                                                  rte_lcore_to_socket_id(stream->lcore));
1262
1263                        if (stream->mempool == NULL) {
1264                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1265                                         "pool failed: %s", strerror(rte_errno));
1266                                return -1;
1267                        }
1268                }
1269
1270                /* Initialise the RX queue with some packets from memory */
1271                ret = rte_eth_rx_queue_setup(format_data->port,
1272                                             stream->queue_id,
1273                                             format_data->nb_rx_buf,
1274                                             format_data->nic_numa_node,
1275                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1276                                             stream->mempool);
1277                if (ret < 0) {
1278                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1279                                 " RX queue on port %d : %s",
1280                                 (int)format_data->port,
1281                                 strerror(-ret));
1282                        return -1;
1283                }
1284        }
1285
1286#if DEBUG
1287        fprintf(stderr, "Doing start device\n");
1288#endif
1289        rte_eth_stats_reset(format_data->port);
1290        /* Start device */
1291        ret = rte_eth_dev_start(format_data->port);
1292        if (ret < 0) {
1293                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1294                         strerror(-ret));
1295                return -1;
1296        }
1297
1298        /* Default promiscuous to on */
1299        if (format_data->promisc == -1)
1300                format_data->promisc = 1;
1301
1302        if (format_data->promisc == 1)
1303                rte_eth_promiscuous_enable(format_data->port);
1304        else
1305                rte_eth_promiscuous_disable(format_data->port);
1306
1307        /* We have now successfully started/unpased */
1308        format_data->paused = DPDK_RUNNING;
1309
1310
1311        /* Register a callback for link state changes */
1312        ret = rte_eth_dev_callback_register(format_data->port,
1313                                            RTE_ETH_EVENT_INTR_LSC,
1314                                            dpdk_lsc_callback,
1315                                            format_data);
1316#if DEBUG
1317        if (ret)
1318                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1319                        ret, strerror(-ret));
1320#endif
1321
1322        /* Get the current link status */
1323        rte_eth_link_get_nowait(format_data->port, &link_info);
1324        format_data->link_speed = link_info.link_speed;
1325#if DEBUG
1326        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1327                (int) link_info.link_duplex, (int) link_info.link_speed);
1328#endif
1329
1330        return 0;
1331}
1332
1333int dpdk_start_input (libtrace_t *libtrace) {
1334        char err[500];
1335        err[0] = 0;
1336
1337        /* Make sure we don't reserve an extra thread for this */
1338        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1339
1340        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1341                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1342                free(libtrace->format_data);
1343                libtrace->format_data = NULL;
1344                return -1;
1345        }
1346        return 0;
1347}
1348
1349static inline size_t dpdk_get_max_rx_queues (portid_t port_id) {
1350        struct rte_eth_dev_info dev_info;
1351        rte_eth_dev_info_get(port_id, &dev_info);
1352        return dev_info.max_rx_queues;
1353}
1354
1355static inline size_t dpdk_processor_count () {
1356        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1357        if (nb_cpu <= 0)
1358                return 1;
1359        else
1360                return (size_t) nb_cpu;
1361}
1362
1363int dpdk_pstart_input (libtrace_t *libtrace) {
1364        char err[500];
1365        int i=0, phys_cores=0;
1366        int tot = libtrace->perpkt_thread_count;
1367        libtrace_list_node_t *n;
1368        err[0] = 0;
1369
1370        if (rte_lcore_id() != rte_get_master_lcore())
1371                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1372                        " from the master DPDK thread!\n");
1373
1374        /* If the master is not on the last thread we move it there */
1375        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1376                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1377                        return -1;
1378        }
1379
1380        /* Don't exceed the number of cores in the system/detected by dpdk
1381         * We don't have to force this but performance wont be good if we don't */
1382        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1383                if (lcore_config[i].detected) {
1384                        if (rte_lcore_is_enabled(i)) {
1385#if DEBUG
1386                                fprintf(stderr, "Found core %d already in use!\n", i);
1387#endif
1388                        } else {
1389                                phys_cores++;
1390                        }
1391                }
1392        }
1393        /* If we are restarting we have already allocated some threads as such
1394         * we add these back to the count for this calculation */
1395        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1396                dpdk_per_stream_t * stream = n->data;
1397                if (stream->lcore != -1)
1398                        phys_cores++;
1399        }
1400
1401        tot = MIN(libtrace->perpkt_thread_count,
1402                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1403        tot = MIN(tot, phys_cores);
1404
1405#if DEBUG
1406        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1407                libtrace->perpkt_thread_count, phys_cores);
1408#endif
1409
1410        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1411                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1412                free(libtrace->format_data);
1413                libtrace->format_data = NULL;
1414                return -1;
1415        }
1416
1417        /* Make sure we only start the number that we should */
1418        libtrace->perpkt_thread_count = tot;
1419        return 0;
1420}
1421
1422/**
1423 * Register a thread with the DPDK system,
1424 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1425 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1426 * gives it.
1427 *
1428 * We then allow a mapper thread to be started on every real core as DPDK would,
1429 * we also bind these to the corresponding CPU cores.
1430 *
1431 * @param libtrace A pointer to the trace
1432 * @param reading True if the thread will be used to read packets, i.e. will
1433 *                call pread_packet(), false if thread used to process packet
1434 *                in any other manner including statistics functions.
1435 */
1436int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1437{
1438#if DEBUG
1439        char name[99];
1440        name[0] = 0;
1441#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1442        pthread_getname_np(pthread_self(),
1443                           name, sizeof(name));
1444#endif
1445#endif
1446        if (reading) {
1447                dpdk_per_stream_t *stream;
1448                /* Attach our thread */
1449                if(t->type == THREAD_PERPKT) {
1450                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1451                        if (t->format_data == NULL) {
1452                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1453                                              "Too many threads registered");
1454                                return -1;
1455                        }
1456                } else {
1457                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1458                }
1459                stream = t->format_data;
1460#if DEBUG
1461                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1462#endif
1463                return dpdk_register_lcore(libtrace, true, stream->lcore);
1464        } else {
1465                int lcore = dpdk_reserve_lcore(reading, 0);
1466                if (lcore == -1) {
1467                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1468                                      " for DPDK");
1469                        return -1;
1470                }
1471#if DEBUG
1472                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1473#endif
1474                return dpdk_register_lcore(libtrace, false, lcore);
1475        }
1476
1477        return 0;
1478}
1479
1480/**
1481 * Unregister a thread with the DPDK system.
1482 *
1483 * Only previously registered threads should be calling this just before
1484 * they are destroyed.
1485 */
1486void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1487{
1488        struct rte_config *cfg = rte_eal_get_configuration();
1489
1490        if (rte_lcore_id() >= RTE_MAX_LCORE) {
1491                fprintf(stderr, "Expected core id less than or equal to RTE_MAX_LCORE in "
1492                        "dpdk_punregister_thread()\n");
1493                return;
1494        }
1495        pthread_mutex_lock(&dpdk_lock);
1496        /* Skip if master */
1497        if (rte_lcore_id() == rte_get_master_lcore()) {
1498                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1499                pthread_mutex_unlock(&dpdk_lock);
1500                return;
1501        }
1502
1503        /* Disable this core in global DPDK structs */
1504        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1505        cfg->lcore_count--;
1506        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1507        if (cfg->lcore_count < 1) {
1508                fprintf(stderr, "You cannot unregister the master lcore in dpdk_punregister_thread()\n");
1509                return;
1510        }
1511        pthread_mutex_unlock(&dpdk_lock);
1512        return;
1513}
1514
1515static int dpdk_start_output(libtrace_out_t *libtrace)
1516{
1517        char err[500];
1518        err[0] = 0;
1519
1520        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1521                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1522                free(libtrace->format_data);
1523                libtrace->format_data = NULL;
1524                return -1;
1525        }
1526        return 0;
1527}
1528
1529int dpdk_pause_input(libtrace_t * libtrace) {
1530        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1531        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1532        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1533#if DEBUG
1534                fprintf(stderr, "Pausing DPDK port\n");
1535#endif
1536                rte_eth_dev_stop(FORMAT(libtrace)->port);
1537                FORMAT(libtrace)->paused = DPDK_PAUSED;
1538                /* Empty the queue of packets */
1539                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1540                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1541                }
1542                FORMAT(libtrace)->burst_offset = 0;
1543                FORMAT(libtrace)->burst_size = 0;
1544
1545                for (; tmp != NULL; tmp = tmp->next) {
1546                        dpdk_per_stream_t *stream = tmp->data;
1547                        stream->ts_last_sys = 0;
1548#if HAS_HW_TIMESTAMPS_82580
1549                        stream->ts_first_sys = 0;
1550#endif
1551                }
1552
1553        }
1554        return 0;
1555}
1556
1557static int dpdk_write_packet(libtrace_out_t *trace,
1558                             libtrace_packet_t *packet){
1559        struct rte_mbuf* m_buff[1];
1560
1561        int wirelen = trace_get_wire_length(packet);
1562        int caplen = trace_get_capture_length(packet);
1563
1564        /* Check for a checksum and remove it */
1565        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1566            wirelen == caplen)
1567                caplen -= ETHER_CRC_LEN;
1568
1569        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1570        if (m_buff[0] == NULL) {
1571                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1572                return -1;
1573        } else {
1574                int ret;
1575                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1576                do {
1577                        ret = rte_eth_tx_burst(FORMAT(trace)->port, 0 /*queue TODO*/, m_buff, 1);
1578                } while (ret != 1);
1579        }
1580
1581        return 0;
1582}
1583
1584int dpdk_fin_input(libtrace_t * libtrace) {
1585        libtrace_list_node_t * n;
1586        /* Free our memory structures */
1587        if (libtrace->format_data != NULL) {
1588
1589                if (FORMAT(libtrace)->port != 0xFF)
1590                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1591                                                        RTE_ETH_EVENT_INTR_LSC,
1592                                                        dpdk_lsc_callback,
1593                                                        FORMAT(libtrace));
1594                /* Close the device completely, device cannot be restarted */
1595                rte_eth_dev_close(FORMAT(libtrace)->port);
1596
1597                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1598                                 FORMAT(libtrace)->nic_numa_node);
1599
1600                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1601                        dpdk_per_stream_t * stream = n->data;
1602                        if (stream->mempool)
1603                                dpdk_free_memory(stream->mempool,
1604                                                 rte_lcore_to_socket_id(stream->lcore));
1605                }
1606
1607                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1608                /* filter here if we used it */
1609                if (FORMAT(libtrace)->rss_key)
1610                        free(FORMAT(libtrace)->rss_key);
1611                free(libtrace->format_data);
1612        }
1613
1614        return 0;
1615}
1616
1617
1618static int dpdk_fin_output(libtrace_out_t * libtrace) {
1619        /* Free our memory structures */
1620        if (libtrace->format_data != NULL) {
1621                /* Close the device completely, device cannot be restarted */
1622                if (FORMAT(libtrace)->port != 0xFF)
1623                        rte_eth_dev_close(FORMAT(libtrace)->port);
1624                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1625                /* filter here if we used it */
1626                free(libtrace->format_data);
1627        }
1628
1629        return 0;
1630}
1631
1632/**
1633 * Get the start of the additional header that we added to a packet.
1634 */
1635static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1636        if (!packet) {
1637                fprintf(stderr, "NULL packet passed into dpdk_addt_hdr()\n");
1638                return NULL;
1639        }
1640        if (!packet->buffer) {
1641                fprintf(stderr, "NULL packet buffer passed into dpdk_addt_hdr()\n");
1642                return NULL;
1643        }
1644        /* Our header sits straight after the mbuf header */
1645        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1646}
1647
1648static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1649        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1650        return hdr->cap_len;
1651}
1652
1653static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1654        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1655        if (size > hdr->cap_len) {
1656                /* Cannot make a packet bigger */
1657                return trace_get_capture_length(packet);
1658        }
1659
1660        /* Reset the cached capture length first*/
1661        packet->cached.capture_length = -1;
1662        hdr->cap_len = (uint32_t) size;
1663        return trace_get_capture_length(packet);
1664}
1665
1666static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1667        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1668        int org_cap_size; /* The original capture size */
1669        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1670                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1671                               sizeof(struct hw_timestamp_82580);
1672        } else {
1673                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1674        }
1675        if (hdr->flags & INCLUDES_CHECKSUM) {
1676                return org_cap_size;
1677        } else {
1678                /* DPDK packets are always TRACE_TYPE_ETH packets */
1679                return org_cap_size + ETHER_CRC_LEN;
1680        }
1681}
1682
1683int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1684        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1685        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1686                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1687                                sizeof(struct hw_timestamp_82580);
1688        else
1689                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1690}
1691
1692int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1693                               libtrace_packet_t *packet, void *buffer,
1694                               libtrace_rt_types_t rt_type, uint32_t flags) {
1695        if (!packet) {
1696                fprintf(stderr, "NULL packet passed into dpdk_prepare_packet()\n");
1697                return TRACE_ERR_NULL_PACKET;
1698        }
1699        if (packet->buffer != buffer &&
1700            packet->buf_control == TRACE_CTRL_PACKET) {
1701                free(packet->buffer);
1702        }
1703
1704        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1705                packet->buf_control = TRACE_CTRL_PACKET;
1706        else
1707                packet->buf_control = TRACE_CTRL_EXTERNAL;
1708
1709        packet->buffer = buffer;
1710        packet->header = buffer;
1711
1712        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1713        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1714        packet->type = rt_type;
1715        return 0;
1716}
1717
1718/**
1719 * Given a packet size and a link speed, computes the
1720 * time to transmit in nanoseconds.
1721 *
1722 * @param format_data The dpdk format data from which we get the link speed
1723 *        and if unset updates it in a thread safe manner
1724 * @param pkt_size The size of the packet in bytes
1725 * @return The wire time in nanoseconds
1726 */
1727static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1728        uint32_t wire_time;
1729        /* 20 extra bytes of interframe gap and preamble */
1730# if GET_MAC_CRC_CHECKSUM
1731        wire_time = ((pkt_size + 20) * 8000);
1732# else
1733        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1734# endif
1735
1736        /* Division is really slow and introduces a pipeline stall
1737         * The compiler will optimise this into magical multiplication and shifting
1738         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1739         */
1740retry_calc_wiretime:
1741        switch (format_data->link_speed) {
1742        case ETH_SPEED_NUM_40G:
1743                wire_time /=  ETH_SPEED_NUM_40G;
1744                break;
1745        case ETH_SPEED_NUM_20G:
1746                wire_time /= ETH_SPEED_NUM_20G;
1747                break;
1748        case ETH_SPEED_NUM_10G:
1749                wire_time /= ETH_SPEED_NUM_10G;
1750                break;
1751        case ETH_SPEED_NUM_1G:
1752                wire_time /= ETH_SPEED_NUM_1G;
1753                break;
1754        case 0:
1755                {
1756                /* Maybe the link was down originally, but now it should be up */
1757                struct rte_eth_link link = {0};
1758                rte_eth_link_get_nowait(format_data->port, &link);
1759                if (link.link_status && link.link_speed) {
1760                        format_data->link_speed = link.link_speed;
1761#ifdef DEBUG
1762                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1763#endif
1764                        goto retry_calc_wiretime;
1765                }
1766                /* We don't know the link speed, make sure numbers are counting up */
1767                wire_time = 1;
1768                break;
1769                }
1770        default:
1771                wire_time /= format_data->link_speed;
1772        }
1773        return wire_time;
1774}
1775
1776/**
1777 * Does any extra preperation to all captured packets
1778 * This includes adding our extra header to it with the timestamp,
1779 * and any snapping
1780 *
1781 * @param format_data The DPDK format data
1782 * @param plc The DPDK per lcore format data
1783 * @param pkts An array of size nb_pkts of DPDK packets
1784 */
1785static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1786                                   struct dpdk_per_stream_t *plc,
1787                                   struct rte_mbuf **pkts,
1788                                   size_t nb_pkts) {
1789        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1790        struct dpdk_addt_hdr *hdr;
1791        size_t i;
1792        uint64_t cur_sys_time_ns;
1793#if HAS_HW_TIMESTAMPS_82580
1794        struct hw_timestamp_82580 *hw_ts;
1795        uint64_t estimated_wraps;
1796#else
1797
1798#endif
1799
1800#if USE_CLOCK_GETTIME
1801        struct timespec cur_sys_time = {0};
1802        /* This looks terrible and I feel bad doing it. But it's OK
1803         * on new kernels, because this is a fast vsyscall */
1804        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1805        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1806#else
1807        struct timeval cur_sys_time = {0};
1808        /* Also a fast vsyscall */
1809        gettimeofday(&cur_sys_time, NULL);
1810        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1811#endif
1812
1813        /* The system clock is not perfect so when running
1814         * at linerate we could timestamp a packet in the past.
1815         * To avoid this we munge the timestamp to appear 1ns
1816         * after the previous packet. We should eventually catch up
1817         * to system time since a 64byte packet on a 10G link takes 67ns.
1818         *
1819         * Note with parallel readers timestamping packets
1820         * with duplicate stamps or out of order is unavoidable without
1821         * hardware timestamping from the NIC.
1822         */
1823#if !HAS_HW_TIMESTAMPS_82580
1824        if (plc->ts_last_sys >= cur_sys_time_ns) {
1825                cur_sys_time_ns = plc->ts_last_sys + 1;
1826        }
1827#endif
1828
1829        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1830        for (i = 0 ; i < nb_pkts ; ++i) {
1831
1832                /* We put our header straight after the dpdk header */
1833                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1834                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1835
1836#if GET_MAC_CRC_CHECKSUM
1837                /* Add back in the CRC sum */
1838                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1839                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1840                hdr->flags |= INCLUDES_CHECKSUM;
1841#endif
1842
1843                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1844
1845#if HAS_HW_TIMESTAMPS_82580
1846                /* The timestamp is sitting before our packet and is included in pkt_len */
1847                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1848                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1849                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1850
1851                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1852                 *
1853                 *        +----------+---+   +--------------+
1854                 *  82580 |    24    | 8 |   |      32      |
1855                 *        +----------+---+   +--------------+
1856                 *          reserved  \______ 40 bits _____/
1857                 *
1858                 * The 40 bit 82580 SYSTIM overflows every
1859                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1860                 *
1861                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1862                 * Endian (for the full 64 bits) i.e. picture is mirrored
1863                 */
1864
1865                /* Despite what the documentation says this is in Little
1866                 * Endian byteorder. Mask the reserved section out.
1867                 */
1868                hdr->timestamp = le64toh(hw_ts->timestamp) &
1869                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1870
1871                if (unlikely(plc->ts_first_sys == 0)) {
1872                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1873                        plc->ts_last_sys = plc->ts_first_sys;
1874                }
1875
1876                /* This will have serious problems if packets aren't read quickly
1877                 * that is within a couple of seconds because our clock cycles every
1878                 * 18 seconds */
1879                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1880                                  / (1ull<<TS_NBITS_82580);
1881
1882                /* Estimated_wraps gives the number of times the counter should have
1883                 * wrapped (however depending on value last time it could have wrapped
1884                 * twice more (if hw clock is close to its max value) or once less (allowing
1885                 * for a bit of variance between hw and sys clock). But if the clock
1886                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1887                if (unlikely(estimated_wraps >= 2)) {
1888                        /* 2 or more wrap arounds add all but the very last wrap */
1889                        plc->wrap_count += estimated_wraps - 1;
1890                }
1891
1892                /* Set the timestamp to the lowest possible value we're considering */
1893                hdr->timestamp += plc->ts_first_sys +
1894                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1895
1896                /* In most runs only the first if() will need evaluating - i.e our
1897                 * estimate is correct. */
1898                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1899                                              hdr->timestamp, MAXSKEW_82580))) {
1900                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1901                        plc->wrap_count++;
1902                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1903                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1904                                             hdr->timestamp, MAXSKEW_82580)) {
1905                                /* Failed to match estimated_wraps */
1906                                plc->wrap_count++;
1907                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1908                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1909                                                     hdr->timestamp, MAXSKEW_82580)) {
1910                                        if (estimated_wraps == 0) {
1911                                                /* 0 case Failed to match estimated_wraps+2 */
1912                                                printf("WARNING - Hardware Timestamp failed to"
1913                                                       " match using systemtime!\n");
1914                                                hdr->timestamp = cur_sys_time_ns;
1915                                        } else {
1916                                                /* Failed to match estimated_wraps+1 */
1917                                                plc->wrap_count++;
1918                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1919                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1920                                                                     hdr->timestamp, MAXSKEW_82580)) {
1921                                                        /* Failed to match estimated_wraps+2 */
1922                                                        printf("WARNING - Hardware Timestamp failed to"
1923                                                               " match using systemtime!!\n");
1924                                                }
1925                                        }
1926                                }
1927                        }
1928                }
1929#else
1930
1931                hdr->timestamp = cur_sys_time_ns;
1932                /* Offset the next packet by the wire time of previous */
1933                calculate_wire_time(format_data, hdr->cap_len);
1934
1935#endif
1936        }
1937
1938        plc->ts_last_sys = cur_sys_time_ns;
1939        return;
1940}
1941
1942
1943static void dpdk_fin_packet(libtrace_packet_t *packet)
1944{
1945        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1946                rte_pktmbuf_free(packet->buffer);
1947                packet->buffer = NULL;
1948        }
1949}
1950
1951/** Reads at least one packet or returns an error
1952 */
1953int dpdk_read_packet_stream (libtrace_t *libtrace,
1954                                           dpdk_per_stream_t *stream,
1955                                           libtrace_message_queue_t *mesg,
1956                                           struct rte_mbuf* pkts_burst[],
1957                                           size_t nb_packets) {
1958        size_t nb_rx; /* Number of rx packets we've recevied */
1959        while (1) {
1960                /* Poll for a batch of packets */
1961                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1962                                         stream->queue_id, pkts_burst, nb_packets);
1963                if (nb_rx > 0) {
1964                        /* Got some packets - otherwise we keep spining */
1965                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1966                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1967                        return nb_rx;
1968                }
1969                /* Check the message queue this could be less than 0 */
1970                if (mesg && libtrace_message_queue_count(mesg) > 0)
1971                        return READ_MESSAGE;
1972                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
1973                        return nb_rx;
1974                /* Wait a while, polling on memory degrades performance
1975                 * This relieves the pressure on memory allowing the NIC to DMA */
1976                rte_delay_us(10);
1977        }
1978
1979        /* We'll never get here - but if we did it would be bad */
1980        return READ_ERROR;
1981}
1982
1983static int dpdk_pread_packets (libtrace_t *libtrace,
1984                                    libtrace_thread_t *t,
1985                                    libtrace_packet_t **packets,
1986                                    size_t nb_packets) {
1987        int nb_rx; /* Number of rx packets we've recevied */
1988        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
1989        int i;
1990        dpdk_per_stream_t *stream = t->format_data;
1991        struct dpdk_addt_hdr * hdr;
1992
1993        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
1994                                         pkts_burst, nb_packets);
1995
1996        if (nb_rx > 0) {
1997                for (i = 0; i < nb_rx; ++i) {
1998                        if (packets[i]->buffer != NULL) {
1999                                /* The packet should always be finished */
2000                                if (packets[i]->buf_control != TRACE_CTRL_PACKET) {
2001                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Expected packet buffer "
2002                                                "to be empty in dpdk_pread_packets()\n");
2003                                        return -1;
2004                                }
2005                                free(packets[i]->buffer);
2006                        }
2007                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2008                        packets[i]->type = TRACE_RT_DATA_DPDK;
2009                        packets[i]->buffer = pkts_burst[i];
2010                        packets[i]->trace = libtrace;
2011                        packets[i]->error = 1;
2012                        hdr = (struct dpdk_addt_hdr *) 
2013                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
2014                        packets[i]->order = hdr->timestamp;
2015                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2016                }
2017        }
2018
2019        return nb_rx;
2020}
2021
2022int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2023        int nb_rx; /* Number of rx packets we've received */
2024        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2025
2026        /* Free the last packet buffer */
2027        if (packet->buffer != NULL) {
2028                /* The packet should always be finished */
2029                if (packet->buf_control != TRACE_CTRL_PACKET) {
2030                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Expected packet buffer to be "
2031                                "empty in dpdk_read_packet()\n");
2032                        return -1;
2033                }
2034                free(packet->buffer);
2035                packet->buffer = NULL;
2036        }
2037
2038        packet->buf_control = TRACE_CTRL_EXTERNAL;
2039        packet->type = TRACE_RT_DATA_DPDK;
2040
2041        /* Check if we already have some packets buffered */
2042        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2043                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2044                packet->trace = libtrace;
2045                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2046                return 1; // TODO should be bytes read, which essentially useless anyway
2047        }
2048
2049        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2050                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2051
2052        if (nb_rx > 0) {
2053                FORMAT(libtrace)->burst_size = nb_rx;
2054                FORMAT(libtrace)->burst_offset = 1;
2055                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2056                packet->trace = libtrace;
2057                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2058                return 1;
2059        }
2060        return nb_rx;
2061}
2062
2063static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2064        struct timeval tv;
2065        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2066
2067        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2068        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2069        return tv;
2070}
2071
2072static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2073        struct timespec ts;
2074        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2075
2076        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2077        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2078        return ts;
2079}
2080
2081static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2082        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2083}
2084
2085static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2086        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2087        return (libtrace_direction_t) hdr->direction;
2088}
2089
2090static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2091        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2092        hdr->direction = (uint8_t) direction;
2093        return (libtrace_direction_t) hdr->direction;
2094}
2095
2096void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2097        struct rte_eth_stats dev_stats = {0};
2098
2099        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2100                return;
2101
2102        /* Grab the current stats */
2103        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2104
2105        stats->captured_valid = true;
2106        stats->captured = dev_stats.ipackets;
2107
2108        stats->dropped_valid = true;
2109        stats->dropped = dev_stats.imissed;
2110
2111#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2112        /* DPDK commit 86057c fixes ensures missed does not get counted as
2113         * errors */
2114        stats->errors_valid = true;
2115        stats->errors = dev_stats.ierrors;
2116#else
2117        /* DPDK errors includes drops */
2118        stats->errors_valid = true;
2119        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2120#endif
2121        stats->received_valid = true;
2122        stats->received = dev_stats.ipackets + dev_stats.imissed;
2123
2124}
2125
2126/* Attempts to read a packet in a non-blocking fashion. If one is not
2127 * available a SLEEP event is returned. We do not have the ability to
2128 * create a select()able file descriptor in DPDK.
2129 */
2130libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2131                                            libtrace_packet_t *packet) {
2132        libtrace_eventobj_t event = {0,0,0.0,0};
2133        size_t nb_rx; /* Number of received packets we've read */
2134
2135        do {
2136
2137                /* No packets waiting in our buffer? Try and read some more */
2138                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2139                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2140                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2141                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2142                        if (nb_rx > 0) {
2143                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2144                                                FORMAT(trace)->burst_pkts, nb_rx);
2145                                FORMAT(trace)->burst_size = nb_rx;
2146                                FORMAT(trace)->burst_offset = 0;
2147                        }
2148                }
2149
2150                /* Now do we have packets waiting? */
2151                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2152                        /* Free the last packet buffer */
2153                        if (packet->buffer != NULL) {
2154                                /* The packet should always be finished */
2155                                if (packet->buf_control != TRACE_CTRL_PACKET) {
2156                                        trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Expected packet "
2157                                                "buffer to be empty in dpdk_trace_event()\n");
2158                                        event.type = TRACE_EVENT_TERMINATE;
2159                                        return event;
2160                                }
2161                                free(packet->buffer);
2162                                packet->buffer = NULL;
2163                        }
2164
2165                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2166                        packet->type = TRACE_RT_DATA_DPDK;
2167                        event.type = TRACE_EVENT_PACKET;
2168                        packet->buffer = FORMAT(trace)->burst_pkts[
2169                                             FORMAT(trace)->burst_offset++];
2170                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2171                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2172
2173                        /* XXX - Check this passes the filter trace_read_packet normally
2174                         * does this for us but this wont */
2175                        if (trace->filter) {
2176                                if (!trace_apply_filter(trace->filter, packet)) {
2177                                        /* Failed the filter so we loop for another packet */
2178                                        trace->filtered_packets ++;
2179                                        continue;
2180                                }
2181                        }
2182                        trace->accepted_packets ++;
2183                } else {
2184                        /* We only want to sleep for a very short time - we are non-blocking */
2185                        event.type = TRACE_EVENT_SLEEP;
2186                        event.seconds = 0.0001;
2187                        event.size = 0;
2188                }
2189
2190                /* If we get here we have our event */
2191                break;
2192        } while (1);
2193
2194        return event;
2195}
2196
2197static void dpdk_help(void) {
2198        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2199        printf("Supported input URIs:\n");
2200        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2201        printf("\tThe -<coreid> is optional \n");
2202        printf("\t e.g. dpdk:0000:01:00.1\n");
2203        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2204        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2205        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2206        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2207        printf("\n");
2208        printf("Supported output URIs:\n");
2209        printf("\tSame format as the input URI.\n");
2210        printf("\t e.g. dpdk:0000:01:00.1\n");
2211        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2212        printf("\n");
2213}
2214
2215static struct libtrace_format_t dpdk = {
2216        "dpdk",
2217        "$Id$",
2218        TRACE_FORMAT_DPDK,
2219        NULL,                               /* probe filename */
2220        NULL,                               /* probe magic */
2221        dpdk_init_input,                    /* init_input */
2222        dpdk_config_input,                  /* config_input */
2223        dpdk_start_input,                   /* start_input */
2224        dpdk_pause_input,                   /* pause_input */
2225        dpdk_init_output,                   /* init_output */
2226        NULL,                               /* config_output */
2227        dpdk_start_output,                  /* start_ouput */
2228        dpdk_fin_input,                     /* fin_input */
2229        dpdk_fin_output,                    /* fin_output */
2230        dpdk_read_packet,                   /* read_packet */
2231        dpdk_prepare_packet,                /* prepare_packet */
2232        dpdk_fin_packet,                    /* fin_packet */
2233        dpdk_write_packet,                  /* write_packet */
2234        NULL,                               /* flush_output */
2235        dpdk_get_link_type,                 /* get_link_type */
2236        dpdk_get_direction,                 /* get_direction */
2237        dpdk_set_direction,                 /* set_direction */
2238        NULL,                               /* get_erf_timestamp */
2239        dpdk_get_timeval,                   /* get_timeval */
2240        dpdk_get_timespec,                  /* get_timespec */
2241        NULL,                               /* get_seconds */
2242        NULL,                               /* seek_erf */
2243        NULL,                               /* seek_timeval */
2244        NULL,                               /* seek_seconds */
2245        dpdk_get_capture_length,            /* get_capture_length */
2246        dpdk_get_wire_length,               /* get_wire_length */
2247        dpdk_get_framing_length,            /* get_framing_length */
2248        dpdk_set_capture_length,            /* set_capture_length */
2249        NULL,                               /* get_received_packets */
2250        NULL,                               /* get_filtered_packets */
2251        NULL,                               /* get_dropped_packets */
2252        dpdk_get_stats,                     /* get_statistics */
2253        NULL,                               /* get_fd */
2254        dpdk_trace_event,                   /* trace_event */
2255        dpdk_help,                          /* help */
2256        NULL,                               /* next pointer */
2257        {true, 8},                          /* Live, NICs typically have 8 threads */
2258        dpdk_pstart_input,                  /* pstart_input */
2259        dpdk_pread_packets,                 /* pread_packets */
2260        dpdk_pause_input,                   /* ppause */
2261        dpdk_fin_input,                     /* p_fin */
2262        dpdk_pregister_thread,              /* pregister_thread */
2263        dpdk_punregister_thread,            /* punregister_thread */
2264        NULL                                /* get thread stats */
2265};
2266
2267void dpdk_constructor(void) {
2268        register_format(&dpdk);
2269}
Note: See TracBrowser for help on using the repository browser.