source: lib/format_dpdk.c @ 2725318

develop
Last change on this file since 2725318 was 2725318, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Cleanup some of the assertions

  • Property mode set to 100644
File size: 71.1 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44#include "format_dpdk.h"
45
46#ifdef HAVE_INTTYPES_H
47#  include <inttypes.h>
48#else
49# error "Can't find inttypes.h"
50#endif
51
52#include <stdlib.h>
53#include <assert.h>
54#include <unistd.h>
55#include <endian.h>
56#include <string.h>
57#include <math.h>
58
59#ifdef HAVE_LIBNUMA
60#include <numa.h>
61#endif
62
63#define MBUF(x) ((struct rte_mbuf *) x)
64/* Get the original placement of the packet data */
65#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
66#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
67#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
68
69#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
70#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
71
72#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
73                        (uint64_t) tv.tv_usec*1000ull)
74#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
75                        (uint64_t) ts.tv_nsec)
76
77#if RTE_PKTMBUF_HEADROOM != 128
78#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
79         "any libtrace instance processing these packet must be have the" \
80         "same RTE_PKTMBUF_HEADROOM set"
81#endif
82
83
84static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
85/* Memory pools Per NUMA node */
86static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
87
88/* Used by both input and output however some fields are not used
89 * for output */
90struct dpdk_format_data_t {
91        int8_t promisc; /* promiscuous mode - RX only */
92        uint8_t paused; /* See paused_state */
93        portid_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
94        portid_t nb_ports; /* Total number of usable ports on system should be 1 */
95        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
96        int snaplen; /* The snap length for the capture - RX only */
97        /* We always have to setup both rx and tx queues even if we don't want them */
98        int nb_rx_buf; /* The number of packet buffers in the rx ring */
99        int nb_tx_buf; /* The number of packet buffers in the tx ring */
100        int nic_numa_node; /* The NUMA node that the NIC is attached to */
101        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
102#if DPDK_USE_BLACKLIST
103        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
104        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
105#endif
106        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
107        enum hasher_types hasher_type;
108        uint8_t *rss_key;
109        /* To improve single-threaded performance we always batch reading
110         * packets, in a burst, otherwise the parallel library does this for us */
111        struct rte_mbuf* burst_pkts[BURST_SIZE];
112        int burst_size; /* The total number read in the burst */
113        int burst_offset; /* The offset we are into the burst */
114
115        /* Our parallel streams */
116        libtrace_list_t *per_stream;
117};
118
119enum dpdk_addt_hdr_flags {
120        INCLUDES_CHECKSUM = 0x1,
121        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
122};
123
124/**
125 * A structure placed in front of the packet where we can store
126 * additional information about the given packet.
127 * +--------------------------+
128 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
129 * +--------------------------+
130 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
131 * +--------------------------+
132 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
133 * +--------------------------+
134 * *   hw_timestamp_82580     * 16 bytes Optional
135 * +--------------------------+
136 * |       Packet data        | Variable Size
137 * |                          |
138 */
139struct dpdk_addt_hdr {
140        uint64_t timestamp;
141        uint8_t flags;
142        uint8_t direction;
143        uint8_t reserved1;
144        uint8_t reserved2;
145        uint32_t cap_len; /* The size to say the capture is */
146};
147
148/**
149 * We want to blacklist all devices except those on the whitelist
150 * (I say list, but yes it is only the one).
151 *
152 * The default behaviour of rte_pci_probe() will map every possible device
153 * to its DPDK driver. The DPDK driver will take the ethernet device
154 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
155 *
156 * So blacklist all devices except the one that we wish to use so that
157 * the others can still be used as standard ethernet ports.
158 *
159 * @return 0 if successful, otherwise -1 on error.
160 */
161#if DPDK_USE_BLACKLIST
162static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
163{
164        struct rte_pci_device *dev = NULL;
165        format_data->nb_blacklist = 0;
166
167        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
168
169        TAILQ_FOREACH(dev, &device_list, next) {
170        if (whitelist != NULL && whitelist->domain == dev->addr.domain
171            && whitelist->bus == dev->addr.bus
172            && whitelist->devid == dev->addr.devid
173            && whitelist->function == dev->addr.function)
174            continue;
175                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
176                                / sizeof (format_data->blacklist[0])) {
177                        fprintf(stderr, "Warning: too many devices to blacklist consider"
178                                        " increasing BLACK_LIST_SIZE");
179                        break;
180                }
181                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
182                ++format_data->nb_blacklist;
183        }
184
185        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
186        return 0;
187}
188#else /* DPDK_USE_BLACKLIST */
189#include <rte_devargs.h>
190static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
191{
192        char pci_str[20] = {0};
193        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
194                 whitelist->domain,
195                 whitelist->bus,
196                 whitelist->devid,
197                 whitelist->function);
198        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
199                return -1;
200        }
201        return 0;
202}
203#endif
204
205/**
206 * Parse the URI format as a pci address
207 * Fills in addr, note core is optional and is unchanged if
208 * a value for it is not provided.
209 *
210 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
211 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
212 */
213static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
214        int matches;
215        assert(str);
216#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
217        matches = sscanf(str, "%8"SCNx32":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
218                         &addr->domain, &addr->bus, &addr->devid,
219                         &addr->function, core);
220#else
221        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
222                         &addr->domain, &addr->bus, &addr->devid,
223                         &addr->function, core);
224#endif
225        if (matches >= 4) {
226                return 0;
227        } else {
228                return -1;
229        }
230}
231
232/**
233 * Convert a pci address to the numa node it is
234 * connected to.
235 *
236 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
237 * so we can call it before DPDK
238 *
239 * @return -1 if unknown otherwise a number 0 or higher of the numa node
240 */
241static int pci_to_numa(struct rte_pci_addr * dev_addr) {
242        char path[50] = {0};
243        FILE *file;
244
245        /* Read from the system */
246        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
247                 dev_addr->domain,
248                 dev_addr->bus,
249                 dev_addr->devid,
250                 dev_addr->function);
251
252        if((file = fopen(path, "r")) != NULL) {
253                int numa_node = -1;
254                fscanf(file, "%d", &numa_node);
255                fclose(file);
256                return numa_node;
257        }
258        return -1;
259}
260
261#if DEBUG
262/* For debugging */
263static inline void dump_configuration()
264{
265        struct rte_config * global_config;
266        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
267
268        if (nb_cpu <= 0) {
269                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
270                       " Falling back to the first core.");
271                nb_cpu = 1; /* fallback to just 1 core */
272        }
273        if (nb_cpu > RTE_MAX_LCORE)
274                nb_cpu = RTE_MAX_LCORE;
275
276        global_config = rte_eal_get_configuration();
277
278        if (global_config != NULL) {
279                int i;
280                fprintf(stderr, "Intel DPDK setup\n"
281                        "---Version      : %s\n"
282                        "---Master LCore : %"PRIu32"\n"
283                        "---LCore Count  : %"PRIu32"\n",
284                        rte_version(),
285                        global_config->master_lcore, global_config->lcore_count);
286
287                for (i = 0 ; i < nb_cpu; i++) {
288                        fprintf(stderr, "   ---Core %d : %s\n", i,
289                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
290                }
291
292                const char * proc_type;
293                switch (global_config->process_type) {
294                case RTE_PROC_AUTO:
295                        proc_type = "auto";
296                        break;
297                case RTE_PROC_PRIMARY:
298                        proc_type = "primary";
299                        break;
300                case RTE_PROC_SECONDARY:
301                        proc_type = "secondary";
302                        break;
303                case RTE_PROC_INVALID:
304                        proc_type = "invalid";
305                        break;
306                default:
307                        proc_type = "something worse than invalid!!";
308                }
309                fprintf(stderr, "---Process Type : %s\n", proc_type);
310        }
311
312}
313#endif
314
315/**
316 * Expects to be called from the master lcore and moves it to the given dpdk id
317 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
318 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
319 *               and not already in use.
320 * @return 0 is successful otherwise -1 on error.
321 */
322static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
323        struct rte_config *cfg = rte_eal_get_configuration();
324        cpu_set_t cpuset;
325        int i;
326
327        assert (core < RTE_MAX_LCORE);
328        assert (rte_get_master_lcore() == rte_lcore_id());
329
330        if (core == rte_lcore_id())
331                return 0;
332
333        /* Make sure we are not overwriting someone else */
334        assert(!rte_lcore_is_enabled(core));
335
336        /* Move the core */
337        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
338        cfg->lcore_role[core] = ROLE_RTE;
339        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
340        rte_eal_get_configuration()->master_lcore = core;
341        RTE_PER_LCORE(_lcore_id) = core;
342
343        /* Now change the affinity, either mapped to a single core or all accepted */
344        CPU_ZERO(&cpuset);
345
346        if (lcore_config[core].detected) {
347                CPU_SET(core, &cpuset);
348        } else {
349                for (i = 0; i < RTE_MAX_LCORE; ++i) {
350                        if (lcore_config[i].detected)
351                                CPU_SET(i, &cpuset);
352                }
353        }
354
355        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
356        if (i != 0) {
357                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
358                return -1;
359        }
360        return 0;
361}
362
363/**
364 * XXX This is very bad XXX
365 * But we have to do something to allow getopts nesting
366 * Luckly normally the format is last so it doesn't matter
367 * DPDK only supports modern systems so hopefully this
368 * will continue to work
369 */
370struct saved_getopts {
371        char *optarg;
372        int optind;
373        int opterr;
374        int optopt;
375};
376
377static void save_getopts(struct saved_getopts *opts) {
378        opts->optarg = optarg;
379        opts->optind = optind;
380        opts->opterr = opterr;
381        opts->optopt = optopt;
382}
383
384static void restore_getopts(struct saved_getopts *opts) {
385        optarg = opts->optarg;
386        optind = opts->optind;
387        opterr = opts->opterr;
388        optopt = opts->optopt;
389}
390
391static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
392                                        char * err, int errlen) {
393        int ret; /* Returned error codes */
394        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
395        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
396        char mem_map[20] = {0}; /* The memory name */
397        long nb_cpu; /* The number of CPUs in the system */
398        long my_cpu; /* The CPU number we want to bind to */
399        int i;
400        struct rte_config *cfg = rte_eal_get_configuration();
401        struct saved_getopts save_opts;
402
403        /* This initialises the Environment Abstraction Layer (EAL)
404         * If we had slave workers these are put into WAITING state
405         *
406         * Basically binds this thread to a fixed core, which we choose as
407         * the last core on the machine (assuming fewer interrupts mapped here).
408         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
409         * "-n" the number of memory channels into the CPU (hardware specific)
410         *      - Most likely to be half the number of ram slots in your machine.
411         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
412         * Controls where in memory packets are stored such that they are spread
413         * across the channels. We just use 1 to be safe.
414         *
415         * Using unique file prefixes mean separate memory is used, unlinking
416         * the two processes. However be careful we still cannot access a
417         * port that already in use.
418         */
419        char* argv[] = {"libtrace",
420                        "-c", cpu_number,
421                        "-n", "1",
422                        "--proc-type", "auto",
423                        "--file-prefix", mem_map,
424                        "-m", "512",
425#if DPDK_USE_LOG_LEVEL
426#       if DEBUG
427                        "--log-level", "8", /* RTE_LOG_DEBUG */
428#       else
429                        "--log-level", "5", /* RTE_LOG_WARNING */
430#       endif
431#endif
432                        NULL};
433        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
434
435#if DEBUG
436        rte_log_set_global_level(RTE_LOG_DEBUG);
437#else
438        rte_log_set_global_level(RTE_LOG_WARNING);
439#endif
440
441        /* Get the number of cpu cores in the system and use the last core
442         * on the correct numa node */
443        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
444        if (nb_cpu <= 0) {
445                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
446                       " Falling back to the first core.");
447                nb_cpu = 1; /* fallback to the first core */
448        }
449        if (nb_cpu > RTE_MAX_LCORE)
450                nb_cpu = RTE_MAX_LCORE;
451
452        my_cpu = -1;
453        /* This allows the user to specify the core - we would try to do this
454         * automatically but it's hard to tell that this is secondary
455         * before running rte_eal_init(...). Currently we are limited to 1
456         * instance per core due to the way memory is allocated. */
457        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
458                snprintf(err, errlen, "Failed to parse URI");
459                return -1;
460        }
461
462#ifdef HAVE_LIBNUMA
463        format_data->nic_numa_node = pci_to_numa(&use_addr);
464        if (my_cpu < 0) {
465#if DEBUG
466                /* If we can assign to a core on the same numa node */
467                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
468#endif
469                if(format_data->nic_numa_node >= 0) {
470                        int max_node_cpu = -1;
471                        struct bitmask *mask = numa_allocate_cpumask();
472                        assert(mask);
473                        numa_node_to_cpus(format_data->nic_numa_node, mask);
474                        for (i = 0 ; i < nb_cpu; ++i) {
475                                if (numa_bitmask_isbitset(mask,i))
476                                        max_node_cpu = i+1;
477                        }
478                        my_cpu = max_node_cpu;
479                }
480        }
481#endif
482        if (my_cpu < 0) {
483                my_cpu = nb_cpu;
484        }
485
486
487        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
488                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
489
490        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
491                snprintf(err, errlen,
492                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
493                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
494                return -1;
495        }
496
497        /* Make our mask with all cores turned on this is so that DPDK
498         * gets all CPU info in older versions */
499        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
500        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
501
502#if !DPDK_USE_BLACKLIST
503        /* Black list all ports besides the one that we want to use */
504        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
505                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
506                         " are you sure the address is correct?: %s", strerror(-ret));
507                return -1;
508        }
509#endif
510
511        /* Give the memory map a unique name */
512        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
513        /* rte_eal_init it makes a call to getopt so we need to reset the
514         * global optind variable of getopt otherwise this fails */
515        save_getopts(&save_opts);
516        optind = 1;
517        if ((ret = rte_eal_init(argc, argv)) < 0) {
518                snprintf(err, errlen,
519                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
520                return -1;
521        }
522        restore_getopts(&save_opts);
523        // These are still running but will never do anything with DPDK v1.7 we
524        // should remove this XXX in the future
525        for(i = 0; i < RTE_MAX_LCORE; ++i) {
526                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
527                        cfg->lcore_role[i] = ROLE_OFF;
528                        cfg->lcore_count--;
529                }
530        }
531        // Only the master should be running
532        assert(cfg->lcore_count == 1);
533
534        // TODO XXX TODO
535        dpdk_move_master_lcore(NULL, my_cpu-1);
536
537#if DEBUG
538        dump_configuration();
539#endif
540
541#if DPDK_USE_PMD_INIT
542        /* This registers all available NICs with Intel DPDK
543         * These are not loaded until rte_eal_pci_probe() is called.
544         */
545        if ((ret = rte_pmd_init_all()) < 0) {
546                snprintf(err, errlen,
547                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
548                return -1;
549        }
550#endif
551
552#if DPDK_USE_BLACKLIST
553        /* Blacklist all ports besides the one that we want to use */
554        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
555                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
556                         " are you sure the address is correct?: %s", strerror(-ret));
557                return -1;
558        }
559#endif
560
561#if DPDK_USE_PCI_PROBE
562        /* This loads DPDK drivers against all ports that are not blacklisted */
563        if ((ret = rte_eal_pci_probe()) < 0) {
564                snprintf(err, errlen,
565                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
566                return -1;
567        }
568#endif
569
570        format_data->nb_ports = rte_eth_dev_count();
571
572        if (format_data->nb_ports != 1) {
573                snprintf(err, errlen,
574                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
575                         format_data->nb_ports);
576                return -1;
577        }
578
579        return 0;
580}
581
582int dpdk_init_input (libtrace_t *libtrace) {
583        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
584        char err[500];
585        err[0] = 0;
586
587        libtrace->format_data = (struct dpdk_format_data_t *)
588                                malloc(sizeof(struct dpdk_format_data_t));
589
590        if (!libtrace->format_data) {
591                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory dpdk_init_input()");
592                return 1;
593        }
594
595        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
596        FORMAT(libtrace)->nb_ports = 0;
597        FORMAT(libtrace)->snaplen = 0; /* Use default */
598        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
599        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
600        FORMAT(libtrace)->nic_numa_node = -1;
601        FORMAT(libtrace)->promisc = -1;
602        FORMAT(libtrace)->pktmbuf_pool = NULL;
603#if DPDK_USE_BLACKLIST
604        FORMAT(libtrace)->nb_blacklist = 0;
605#endif
606        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
607        FORMAT(libtrace)->mempool_name[0] = 0;
608        memset(FORMAT(libtrace)->burst_pkts, 0,
609               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
610        FORMAT(libtrace)->burst_size = 0;
611        FORMAT(libtrace)->burst_offset = 0;
612        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
613        FORMAT(libtrace)->rss_key = NULL;
614
615        /* Make our first stream */
616        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
617        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
618
619        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
620                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
621                free(libtrace->format_data);
622                libtrace->format_data = NULL;
623                return -1;
624        }
625        return 0;
626}
627
628static int dpdk_init_output(libtrace_out_t *libtrace)
629{
630        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
631        char err[500];
632        err[0] = 0;
633
634        libtrace->format_data = (struct dpdk_format_data_t *)
635                                malloc(sizeof(struct dpdk_format_data_t));
636
637        if (!libtrace->format_data) {
638                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory dpdk_init_output()");
639                return -1;
640        }
641        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
642        FORMAT(libtrace)->nb_ports = 0;
643        FORMAT(libtrace)->snaplen = 0; /* Use default */
644        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
645        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
646        FORMAT(libtrace)->nic_numa_node = -1;
647        FORMAT(libtrace)->promisc = -1;
648        FORMAT(libtrace)->pktmbuf_pool = NULL;
649#if DPDK_USE_BLACKLIST
650        FORMAT(libtrace)->nb_blacklist = 0;
651#endif
652        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
653        FORMAT(libtrace)->mempool_name[0] = 0;
654        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
655        FORMAT(libtrace)->burst_size = 0;
656        FORMAT(libtrace)->burst_offset = 0;
657
658        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
659        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
660
661        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
662                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
663                free(libtrace->format_data);
664                libtrace->format_data = NULL;
665                return -1;
666        }
667        return 0;
668}
669
670/**
671 * Note here snaplen excludes the MAC checksum. Packets over
672 * the requested snaplen will be dropped. (Excluding MAC checksum)
673 *
674 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
675 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
676 * is set the maximum size of the returned packet would be 1518 otherwise
677 * 1514 would be the largest size possibly returned.
678 *
679 */
680int dpdk_config_input (libtrace_t *libtrace,
681                              trace_option_t option,
682                              void *data) {
683        switch (option) {
684        case TRACE_OPTION_SNAPLEN:
685                /* Only support changing snaplen before a call to start is
686                 * made */
687                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
688                        FORMAT(libtrace)->snaplen=*(int*)data;
689                else
690                        return -1;
691                return 0;
692        case TRACE_OPTION_PROMISC:
693                FORMAT(libtrace)->promisc=*(int*)data;
694                return 0;
695        case TRACE_OPTION_HASHER:
696                switch (*((enum hasher_types *) data))
697                {
698                case HASHER_BALANCE:
699                case HASHER_UNIDIRECTIONAL:
700                case HASHER_BIDIRECTIONAL:
701                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
702                        if (FORMAT(libtrace)->rss_key)
703                                free(FORMAT(libtrace)->rss_key);
704                        FORMAT(libtrace)->rss_key = NULL;
705                        return 0;
706                case HASHER_CUSTOM:
707                        // Let libtrace do this
708                        return -1;
709                }
710                break;
711        case TRACE_OPTION_FILTER:
712                /* TODO filtering */
713        case TRACE_OPTION_META_FREQ:
714        case TRACE_OPTION_EVENT_REALTIME:
715        case TRACE_OPTION_REPLAY_SPEEDUP:
716                break;
717        /* Avoid default: so that future options will cause a warning
718         * here to remind us to implement it, or flag it as
719         * unimplementable
720         */
721        }
722
723        /* Don't set an error - trace_config will try to deal with the
724         * option and will set an error if it fails */
725        return -1;
726}
727
728/* Can set jumbo frames/ or limit the size of a frame by setting both
729 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
730 *
731 */
732static struct rte_eth_conf port_conf = {
733        .rxmode = {
734                .mq_mode = ETH_RSS,
735                .split_hdr_size = 0,
736                .header_split   = 0, /**< Header Split disabled */
737                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
738                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
739                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
740                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
741#if GET_MAC_CRC_CHECKSUM
742/* So it appears that if hw_strip_crc is turned off the driver will still
743 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
744 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
745 * So lets just add it back on when we receive the packet.
746 */
747                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
748#else
749/* By default strip the MAC checksum because it's a bit of a hack to
750 * actually read these. And don't want to rely on disabling this to actualy
751 * always cut off the checksum in the future
752 */
753                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
754#endif
755        },
756        .txmode = {
757                .mq_mode = ETH_DCB_NONE,
758        },
759        .rx_adv_conf = {
760                .rss_conf = {
761                        .rss_hf = RX_RSS_FLAGS,
762                },
763        },
764        .intr_conf = {
765                .lsc = 1
766        }
767};
768
769static const struct rte_eth_rxconf rx_conf = {
770        .rx_thresh = {
771                .pthresh = 8,/* RX_PTHRESH prefetch */
772                .hthresh = 8,/* RX_HTHRESH host */
773                .wthresh = 4,/* RX_WTHRESH writeback */
774        },
775        .rx_free_thresh = 0,
776        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
777};
778
779static const struct rte_eth_txconf tx_conf = {
780        .tx_thresh = {
781                /*
782                 * TX_PTHRESH prefetch
783                 * Set on the NIC, if the number of unprocessed descriptors to queued on
784                 * the card fall below this try grab at least hthresh more unprocessed
785                 * descriptors.
786                 */
787                .pthresh = 36,
788
789                /* TX_HTHRESH host
790                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
791                 */
792                .hthresh = 0,
793
794                /* TX_WTHRESH writeback
795                 * Set on the NIC, the number of sent descriptors before writing back
796                 * status to confirm the transmission. This is done more efficiently as
797                 * a bulk DMA-transfer rather than writing one at a time.
798                 * Similar to tx_free_thresh however this is applied to the NIC, where
799                 * as tx_free_thresh is when DPDK will check these. This is extended
800                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
801                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
802                 */
803                .wthresh = 4,
804        },
805
806        /* Used internally by DPDK rather than passed to the NIC. The number of
807         * packet descriptors to send before checking for any responses written
808         * back (to confirm the transmission). Default = 32 if set to 0)
809         */
810        .tx_free_thresh = 0,
811
812        /* This is the Report Status threshold, used by 10Gbit cards,
813         * This signals the card to only write back status (such as
814         * transmission successful) after this minimum number of transmit
815         * descriptors are seen. The default is 32 (if set to 0) however if set
816         * to greater than 1 TX wthresh must be set to zero, because this is kindof
817         * a replacement. See the dpdk programmers guide for more restrictions.
818         */
819        .tx_rs_thresh = 1,
820};
821
822/**
823 * A callback for a link state change (LSC).
824 *
825 * Packets may be received before this notification. In fact the DPDK IGXBE
826 * driver likes to put a delay upto 5sec before sending this.
827 *
828 * We use this to ensure the link speed is correct for our timestamp
829 * calculations. Because packets might be received before the link up we still
830 * update this when the packet is received.
831 *
832 * @param port The DPDK port
833 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
834 * @param cb_arg The dpdk_format_data_t structure associated with the format
835 */
836#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
837static int dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
838                              void *cb_arg, void *retparam UNUSED) {
839#else
840static void dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
841                              void *cb_arg) {
842#endif
843        struct dpdk_format_data_t * format_data = cb_arg;
844        struct rte_eth_link link_info;
845        assert(event == RTE_ETH_EVENT_INTR_LSC);
846        assert(port == format_data->port);
847
848        rte_eth_link_get_nowait(port, &link_info);
849
850        if (link_info.link_status)
851                format_data->link_speed = link_info.link_speed;
852        else
853                format_data->link_speed = 0;
854
855#if DEBUG
856        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
857                link_info.link_status ? "up" : "down",
858                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
859                                          "full-duplex" : "half-duplex",
860                (int) link_info.link_speed);
861#endif
862
863        /* Turns out DPDK drivers might not come back up if the link speed
864         * changes. So we reset the autoneg procedure. This is very unsafe
865         * we have have threads reading packets and we stop the port. */
866#if 0
867        if (!link_info.link_status) {
868                int ret;
869                rte_eth_dev_stop(port);
870                ret = rte_eth_dev_start(port);
871                if (ret < 0) {
872                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
873                                strerror(-ret));
874                }
875        }
876#endif
877#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
878        return 0;
879#endif
880}
881
882/** Reserve a DPDK lcore ID for a thread globally.
883 *
884 * @param real If true allocate a real lcore, otherwise allocate a core which
885 * does not exist on the local machine.
886 * @param socket the prefered NUMA socket - only used if a real core is requested
887 * @return a valid core, which can later be used with dpdk_register_lcore() or a
888 * -1 if have run out of cores.
889 *
890 * If any thread is reading or freeing packets we need to register it here
891 * due to TLS caches in the memory pools.
892 */
893static int dpdk_reserve_lcore(bool real, int socket) {
894        int new_id = -1;
895        int i;
896        struct rte_config *cfg = rte_eal_get_configuration();
897        (void) socket;
898
899        pthread_mutex_lock(&dpdk_lock);
900        /* If 'reading packets' fill in cores from 0 up and bind affinity
901         * otherwise start from the MAX core (which is also the master) and work backwards
902         * in this case physical cores on the system will not exist so we don't bind
903         * these to any particular physical core */
904        if (real) {
905#ifdef HAVE_LIBNUMA
906                for (i = 0; i < RTE_MAX_LCORE; ++i) {
907                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
908                                new_id = i;
909                                if (!lcore_config[i].detected)
910                                        new_id = -1;
911                                break;
912                        }
913                }
914#endif
915                /* Retry without the the numa restriction */
916                if (new_id == -1) {
917                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
918                                if (!rte_lcore_is_enabled(i)) {
919                                        new_id = i;
920                                        if (!lcore_config[i].detected)
921                                                fprintf(stderr, "Warning the"
922                                                        " number of 'reading' "
923                                                        "threads exceed cores\n");
924                                        break;
925                                }
926                        }
927                }
928        } else {
929                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
930                        if (!rte_lcore_is_enabled(i)) {
931                                new_id = i;
932                                break;
933                        }
934                }
935        }
936
937        if (new_id != -1) {
938                /* Enable the core in global DPDK structs */
939                cfg->lcore_role[new_id] = ROLE_RTE;
940                cfg->lcore_count++;
941        }
942
943        pthread_mutex_unlock(&dpdk_lock);
944        return new_id;
945}
946
947/** Register a thread as a lcore
948 * @param libtrace any error is set against libtrace on exit
949 * @param real If this is a true lcore we will bind its affinty to the
950 * requested core.
951 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
952 * @return 0, if successful otherwise -1 if an error occured (details are stored
953 * in libtrace)
954 *
955 * @note This must be called from the thread being registered.
956 */
957static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
958        int ret;
959        RTE_PER_LCORE(_lcore_id) = lcore;
960
961        /* Set affinity bind to corresponding core */
962        if (real) {
963                cpu_set_t cpuset;
964                CPU_ZERO(&cpuset);
965                CPU_SET(rte_lcore_id(), &cpuset);
966                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
967                if (ret != 0) {
968                        trace_set_err(libtrace, errno, "Warning "
969                                      "pthread_setaffinity_np failed");
970                        return -1;
971                }
972        }
973
974        return 0;
975}
976
977/** Allocates a new dpdk packet buffer memory pool.
978 *
979 * @param n The number of threads
980 * @param pkt_size The packet size we need ot store
981 * @param socket_id The NUMA socket id
982 * @param A new mempool, if NULL query the DPDK library for the error code
983 * see rte_mempool_create() documentation.
984 *
985 * This allocates a new pool or recycles an existing memory pool.
986 * Call dpdk_free_memory() to free the memory.
987 * We cannot delete memory so instead we store the pools, allowing them to be
988 * re-used.
989 */
990static struct rte_mempool *dpdk_alloc_memory(unsigned n,
991                                             unsigned pkt_size,
992                                             int socket_id) {
993        struct rte_mempool *ret;
994        size_t j,k;
995        char name[MEMPOOL_NAME_LEN];
996
997        /* Add on packet size overheads */
998        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
999
1000        pthread_mutex_lock(&dpdk_lock);
1001
1002        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1003                /* Best guess go for zero */
1004                socket_id = 0;
1005        }
1006
1007        /* Find a valid pool */
1008        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1009                if (mem_pools[socket_id][j]->size >= n &&
1010                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1011                        break;
1012                }
1013        }
1014
1015        /* Find the end (+1) of the list */
1016        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1017
1018        if (mem_pools[socket_id][j]) {
1019                ret = mem_pools[socket_id][j];
1020                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1021                mem_pools[socket_id][k-1] = NULL;
1022                mem_pools[socket_id][j] = NULL;
1023        } else {
1024                static uint32_t test = 10;
1025                test++;
1026                snprintf(name, MEMPOOL_NAME_LEN,
1027                         "libtrace_pool_%"PRIu32, test);
1028
1029                ret = rte_mempool_create(name, n, pkt_size,
1030                                         128, sizeof(struct rte_pktmbuf_pool_private),
1031                                         rte_pktmbuf_pool_init, NULL,
1032                                         rte_pktmbuf_init, NULL,
1033                                         socket_id, 0);
1034        }
1035
1036        pthread_mutex_unlock(&dpdk_lock);
1037        return ret;
1038}
1039
1040/** Stores the memory against the DPDK library.
1041 *
1042 * @param mempool The mempool to free
1043 * @param socket_id The NUMA socket this mempool was allocated upon.
1044 *
1045 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1046 * store the memory shared globally against the format.
1047 */
1048static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1049        size_t i;
1050        pthread_mutex_lock(&dpdk_lock);
1051
1052        /* We should have all entries back in the mempool */
1053        rte_mempool_audit(mempool);
1054        if (!rte_mempool_full(mempool)) {
1055                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1056                        "free all packets before finishing a trace\n",
1057                        rte_mempool_avail_count(mempool), mempool->size);
1058        }
1059
1060        /* Find the end (+1) of the list */
1061        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1062
1063        if (i >= RTE_MAX_LCORE) {
1064                fprintf(stderr, "Too many memory pools, dropping this one\n");
1065        } else {
1066                mem_pools[socket_id][i] = mempool;
1067        }
1068
1069        pthread_mutex_unlock(&dpdk_lock);
1070}
1071
1072/* Attach memory to the port and start (or restart) the port/s.
1073 */
1074static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1075                              char *err, int errlen, uint16_t rx_queues) {
1076        int ret, i;
1077        struct rte_eth_link link_info; /* Wait for link */
1078        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1079
1080        /* Already started */
1081        if (format_data->paused == DPDK_RUNNING)
1082                return 0;
1083
1084        /* First time started we need to alloc our memory, doing this here
1085         * rather than in environment setup because we don't have snaplen then */
1086        if (format_data->paused == DPDK_NEVER_STARTED) {
1087                if (format_data->snaplen == 0) {
1088                        format_data->snaplen = RX_MBUF_SIZE;
1089                        port_conf.rxmode.jumbo_frame = 0;
1090                        port_conf.rxmode.max_rx_pkt_len = 0;
1091                } else {
1092                        double expn;
1093
1094                        /* Use jumbo frames */
1095                        port_conf.rxmode.jumbo_frame = 1;
1096                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1097
1098                        /* Use less buffers if we're supporting jumbo frames
1099                         * otherwise we won't be able to allocate memory.
1100                         */
1101                        if (format_data->snaplen > 1500) {
1102                                format_data->nb_rx_buf /= 2;
1103                        }
1104
1105                        /* snaplen should be rounded up to next power of two
1106                         * to ensure enough memory is allocated for each
1107                         * mbuf :(
1108                         */
1109                        expn = ceil(log2((double)(format_data->snaplen)));
1110                        format_data->snaplen = pow(2, (int)expn);
1111                }
1112
1113#if GET_MAC_CRC_CHECKSUM
1114                /* This is additional overhead so make sure we allow space for this */
1115                format_data->snaplen += ETHER_CRC_LEN;
1116#endif
1117#if HAS_HW_TIMESTAMPS_82580
1118                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1119#endif
1120
1121                /* Create the mbuf pool, which is the place packets are allocated
1122                 * from - There is no free function (I cannot see one).
1123                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1124                 * allocate however that extra 1 packet is not used.
1125                 * (I assume <= vs < error some where in DPDK code)
1126                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1127                 * so that will fill the new buffer and wait until slots in the
1128                 * ring become available.
1129                 */
1130#if DEBUG
1131                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1132#endif
1133                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1134                                                              format_data->snaplen,
1135                                                              format_data->nic_numa_node);
1136
1137                if (format_data->pktmbuf_pool == NULL) {
1138                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1139                                 "pool failed: %s", strerror(rte_errno));
1140                        return -1;
1141                }
1142        }
1143
1144        /* Generate the hash key, based on the device */
1145        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1146        // In new versions DPDK we can query the size
1147#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1148        struct rte_eth_dev_info dev_info;
1149        rte_eth_dev_info_get(format_data->port, &dev_info);
1150        rss_size = dev_info.hash_key_size;
1151#endif
1152        if (rss_size != 0) {
1153                format_data->rss_key = malloc(rss_size);
1154                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1155                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1156                } else {
1157                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1158                }
1159                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1160#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1161                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1162#endif
1163        } else {
1164                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
1165        }
1166
1167        /* ----------- Now do the setup for the port mapping ------------ */
1168        /* Order of calls must be
1169         * rte_eth_dev_configure()
1170         * rte_eth_tx_queue_setup()
1171         * rte_eth_rx_queue_setup()
1172         * rte_eth_dev_start()
1173         * other rte_eth calls
1174         */
1175
1176        /* This must be called first before another *eth* function
1177         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1178        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1179        if (ret < 0) {
1180                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1181                         " %"PRIu8" : %s", format_data->port,
1182                         strerror(-ret));
1183                return -1;
1184        }
1185#if DEBUG
1186        fprintf(stderr, "Doing dev configure\n");
1187#endif
1188        /* Initialise the TX queue a minimum value if using this port for
1189         * receiving. Otherwise a larger size if writing packets.
1190         */
1191        ret = rte_eth_tx_queue_setup(format_data->port,
1192                                     0 /* queue XXX */,
1193                                     format_data->nb_tx_buf,
1194                                     SOCKET_ID_ANY,
1195                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1196        if (ret < 0) {
1197                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1198                         " on port %d : %s", (int)format_data->port,
1199                         strerror(-ret));
1200                return -1;
1201        }
1202
1203        /* Attach memory to our RX queues */
1204        for (i=0; i < rx_queues; i++) {
1205                dpdk_per_stream_t *stream;
1206#if DEBUG
1207                fprintf(stderr, "Configuring queue %d\n", i);
1208#endif
1209
1210                /* Add storage for the stream */
1211                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1212                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1213                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1214                stream->queue_id = i;
1215
1216                if (stream->lcore == -1)
1217                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1218
1219                if (stream->lcore == -1) {
1220                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1221                                 ". Too many threads?");
1222                        return -1;
1223                }
1224
1225                if (stream->mempool == NULL) {
1226                        stream->mempool = dpdk_alloc_memory(
1227                                                  format_data->nb_rx_buf*2,
1228                                                  format_data->snaplen,
1229                                                  rte_lcore_to_socket_id(stream->lcore));
1230
1231                        if (stream->mempool == NULL) {
1232                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1233                                         "pool failed: %s", strerror(rte_errno));
1234                                return -1;
1235                        }
1236                }
1237
1238                /* Initialise the RX queue with some packets from memory */
1239                ret = rte_eth_rx_queue_setup(format_data->port,
1240                                             stream->queue_id,
1241                                             format_data->nb_rx_buf,
1242                                             format_data->nic_numa_node,
1243                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1244                                             stream->mempool);
1245                if (ret < 0) {
1246                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1247                                 " RX queue on port %d : %s",
1248                                 (int)format_data->port,
1249                                 strerror(-ret));
1250                        return -1;
1251                }
1252        }
1253
1254#if DEBUG
1255        fprintf(stderr, "Doing start device\n");
1256#endif
1257        rte_eth_stats_reset(format_data->port);
1258        /* Start device */
1259        ret = rte_eth_dev_start(format_data->port);
1260        if (ret < 0) {
1261                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1262                         strerror(-ret));
1263                return -1;
1264        }
1265
1266        /* Default promiscuous to on */
1267        if (format_data->promisc == -1)
1268                format_data->promisc = 1;
1269
1270        if (format_data->promisc == 1)
1271                rte_eth_promiscuous_enable(format_data->port);
1272        else
1273                rte_eth_promiscuous_disable(format_data->port);
1274
1275        /* We have now successfully started/unpased */
1276        format_data->paused = DPDK_RUNNING;
1277
1278
1279        /* Register a callback for link state changes */
1280        ret = rte_eth_dev_callback_register(format_data->port,
1281                                            RTE_ETH_EVENT_INTR_LSC,
1282                                            dpdk_lsc_callback,
1283                                            format_data);
1284#if DEBUG
1285        if (ret)
1286                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1287                        ret, strerror(-ret));
1288#endif
1289
1290        /* Get the current link status */
1291        rte_eth_link_get_nowait(format_data->port, &link_info);
1292        format_data->link_speed = link_info.link_speed;
1293#if DEBUG
1294        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1295                (int) link_info.link_duplex, (int) link_info.link_speed);
1296#endif
1297
1298        return 0;
1299}
1300
1301int dpdk_start_input (libtrace_t *libtrace) {
1302        char err[500];
1303        err[0] = 0;
1304
1305        /* Make sure we don't reserve an extra thread for this */
1306        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1307
1308        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1309                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1310                free(libtrace->format_data);
1311                libtrace->format_data = NULL;
1312                return -1;
1313        }
1314        return 0;
1315}
1316
1317static inline size_t dpdk_get_max_rx_queues (portid_t port_id) {
1318        struct rte_eth_dev_info dev_info;
1319        rte_eth_dev_info_get(port_id, &dev_info);
1320        return dev_info.max_rx_queues;
1321}
1322
1323static inline size_t dpdk_processor_count () {
1324        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1325        if (nb_cpu <= 0)
1326                return 1;
1327        else
1328                return (size_t) nb_cpu;
1329}
1330
1331int dpdk_pstart_input (libtrace_t *libtrace) {
1332        char err[500];
1333        int i=0, phys_cores=0;
1334        int tot = libtrace->perpkt_thread_count;
1335        libtrace_list_node_t *n;
1336        err[0] = 0;
1337
1338        if (rte_lcore_id() != rte_get_master_lcore())
1339                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1340                        " from the master DPDK thread!\n");
1341
1342        /* If the master is not on the last thread we move it there */
1343        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1344                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1345                        return -1;
1346        }
1347
1348        /* Don't exceed the number of cores in the system/detected by dpdk
1349         * We don't have to force this but performance wont be good if we don't */
1350        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1351                if (lcore_config[i].detected) {
1352                        if (rte_lcore_is_enabled(i)) {
1353#if DEBUG
1354                                fprintf(stderr, "Found core %d already in use!\n", i);
1355#endif
1356                        } else {
1357                                phys_cores++;
1358                        }
1359                }
1360        }
1361        /* If we are restarting we have already allocated some threads as such
1362         * we add these back to the count for this calculation */
1363        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1364                dpdk_per_stream_t * stream = n->data;
1365                if (stream->lcore != -1)
1366                        phys_cores++;
1367        }
1368
1369        tot = MIN(libtrace->perpkt_thread_count,
1370                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1371        tot = MIN(tot, phys_cores);
1372
1373#if DEBUG
1374        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1375                libtrace->perpkt_thread_count, phys_cores);
1376#endif
1377
1378        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1379                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1380                free(libtrace->format_data);
1381                libtrace->format_data = NULL;
1382                return -1;
1383        }
1384
1385        /* Make sure we only start the number that we should */
1386        libtrace->perpkt_thread_count = tot;
1387        return 0;
1388}
1389
1390/**
1391 * Register a thread with the DPDK system,
1392 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1393 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1394 * gives it.
1395 *
1396 * We then allow a mapper thread to be started on every real core as DPDK would,
1397 * we also bind these to the corresponding CPU cores.
1398 *
1399 * @param libtrace A pointer to the trace
1400 * @param reading True if the thread will be used to read packets, i.e. will
1401 *                call pread_packet(), false if thread used to process packet
1402 *                in any other manner including statistics functions.
1403 */
1404int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1405{
1406#if DEBUG
1407        char name[99];
1408        name[0] = 0;
1409#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1410        pthread_getname_np(pthread_self(),
1411                           name, sizeof(name));
1412#endif
1413#endif
1414        if (reading) {
1415                dpdk_per_stream_t *stream;
1416                /* Attach our thread */
1417                if(t->type == THREAD_PERPKT) {
1418                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1419                        if (t->format_data == NULL) {
1420                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1421                                              "Too many threads registered");
1422                                return -1;
1423                        }
1424                } else {
1425                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1426                }
1427                stream = t->format_data;
1428#if DEBUG
1429                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1430#endif
1431                return dpdk_register_lcore(libtrace, true, stream->lcore);
1432        } else {
1433                int lcore = dpdk_reserve_lcore(reading, 0);
1434                if (lcore == -1) {
1435                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1436                                      " for DPDK");
1437                        return -1;
1438                }
1439#if DEBUG
1440                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1441#endif
1442                return dpdk_register_lcore(libtrace, false, lcore);
1443        }
1444
1445        return 0;
1446}
1447
1448/**
1449 * Unregister a thread with the DPDK system.
1450 *
1451 * Only previously registered threads should be calling this just before
1452 * they are destroyed.
1453 */
1454void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1455{
1456        struct rte_config *cfg = rte_eal_get_configuration();
1457
1458        assert(rte_lcore_id() < RTE_MAX_LCORE);
1459        pthread_mutex_lock(&dpdk_lock);
1460        /* Skip if master */
1461        if (rte_lcore_id() == rte_get_master_lcore()) {
1462                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1463                pthread_mutex_unlock(&dpdk_lock);
1464                return;
1465        }
1466
1467        /* Disable this core in global DPDK structs */
1468        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1469        cfg->lcore_count--;
1470        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1471        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1472        pthread_mutex_unlock(&dpdk_lock);
1473        return;
1474}
1475
1476static int dpdk_start_output(libtrace_out_t *libtrace)
1477{
1478        char err[500];
1479        err[0] = 0;
1480
1481        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1482                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1483                free(libtrace->format_data);
1484                libtrace->format_data = NULL;
1485                return -1;
1486        }
1487        return 0;
1488}
1489
1490int dpdk_pause_input(libtrace_t * libtrace) {
1491        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1492        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1493        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1494#if DEBUG
1495                fprintf(stderr, "Pausing DPDK port\n");
1496#endif
1497                rte_eth_dev_stop(FORMAT(libtrace)->port);
1498                FORMAT(libtrace)->paused = DPDK_PAUSED;
1499                /* Empty the queue of packets */
1500                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1501                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1502                }
1503                FORMAT(libtrace)->burst_offset = 0;
1504                FORMAT(libtrace)->burst_size = 0;
1505
1506                for (; tmp != NULL; tmp = tmp->next) {
1507                        dpdk_per_stream_t *stream = tmp->data;
1508                        stream->ts_last_sys = 0;
1509#if HAS_HW_TIMESTAMPS_82580
1510                        stream->ts_first_sys = 0;
1511#endif
1512                }
1513
1514        }
1515        return 0;
1516}
1517
1518static int dpdk_write_packet(libtrace_out_t *trace,
1519                             libtrace_packet_t *packet){
1520        struct rte_mbuf* m_buff[1];
1521
1522        int wirelen = trace_get_wire_length(packet);
1523        int caplen = trace_get_capture_length(packet);
1524
1525        /* Check for a checksum and remove it */
1526        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1527            wirelen == caplen)
1528                caplen -= ETHER_CRC_LEN;
1529
1530        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1531        if (m_buff[0] == NULL) {
1532                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1533                return -1;
1534        } else {
1535                int ret;
1536                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1537                do {
1538                        ret = rte_eth_tx_burst(FORMAT(trace)->port, 0 /*queue TODO*/, m_buff, 1);
1539                } while (ret != 1);
1540        }
1541
1542        return 0;
1543}
1544
1545int dpdk_fin_input(libtrace_t * libtrace) {
1546        libtrace_list_node_t * n;
1547        /* Free our memory structures */
1548        if (libtrace->format_data != NULL) {
1549
1550                if (FORMAT(libtrace)->port != 0xFF)
1551                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1552                                                        RTE_ETH_EVENT_INTR_LSC,
1553                                                        dpdk_lsc_callback,
1554                                                        FORMAT(libtrace));
1555                /* Close the device completely, device cannot be restarted */
1556                rte_eth_dev_close(FORMAT(libtrace)->port);
1557
1558                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1559                                 FORMAT(libtrace)->nic_numa_node);
1560
1561                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1562                        dpdk_per_stream_t * stream = n->data;
1563                        if (stream->mempool)
1564                                dpdk_free_memory(stream->mempool,
1565                                                 rte_lcore_to_socket_id(stream->lcore));
1566                }
1567
1568                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1569                /* filter here if we used it */
1570                if (FORMAT(libtrace)->rss_key)
1571                        free(FORMAT(libtrace)->rss_key);
1572                free(libtrace->format_data);
1573        }
1574
1575        return 0;
1576}
1577
1578
1579static int dpdk_fin_output(libtrace_out_t * libtrace) {
1580        /* Free our memory structures */
1581        if (libtrace->format_data != NULL) {
1582                /* Close the device completely, device cannot be restarted */
1583                if (FORMAT(libtrace)->port != 0xFF)
1584                        rte_eth_dev_close(FORMAT(libtrace)->port);
1585                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1586                /* filter here if we used it */
1587                free(libtrace->format_data);
1588        }
1589
1590        return 0;
1591}
1592
1593/**
1594 * Get the start of the additional header that we added to a packet.
1595 */
1596static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1597        assert(packet);
1598        assert(packet->buffer);
1599        /* Our header sits straight after the mbuf header */
1600        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1601}
1602
1603static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1604        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1605        return hdr->cap_len;
1606}
1607
1608static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1609        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1610        if (size > hdr->cap_len) {
1611                /* Cannot make a packet bigger */
1612                return trace_get_capture_length(packet);
1613        }
1614
1615        /* Reset the cached capture length first*/
1616        packet->capture_length = -1;
1617        hdr->cap_len = (uint32_t) size;
1618        return trace_get_capture_length(packet);
1619}
1620
1621static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1622        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1623        int org_cap_size; /* The original capture size */
1624        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1625                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1626                               sizeof(struct hw_timestamp_82580);
1627        } else {
1628                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1629        }
1630        if (hdr->flags & INCLUDES_CHECKSUM) {
1631                return org_cap_size;
1632        } else {
1633                /* DPDK packets are always TRACE_TYPE_ETH packets */
1634                return org_cap_size + ETHER_CRC_LEN;
1635        }
1636}
1637
1638int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1639        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1640        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1641                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1642                                sizeof(struct hw_timestamp_82580);
1643        else
1644                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1645}
1646
1647int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1648                               libtrace_packet_t *packet, void *buffer,
1649                               libtrace_rt_types_t rt_type, uint32_t flags) {
1650        assert(packet);
1651        if (packet->buffer != buffer &&
1652            packet->buf_control == TRACE_CTRL_PACKET) {
1653                free(packet->buffer);
1654        }
1655
1656        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1657                packet->buf_control = TRACE_CTRL_PACKET;
1658        else
1659                packet->buf_control = TRACE_CTRL_EXTERNAL;
1660
1661        packet->buffer = buffer;
1662        packet->header = buffer;
1663
1664        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1665        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1666        packet->type = rt_type;
1667        return 0;
1668}
1669
1670/**
1671 * Given a packet size and a link speed, computes the
1672 * time to transmit in nanoseconds.
1673 *
1674 * @param format_data The dpdk format data from which we get the link speed
1675 *        and if unset updates it in a thread safe manner
1676 * @param pkt_size The size of the packet in bytes
1677 * @return The wire time in nanoseconds
1678 */
1679static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1680        uint32_t wire_time;
1681        /* 20 extra bytes of interframe gap and preamble */
1682# if GET_MAC_CRC_CHECKSUM
1683        wire_time = ((pkt_size + 20) * 8000);
1684# else
1685        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1686# endif
1687
1688        /* Division is really slow and introduces a pipeline stall
1689         * The compiler will optimise this into magical multiplication and shifting
1690         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1691         */
1692retry_calc_wiretime:
1693        switch (format_data->link_speed) {
1694        case ETH_SPEED_NUM_40G:
1695                wire_time /=  ETH_SPEED_NUM_40G;
1696                break;
1697        case ETH_SPEED_NUM_20G:
1698                wire_time /= ETH_SPEED_NUM_20G;
1699                break;
1700        case ETH_SPEED_NUM_10G:
1701                wire_time /= ETH_SPEED_NUM_10G;
1702                break;
1703        case ETH_SPEED_NUM_1G:
1704                wire_time /= ETH_SPEED_NUM_1G;
1705                break;
1706        case 0:
1707                {
1708                /* Maybe the link was down originally, but now it should be up */
1709                struct rte_eth_link link = {0};
1710                rte_eth_link_get_nowait(format_data->port, &link);
1711                if (link.link_status && link.link_speed) {
1712                        format_data->link_speed = link.link_speed;
1713#ifdef DEBUG
1714                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1715#endif
1716                        goto retry_calc_wiretime;
1717                }
1718                /* We don't know the link speed, make sure numbers are counting up */
1719                wire_time = 1;
1720                break;
1721                }
1722        default:
1723                wire_time /= format_data->link_speed;
1724        }
1725        return wire_time;
1726}
1727
1728/**
1729 * Does any extra preperation to all captured packets
1730 * This includes adding our extra header to it with the timestamp,
1731 * and any snapping
1732 *
1733 * @param format_data The DPDK format data
1734 * @param plc The DPDK per lcore format data
1735 * @param pkts An array of size nb_pkts of DPDK packets
1736 */
1737static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1738                                   struct dpdk_per_stream_t *plc,
1739                                   struct rte_mbuf **pkts,
1740                                   size_t nb_pkts) {
1741        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1742        struct dpdk_addt_hdr *hdr;
1743        size_t i;
1744        uint64_t cur_sys_time_ns;
1745#if HAS_HW_TIMESTAMPS_82580
1746        struct hw_timestamp_82580 *hw_ts;
1747        uint64_t estimated_wraps;
1748#else
1749
1750#endif
1751
1752#if USE_CLOCK_GETTIME
1753        struct timespec cur_sys_time = {0};
1754        /* This looks terrible and I feel bad doing it. But it's OK
1755         * on new kernels, because this is a fast vsyscall */
1756        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1757        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1758#else
1759        struct timeval cur_sys_time = {0};
1760        /* Also a fast vsyscall */
1761        gettimeofday(&cur_sys_time, NULL);
1762        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1763#endif
1764
1765        /* The system clock is not perfect so when running
1766         * at linerate we could timestamp a packet in the past.
1767         * To avoid this we munge the timestamp to appear 1ns
1768         * after the previous packet. We should eventually catch up
1769         * to system time since a 64byte packet on a 10G link takes 67ns.
1770         *
1771         * Note with parallel readers timestamping packets
1772         * with duplicate stamps or out of order is unavoidable without
1773         * hardware timestamping from the NIC.
1774         */
1775#if !HAS_HW_TIMESTAMPS_82580
1776        if (plc->ts_last_sys >= cur_sys_time_ns) {
1777                cur_sys_time_ns = plc->ts_last_sys + 1;
1778        }
1779#endif
1780
1781        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1782        for (i = 0 ; i < nb_pkts ; ++i) {
1783
1784                /* We put our header straight after the dpdk header */
1785                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1786                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1787
1788#if GET_MAC_CRC_CHECKSUM
1789                /* Add back in the CRC sum */
1790                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1791                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1792                hdr->flags |= INCLUDES_CHECKSUM;
1793#endif
1794
1795                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1796
1797#if HAS_HW_TIMESTAMPS_82580
1798                /* The timestamp is sitting before our packet and is included in pkt_len */
1799                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1800                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1801                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1802
1803                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1804                 *
1805                 *        +----------+---+   +--------------+
1806                 *  82580 |    24    | 8 |   |      32      |
1807                 *        +----------+---+   +--------------+
1808                 *          reserved  \______ 40 bits _____/
1809                 *
1810                 * The 40 bit 82580 SYSTIM overflows every
1811                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1812                 *
1813                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1814                 * Endian (for the full 64 bits) i.e. picture is mirrored
1815                 */
1816
1817                /* Despite what the documentation says this is in Little
1818                 * Endian byteorder. Mask the reserved section out.
1819                 */
1820                hdr->timestamp = le64toh(hw_ts->timestamp) &
1821                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1822
1823                if (unlikely(plc->ts_first_sys == 0)) {
1824                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1825                        plc->ts_last_sys = plc->ts_first_sys;
1826                }
1827
1828                /* This will have serious problems if packets aren't read quickly
1829                 * that is within a couple of seconds because our clock cycles every
1830                 * 18 seconds */
1831                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1832                                  / (1ull<<TS_NBITS_82580);
1833
1834                /* Estimated_wraps gives the number of times the counter should have
1835                 * wrapped (however depending on value last time it could have wrapped
1836                 * twice more (if hw clock is close to its max value) or once less (allowing
1837                 * for a bit of variance between hw and sys clock). But if the clock
1838                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1839                if (unlikely(estimated_wraps >= 2)) {
1840                        /* 2 or more wrap arounds add all but the very last wrap */
1841                        plc->wrap_count += estimated_wraps - 1;
1842                }
1843
1844                /* Set the timestamp to the lowest possible value we're considering */
1845                hdr->timestamp += plc->ts_first_sys +
1846                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1847
1848                /* In most runs only the first if() will need evaluating - i.e our
1849                 * estimate is correct. */
1850                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1851                                              hdr->timestamp, MAXSKEW_82580))) {
1852                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1853                        plc->wrap_count++;
1854                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1855                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1856                                             hdr->timestamp, MAXSKEW_82580)) {
1857                                /* Failed to match estimated_wraps */
1858                                plc->wrap_count++;
1859                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1860                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1861                                                     hdr->timestamp, MAXSKEW_82580)) {
1862                                        if (estimated_wraps == 0) {
1863                                                /* 0 case Failed to match estimated_wraps+2 */
1864                                                printf("WARNING - Hardware Timestamp failed to"
1865                                                       " match using systemtime!\n");
1866                                                hdr->timestamp = cur_sys_time_ns;
1867                                        } else {
1868                                                /* Failed to match estimated_wraps+1 */
1869                                                plc->wrap_count++;
1870                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1871                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1872                                                                     hdr->timestamp, MAXSKEW_82580)) {
1873                                                        /* Failed to match estimated_wraps+2 */
1874                                                        printf("WARNING - Hardware Timestamp failed to"
1875                                                               " match using systemtime!!\n");
1876                                                }
1877                                        }
1878                                }
1879                        }
1880                }
1881#else
1882
1883                hdr->timestamp = cur_sys_time_ns;
1884                /* Offset the next packet by the wire time of previous */
1885                calculate_wire_time(format_data, hdr->cap_len);
1886
1887#endif
1888        }
1889
1890        plc->ts_last_sys = cur_sys_time_ns;
1891        return;
1892}
1893
1894
1895static void dpdk_fin_packet(libtrace_packet_t *packet)
1896{
1897        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1898                rte_pktmbuf_free(packet->buffer);
1899                packet->buffer = NULL;
1900        }
1901}
1902
1903/** Reads at least one packet or returns an error
1904 */
1905int dpdk_read_packet_stream (libtrace_t *libtrace,
1906                                           dpdk_per_stream_t *stream,
1907                                           libtrace_message_queue_t *mesg,
1908                                           struct rte_mbuf* pkts_burst[],
1909                                           size_t nb_packets) {
1910        size_t nb_rx; /* Number of rx packets we've recevied */
1911        while (1) {
1912                /* Poll for a batch of packets */
1913                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1914                                         stream->queue_id, pkts_burst, nb_packets);
1915                if (nb_rx > 0) {
1916                        /* Got some packets - otherwise we keep spining */
1917                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1918                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1919                        return nb_rx;
1920                }
1921                /* Check the message queue this could be less than 0 */
1922                if (mesg && libtrace_message_queue_count(mesg) > 0)
1923                        return READ_MESSAGE;
1924                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
1925                        return nb_rx;
1926                /* Wait a while, polling on memory degrades performance
1927                 * This relieves the pressure on memory allowing the NIC to DMA */
1928                rte_delay_us(10);
1929        }
1930
1931        /* We'll never get here - but if we did it would be bad */
1932        return READ_ERROR;
1933}
1934
1935static int dpdk_pread_packets (libtrace_t *libtrace,
1936                                    libtrace_thread_t *t,
1937                                    libtrace_packet_t **packets,
1938                                    size_t nb_packets) {
1939        int nb_rx; /* Number of rx packets we've recevied */
1940        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
1941        int i;
1942        dpdk_per_stream_t *stream = t->format_data;
1943        struct dpdk_addt_hdr * hdr;
1944
1945        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
1946                                         pkts_burst, nb_packets);
1947
1948        if (nb_rx > 0) {
1949                for (i = 0; i < nb_rx; ++i) {
1950                        if (packets[i]->buffer != NULL) {
1951                                /* The packet should always be finished */
1952                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
1953                                free(packets[i]->buffer);
1954                        }
1955                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
1956                        packets[i]->type = TRACE_RT_DATA_DPDK;
1957                        packets[i]->buffer = pkts_burst[i];
1958                        packets[i]->trace = libtrace;
1959                        packets[i]->error = 1;
1960                        hdr = (struct dpdk_addt_hdr *) 
1961                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
1962                        packets[i]->order = hdr->timestamp;
1963                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
1964                }
1965        }
1966
1967        return nb_rx;
1968}
1969
1970int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1971        int nb_rx; /* Number of rx packets we've received */
1972        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
1973
1974        /* Free the last packet buffer */
1975        if (packet->buffer != NULL) {
1976                /* The packet should always be finished */
1977                assert(packet->buf_control == TRACE_CTRL_PACKET);
1978                free(packet->buffer);
1979                packet->buffer = NULL;
1980        }
1981
1982        packet->buf_control = TRACE_CTRL_EXTERNAL;
1983        packet->type = TRACE_RT_DATA_DPDK;
1984
1985        /* Check if we already have some packets buffered */
1986        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
1987                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
1988                packet->trace = libtrace;
1989                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1990                return 1; // TODO should be bytes read, which essentially useless anyway
1991        }
1992
1993        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
1994                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
1995
1996        if (nb_rx > 0) {
1997                FORMAT(libtrace)->burst_size = nb_rx;
1998                FORMAT(libtrace)->burst_offset = 1;
1999                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2000                packet->trace = libtrace;
2001                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2002                return 1;
2003        }
2004        return nb_rx;
2005}
2006
2007static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2008        struct timeval tv;
2009        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2010
2011        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2012        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2013        return tv;
2014}
2015
2016static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2017        struct timespec ts;
2018        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2019
2020        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2021        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2022        return ts;
2023}
2024
2025static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2026        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2027}
2028
2029static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2030        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2031        return (libtrace_direction_t) hdr->direction;
2032}
2033
2034static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2035        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2036        hdr->direction = (uint8_t) direction;
2037        return (libtrace_direction_t) hdr->direction;
2038}
2039
2040void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2041        struct rte_eth_stats dev_stats = {0};
2042
2043        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2044                return;
2045
2046        /* Grab the current stats */
2047        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2048
2049        stats->captured_valid = true;
2050        stats->captured = dev_stats.ipackets;
2051
2052        stats->dropped_valid = true;
2053        stats->dropped = dev_stats.imissed;
2054
2055#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2056        /* DPDK commit 86057c fixes ensures missed does not get counted as
2057         * errors */
2058        stats->errors_valid = true;
2059        stats->errors = dev_stats.ierrors;
2060#else
2061        /* DPDK errors includes drops */
2062        stats->errors_valid = true;
2063        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2064#endif
2065        stats->received_valid = true;
2066        stats->received = dev_stats.ipackets + dev_stats.imissed;
2067
2068}
2069
2070/* Attempts to read a packet in a non-blocking fashion. If one is not
2071 * available a SLEEP event is returned. We do not have the ability to
2072 * create a select()able file descriptor in DPDK.
2073 */
2074libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2075                                            libtrace_packet_t *packet) {
2076        libtrace_eventobj_t event = {0,0,0.0,0};
2077        size_t nb_rx; /* Number of received packets we've read */
2078
2079        do {
2080
2081                /* No packets waiting in our buffer? Try and read some more */
2082                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2083                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2084                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2085                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2086                        if (nb_rx > 0) {
2087                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2088                                                FORMAT(trace)->burst_pkts, nb_rx);
2089                                FORMAT(trace)->burst_size = nb_rx;
2090                                FORMAT(trace)->burst_offset = 0;
2091                        }
2092                }
2093
2094                /* Now do we have packets waiting? */
2095                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2096                        /* Free the last packet buffer */
2097                        if (packet->buffer != NULL) {
2098                                /* The packet should always be finished */
2099                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2100                                free(packet->buffer);
2101                                packet->buffer = NULL;
2102                        }
2103
2104                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2105                        packet->type = TRACE_RT_DATA_DPDK;
2106                        event.type = TRACE_EVENT_PACKET;
2107                        packet->buffer = FORMAT(trace)->burst_pkts[
2108                                             FORMAT(trace)->burst_offset++];
2109                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2110                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2111
2112                        /* XXX - Check this passes the filter trace_read_packet normally
2113                         * does this for us but this wont */
2114                        if (trace->filter) {
2115                                if (!trace_apply_filter(trace->filter, packet)) {
2116                                        /* Failed the filter so we loop for another packet */
2117                                        trace->filtered_packets ++;
2118                                        continue;
2119                                }
2120                        }
2121                        trace->accepted_packets ++;
2122                } else {
2123                        /* We only want to sleep for a very short time - we are non-blocking */
2124                        event.type = TRACE_EVENT_SLEEP;
2125                        event.seconds = 0.0001;
2126                        event.size = 0;
2127                }
2128
2129                /* If we get here we have our event */
2130                break;
2131        } while (1);
2132
2133        return event;
2134}
2135
2136static void dpdk_help(void) {
2137        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2138        printf("Supported input URIs:\n");
2139        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2140        printf("\tThe -<coreid> is optional \n");
2141        printf("\t e.g. dpdk:0000:01:00.1\n");
2142        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2143        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2144        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2145        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2146        printf("\n");
2147        printf("Supported output URIs:\n");
2148        printf("\tSame format as the input URI.\n");
2149        printf("\t e.g. dpdk:0000:01:00.1\n");
2150        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2151        printf("\n");
2152}
2153
2154static struct libtrace_format_t dpdk = {
2155        "dpdk",
2156        "$Id$",
2157        TRACE_FORMAT_DPDK,
2158        NULL,                               /* probe filename */
2159        NULL,                               /* probe magic */
2160        dpdk_init_input,                    /* init_input */
2161        dpdk_config_input,                  /* config_input */
2162        dpdk_start_input,                   /* start_input */
2163        dpdk_pause_input,                   /* pause_input */
2164        dpdk_init_output,                   /* init_output */
2165        NULL,                               /* config_output */
2166        dpdk_start_output,                  /* start_ouput */
2167        dpdk_fin_input,                     /* fin_input */
2168        dpdk_fin_output,                    /* fin_output */
2169        dpdk_read_packet,                   /* read_packet */
2170        dpdk_prepare_packet,                /* prepare_packet */
2171        dpdk_fin_packet,                    /* fin_packet */
2172        dpdk_write_packet,                  /* write_packet */
2173        NULL,                               /* flush_output */
2174        dpdk_get_link_type,                 /* get_link_type */
2175        dpdk_get_direction,                 /* get_direction */
2176        dpdk_set_direction,                 /* set_direction */
2177        NULL,                               /* get_erf_timestamp */
2178        dpdk_get_timeval,                   /* get_timeval */
2179        dpdk_get_timespec,                  /* get_timespec */
2180        NULL,                               /* get_seconds */
2181        NULL,                               /* seek_erf */
2182        NULL,                               /* seek_timeval */
2183        NULL,                               /* seek_seconds */
2184        dpdk_get_capture_length,            /* get_capture_length */
2185        dpdk_get_wire_length,               /* get_wire_length */
2186        dpdk_get_framing_length,            /* get_framing_length */
2187        dpdk_set_capture_length,            /* set_capture_length */
2188        NULL,                               /* get_received_packets */
2189        NULL,                               /* get_filtered_packets */
2190        NULL,                               /* get_dropped_packets */
2191        dpdk_get_stats,                     /* get_statistics */
2192        NULL,                               /* get_fd */
2193        dpdk_trace_event,                   /* trace_event */
2194        dpdk_help,                          /* help */
2195        NULL,                               /* next pointer */
2196        {true, 8},                          /* Live, NICs typically have 8 threads */
2197        dpdk_pstart_input,                  /* pstart_input */
2198        dpdk_pread_packets,                 /* pread_packets */
2199        dpdk_pause_input,                   /* ppause */
2200        dpdk_fin_input,                     /* p_fin */
2201        dpdk_pregister_thread,              /* pregister_thread */
2202        dpdk_punregister_thread,            /* punregister_thread */
2203        NULL                                /* get thread stats */
2204};
2205
2206void dpdk_constructor(void) {
2207        register_format(&dpdk);
2208}
Note: See TracBrowser for help on using the repository browser.