source: lib/format_dpdk.c @ 3621e1c

develop
Last change on this file since 3621e1c was 3621e1c, checked in by Shane Alcock <salcock@…>, 20 months ago

Fix unhandled case in dpdk_config_input()

  • Property mode set to 100644
File size: 73.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44#include "format_dpdk.h"
45
46#ifdef HAVE_INTTYPES_H
47#  include <inttypes.h>
48#else
49# error "Can't find inttypes.h"
50#endif
51
52#include <stdlib.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56#include <math.h>
57
58#ifdef HAVE_LIBNUMA
59#include <numa.h>
60#endif
61
62#define MBUF(x) ((struct rte_mbuf *) x)
63/* Get the original placement of the packet data */
64#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
65#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
66#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
67
68#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
69#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
70
71#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
72                        (uint64_t) tv.tv_usec*1000ull)
73#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
74                        (uint64_t) ts.tv_nsec)
75
76#if RTE_PKTMBUF_HEADROOM != 128
77#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
78         "any libtrace instance processing these packet must be have the" \
79         "same RTE_PKTMBUF_HEADROOM set"
80#endif
81
82
83static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
84/* Memory pools Per NUMA node */
85static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
86
87/* Used by both input and output however some fields are not used
88 * for output */
89struct dpdk_format_data_t {
90        int8_t promisc; /* promiscuous mode - RX only */
91        uint8_t paused; /* See paused_state */
92        portid_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
93        portid_t nb_ports; /* Total number of usable ports on system should be 1 */
94        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
95        int snaplen; /* The snap length for the capture - RX only */
96        /* We always have to setup both rx and tx queues even if we don't want them */
97        int nb_rx_buf; /* The number of packet buffers in the rx ring */
98        int nb_tx_buf; /* The number of packet buffers in the tx ring */
99        int nic_numa_node; /* The NUMA node that the NIC is attached to */
100        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
101#if DPDK_USE_BLACKLIST
102        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
103        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
104#endif
105        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
106        enum hasher_types hasher_type;
107        uint8_t *rss_key;
108        /* To improve single-threaded performance we always batch reading
109         * packets, in a burst, otherwise the parallel library does this for us */
110        struct rte_mbuf* burst_pkts[BURST_SIZE];
111        int burst_size; /* The total number read in the burst */
112        int burst_offset; /* The offset we are into the burst */
113
114        /* Our parallel streams */
115        libtrace_list_t *per_stream;
116};
117
118enum dpdk_addt_hdr_flags {
119        INCLUDES_CHECKSUM = 0x1,
120        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
121};
122
123/**
124 * A structure placed in front of the packet where we can store
125 * additional information about the given packet.
126 * +--------------------------+
127 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
128 * +--------------------------+
129 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
130 * +--------------------------+
131 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
132 * +--------------------------+
133 * *   hw_timestamp_82580     * 16 bytes Optional
134 * +--------------------------+
135 * |       Packet data        | Variable Size
136 * |                          |
137 */
138struct dpdk_addt_hdr {
139        uint64_t timestamp;
140        uint8_t flags;
141        uint8_t direction;
142        uint8_t reserved1;
143        uint8_t reserved2;
144        uint32_t cap_len; /* The size to say the capture is */
145};
146
147static bool dpdk_can_write(libtrace_packet_t *packet) {
148        libtrace_linktype_t ltype = trace_get_link_type(packet);
149
150        if (ltype == TRACE_TYPE_CONTENT_INVALID) {
151                return false;
152        }
153        if (ltype == TRACE_TYPE_NONDATA || ltype == TRACE_TYPE_ERF_META ||
154                        ltype == TRACE_TYPE_PCAPNG_META) {
155                return false;
156        }
157        return true;
158}
159
160/**
161 * We want to blacklist all devices except those on the whitelist
162 * (I say list, but yes it is only the one).
163 *
164 * The default behaviour of rte_pci_probe() will map every possible device
165 * to its DPDK driver. The DPDK driver will take the ethernet device
166 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
167 *
168 * So blacklist all devices except the one that we wish to use so that
169 * the others can still be used as standard ethernet ports.
170 *
171 * @return 0 if successful, otherwise -1 on error.
172 */
173#if DPDK_USE_BLACKLIST
174static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
175{
176        struct rte_pci_device *dev = NULL;
177        format_data->nb_blacklist = 0;
178
179        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
180
181        TAILQ_FOREACH(dev, &device_list, next) {
182        if (whitelist != NULL && whitelist->domain == dev->addr.domain
183            && whitelist->bus == dev->addr.bus
184            && whitelist->devid == dev->addr.devid
185            && whitelist->function == dev->addr.function)
186            continue;
187                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
188                                / sizeof (format_data->blacklist[0])) {
189                        fprintf(stderr, "Warning: too many devices to blacklist consider"
190                                        " increasing BLACK_LIST_SIZE");
191                        break;
192                }
193                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
194                ++format_data->nb_blacklist;
195        }
196
197        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
198        return 0;
199}
200#else /* DPDK_USE_BLACKLIST */
201#include <rte_devargs.h>
202static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
203{
204        char pci_str[20] = {0};
205        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
206                 whitelist->domain,
207                 whitelist->bus,
208                 whitelist->devid,
209                 whitelist->function);
210        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
211                return -1;
212        }
213        return 0;
214}
215#endif
216
217/**
218 * Parse the URI format as a pci address
219 * Fills in addr, note core is optional and is unchanged if
220 * a value for it is not provided.
221 *
222 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
223 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
224 */
225static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
226        int matches;
227
228        if (!str) {
229                fprintf(stderr, "NULL str passed into parse_pciaddr()\n");
230                return -1;
231        }
232#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
233        matches = sscanf(str, "%8"SCNx32":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
234                         &addr->domain, &addr->bus, &addr->devid,
235                         &addr->function, core);
236#else
237        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
238                         &addr->domain, &addr->bus, &addr->devid,
239                         &addr->function, core);
240#endif
241        if (matches >= 4) {
242                return 0;
243        } else {
244                return -1;
245        }
246}
247
248/**
249 * Convert a pci address to the numa node it is
250 * connected to.
251 *
252 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
253 * so we can call it before DPDK
254 *
255 * @return -1 if unknown otherwise a number 0 or higher of the numa node
256 */
257static int pci_to_numa(struct rte_pci_addr * dev_addr) {
258        char path[50] = {0};
259        FILE *file;
260
261        /* Read from the system */
262        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
263                 dev_addr->domain,
264                 dev_addr->bus,
265                 dev_addr->devid,
266                 dev_addr->function);
267
268        if((file = fopen(path, "r")) != NULL) {
269                int numa_node = -1;
270                if (fscanf(file, "%d", &numa_node) != 1) {
271                        numa_node = -1;
272                }
273                fclose(file);
274                return numa_node;
275        }
276        return -1;
277}
278
279#if DEBUG
280/* For debugging */
281static inline void dump_configuration()
282{
283        struct rte_config * global_config;
284        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
285
286        if (nb_cpu <= 0) {
287                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
288                       " Falling back to the first core.");
289                nb_cpu = 1; /* fallback to just 1 core */
290        }
291        if (nb_cpu > RTE_MAX_LCORE)
292                nb_cpu = RTE_MAX_LCORE;
293
294        global_config = rte_eal_get_configuration();
295
296        if (global_config != NULL) {
297                int i;
298                fprintf(stderr, "Intel DPDK setup\n"
299                        "---Version      : %s\n"
300                        "---Master LCore : %"PRIu32"\n"
301                        "---LCore Count  : %"PRIu32"\n",
302                        rte_version(),
303                        global_config->master_lcore, global_config->lcore_count);
304
305                for (i = 0 ; i < nb_cpu; i++) {
306                        fprintf(stderr, "   ---Core %d : %s\n", i,
307                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
308                }
309
310                const char * proc_type;
311                switch (global_config->process_type) {
312                case RTE_PROC_AUTO:
313                        proc_type = "auto";
314                        break;
315                case RTE_PROC_PRIMARY:
316                        proc_type = "primary";
317                        break;
318                case RTE_PROC_SECONDARY:
319                        proc_type = "secondary";
320                        break;
321                case RTE_PROC_INVALID:
322                        proc_type = "invalid";
323                        break;
324                default:
325                        proc_type = "something worse than invalid!!";
326                }
327                fprintf(stderr, "---Process Type : %s\n", proc_type);
328        }
329
330}
331#endif
332
333/**
334 * Expects to be called from the master lcore and moves it to the given dpdk id
335 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
336 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
337 *               and not already in use.
338 * @return 0 is successful otherwise -1 on error.
339 */
340static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
341        struct rte_config *cfg = rte_eal_get_configuration();
342        cpu_set_t cpuset;
343        int i;
344
345        if (core >= RTE_MAX_LCORE) {
346                fprintf(stderr, "Core must be a value less than the number of cores "
347                        "in dpdk_move_master_lcore()\n");
348                return -1;
349        }
350        if (rte_get_master_lcore() != rte_lcore_id()) {
351                fprintf(stderr, "Master core not equal to core id in dpdk_move_master_lcore()\n");
352                return -1;
353        }
354
355        if (core == rte_lcore_id())
356                return 0;
357
358        /* Make sure we are not overwriting someone else */
359        if (rte_lcore_is_enabled(core)) {
360                fprintf(stderr, "Cannot override another core in dpdk_move_master_lcore()\n");
361                return -1;
362        }
363
364        /* Move the core */
365        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
366        cfg->lcore_role[core] = ROLE_RTE;
367        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
368        rte_eal_get_configuration()->master_lcore = core;
369        RTE_PER_LCORE(_lcore_id) = core;
370
371        /* Now change the affinity, either mapped to a single core or all accepted */
372        CPU_ZERO(&cpuset);
373
374        if (lcore_config[core].detected) {
375                CPU_SET(core, &cpuset);
376        } else {
377                for (i = 0; i < RTE_MAX_LCORE; ++i) {
378                        if (lcore_config[i].detected)
379                                CPU_SET(i, &cpuset);
380                }
381        }
382
383        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
384        if (i != 0) {
385                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
386                return -1;
387        }
388        return 0;
389}
390
391/**
392 * XXX This is very bad XXX
393 * But we have to do something to allow getopts nesting
394 * Luckly normally the format is last so it doesn't matter
395 * DPDK only supports modern systems so hopefully this
396 * will continue to work
397 */
398struct saved_getopts {
399        char *optarg;
400        int optind;
401        int opterr;
402        int optopt;
403};
404
405static void save_getopts(struct saved_getopts *opts) {
406        opts->optarg = optarg;
407        opts->optind = optind;
408        opts->opterr = opterr;
409        opts->optopt = optopt;
410}
411
412static void restore_getopts(struct saved_getopts *opts) {
413        optarg = opts->optarg;
414        optind = opts->optind;
415        opterr = opts->opterr;
416        optopt = opts->optopt;
417}
418
419static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
420                                        char * err, int errlen) {
421        int ret; /* Returned error codes */
422        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
423        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
424        char mem_map[20] = {0}; /* The memory name */
425        long nb_cpu; /* The number of CPUs in the system */
426        long my_cpu; /* The CPU number we want to bind to */
427        int i;
428        struct rte_config *cfg = rte_eal_get_configuration();
429        struct saved_getopts save_opts;
430
431        /* This initialises the Environment Abstraction Layer (EAL)
432         * If we had slave workers these are put into WAITING state
433         *
434         * Basically binds this thread to a fixed core, which we choose as
435         * the last core on the machine (assuming fewer interrupts mapped here).
436         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
437         * "-n" the number of memory channels into the CPU (hardware specific)
438         *      - Most likely to be half the number of ram slots in your machine.
439         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
440         * Controls where in memory packets are stored such that they are spread
441         * across the channels. We just use 1 to be safe.
442         *
443         * Using unique file prefixes mean separate memory is used, unlinking
444         * the two processes. However be careful we still cannot access a
445         * port that already in use.
446         */
447        char* argv[] = {"libtrace",
448                        "-c", cpu_number,
449                        "-n", "1",
450                        "--proc-type", "auto",
451                        "--file-prefix", mem_map,
452                        "-m", "512",
453#if DPDK_USE_LOG_LEVEL
454#       if DEBUG
455                        "--log-level", "8", /* RTE_LOG_DEBUG */
456#       else
457                        "--log-level", "5", /* RTE_LOG_WARNING */
458#       endif
459#endif
460                        NULL};
461        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
462
463#if DEBUG
464        rte_log_set_global_level(RTE_LOG_DEBUG);
465#else
466        rte_log_set_global_level(RTE_LOG_WARNING);
467#endif
468
469        /* Get the number of cpu cores in the system and use the last core
470         * on the correct numa node */
471        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
472        if (nb_cpu <= 0) {
473                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
474                       " Falling back to the first core.");
475                nb_cpu = 1; /* fallback to the first core */
476        }
477        if (nb_cpu > RTE_MAX_LCORE)
478                nb_cpu = RTE_MAX_LCORE;
479
480        my_cpu = -1;
481        /* This allows the user to specify the core - we would try to do this
482         * automatically but it's hard to tell that this is secondary
483         * before running rte_eal_init(...). Currently we are limited to 1
484         * instance per core due to the way memory is allocated. */
485        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
486                snprintf(err, errlen, "Failed to parse URI");
487                return -1;
488        }
489
490#ifdef HAVE_LIBNUMA
491        format_data->nic_numa_node = pci_to_numa(&use_addr);
492        if (my_cpu < 0) {
493#if DEBUG
494                /* If we can assign to a core on the same numa node */
495                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
496#endif
497                if(format_data->nic_numa_node >= 0) {
498                        int max_node_cpu = -1;
499                        struct bitmask *mask = numa_allocate_cpumask();
500                        if (!mask) {
501                                fprintf(stderr, "Unable to allocate cpu mask in dpdk_init_environment()\n");
502                                return -1;
503                        }
504                        numa_node_to_cpus(format_data->nic_numa_node, mask);
505                        for (i = 0 ; i < nb_cpu; ++i) {
506                                if (numa_bitmask_isbitset(mask,i))
507                                        max_node_cpu = i+1;
508                        }
509                        my_cpu = max_node_cpu;
510                }
511        }
512#endif
513        if (my_cpu < 0) {
514                my_cpu = nb_cpu;
515        }
516
517
518        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
519                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
520
521        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
522                snprintf(err, errlen,
523                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
524                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
525                return -1;
526        }
527
528        /* Make our mask with all cores turned on this is so that DPDK
529         * gets all CPU info in older versions */
530        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
531        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
532
533#if !DPDK_USE_BLACKLIST
534        /* Black list all ports besides the one that we want to use */
535        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
536                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
537                         " are you sure the address is correct?: %s", strerror(-ret));
538                return -1;
539        }
540#endif
541
542        /* Give the memory map a unique name */
543        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
544        /* rte_eal_init it makes a call to getopt so we need to reset the
545         * global optind variable of getopt otherwise this fails */
546        save_getopts(&save_opts);
547        optind = 1;
548        if ((ret = rte_eal_init(argc, argv)) < 0) {
549                snprintf(err, errlen,
550                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
551                return -1;
552        }
553        restore_getopts(&save_opts);
554        // These are still running but will never do anything with DPDK v1.7 we
555        // should remove this XXX in the future
556        for(i = 0; i < RTE_MAX_LCORE; ++i) {
557                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
558                        cfg->lcore_role[i] = ROLE_OFF;
559                        cfg->lcore_count--;
560                }
561        }
562        // Only the master should be running
563        if (cfg->lcore_count != 1) {
564                fprintf(stderr, "Expected only the master core to be running in dpdk_init_environment()\n");
565                return -1;
566        }
567
568        // TODO XXX TODO
569        dpdk_move_master_lcore(NULL, my_cpu-1);
570
571#if DEBUG
572        dump_configuration();
573#endif
574
575#if DPDK_USE_PMD_INIT
576        /* This registers all available NICs with Intel DPDK
577         * These are not loaded until rte_eal_pci_probe() is called.
578         */
579        if ((ret = rte_pmd_init_all()) < 0) {
580                snprintf(err, errlen,
581                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
582                return -1;
583        }
584#endif
585
586#if DPDK_USE_BLACKLIST
587        /* Blacklist all ports besides the one that we want to use */
588        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
589                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
590                         " are you sure the address is correct?: %s", strerror(-ret));
591                return -1;
592        }
593#endif
594
595#if DPDK_USE_PCI_PROBE
596        /* This loads DPDK drivers against all ports that are not blacklisted */
597        if ((ret = rte_eal_pci_probe()) < 0) {
598                snprintf(err, errlen,
599                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
600                return -1;
601        }
602#endif
603
604        format_data->nb_ports = rte_eth_dev_count();
605
606        if (format_data->nb_ports != 1) {
607                snprintf(err, errlen,
608                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
609                         format_data->nb_ports);
610                return -1;
611        }
612
613        return 0;
614}
615
616int dpdk_init_input (libtrace_t *libtrace) {
617        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
618        char err[500];
619        err[0] = 0;
620
621        libtrace->format_data = (struct dpdk_format_data_t *)
622                                malloc(sizeof(struct dpdk_format_data_t));
623
624        if (!libtrace->format_data) {
625                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
626                        "format data inside dpdk_init_input()");
627                return 1;
628        }
629
630        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
631        FORMAT(libtrace)->nb_ports = 0;
632        FORMAT(libtrace)->snaplen = 0; /* Use default */
633        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
634        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
635        FORMAT(libtrace)->nic_numa_node = -1;
636        FORMAT(libtrace)->promisc = -1;
637        FORMAT(libtrace)->pktmbuf_pool = NULL;
638#if DPDK_USE_BLACKLIST
639        FORMAT(libtrace)->nb_blacklist = 0;
640#endif
641        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
642        FORMAT(libtrace)->mempool_name[0] = 0;
643        memset(FORMAT(libtrace)->burst_pkts, 0,
644               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
645        FORMAT(libtrace)->burst_size = 0;
646        FORMAT(libtrace)->burst_offset = 0;
647        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
648        FORMAT(libtrace)->rss_key = NULL;
649
650        /* Make our first stream */
651        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
652        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
653
654        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
655                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
656                free(libtrace->format_data);
657                libtrace->format_data = NULL;
658                return -1;
659        }
660        return 0;
661}
662
663static int dpdk_init_output(libtrace_out_t *libtrace)
664{
665        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
666        char err[500];
667        err[0] = 0;
668
669        libtrace->format_data = (struct dpdk_format_data_t *)
670                                malloc(sizeof(struct dpdk_format_data_t));
671
672        if (!libtrace->format_data) {
673                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
674                        "format data inside dpdk_init_output()");
675                return -1;
676        }
677        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
678        FORMAT(libtrace)->nb_ports = 0;
679        FORMAT(libtrace)->snaplen = 0; /* Use default */
680        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
681        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
682        FORMAT(libtrace)->nic_numa_node = -1;
683        FORMAT(libtrace)->promisc = -1;
684        FORMAT(libtrace)->pktmbuf_pool = NULL;
685#if DPDK_USE_BLACKLIST
686        FORMAT(libtrace)->nb_blacklist = 0;
687#endif
688        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
689        FORMAT(libtrace)->mempool_name[0] = 0;
690        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
691        FORMAT(libtrace)->burst_size = 0;
692        FORMAT(libtrace)->burst_offset = 0;
693
694        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
695        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
696
697        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
698                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
699                free(libtrace->format_data);
700                libtrace->format_data = NULL;
701                return -1;
702        }
703        return 0;
704}
705
706/**
707 * Note here snaplen excludes the MAC checksum. Packets over
708 * the requested snaplen will be dropped. (Excluding MAC checksum)
709 *
710 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
711 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
712 * is set the maximum size of the returned packet would be 1518 otherwise
713 * 1514 would be the largest size possibly returned.
714 *
715 */
716int dpdk_config_input (libtrace_t *libtrace,
717                              trace_option_t option,
718                              void *data) {
719        switch (option) {
720        case TRACE_OPTION_SNAPLEN:
721                /* Only support changing snaplen before a call to start is
722                 * made */
723                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
724                        FORMAT(libtrace)->snaplen=*(int*)data;
725                else
726                        return -1;
727                return 0;
728        case TRACE_OPTION_PROMISC:
729                FORMAT(libtrace)->promisc=*(int*)data;
730                return 0;
731        case TRACE_OPTION_HASHER:
732                switch (*((enum hasher_types *) data))
733                {
734                case HASHER_BALANCE:
735                case HASHER_UNIDIRECTIONAL:
736                case HASHER_BIDIRECTIONAL:
737                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
738                        if (FORMAT(libtrace)->rss_key)
739                                free(FORMAT(libtrace)->rss_key);
740                        FORMAT(libtrace)->rss_key = NULL;
741                        return 0;
742                case HASHER_CUSTOM:
743                        // Let libtrace do this
744                        return -1;
745                }
746                break;
747        case TRACE_OPTION_FILTER:
748                /* TODO filtering */
749        case TRACE_OPTION_META_FREQ:
750        case TRACE_OPTION_DISCARD_META:
751        case TRACE_OPTION_EVENT_REALTIME:
752        case TRACE_OPTION_REPLAY_SPEEDUP:
753        case TRACE_OPTION_CONSTANT_ERF_FRAMING:
754                break;
755        /* Avoid default: so that future options will cause a warning
756         * here to remind us to implement it, or flag it as
757         * unimplementable
758         */
759        }
760
761        /* Don't set an error - trace_config will try to deal with the
762         * option and will set an error if it fails */
763        return -1;
764}
765
766/* Can set jumbo frames/ or limit the size of a frame by setting both
767 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
768 *
769 */
770static struct rte_eth_conf port_conf = {
771        .rxmode = {
772                .mq_mode = ETH_RSS,
773                .split_hdr_size = 0,
774                .header_split   = 0, /**< Header Split disabled */
775                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
776                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
777                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
778                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
779#if GET_MAC_CRC_CHECKSUM
780/* So it appears that if hw_strip_crc is turned off the driver will still
781 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
782 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
783 * So lets just add it back on when we receive the packet.
784 */
785                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
786#else
787/* By default strip the MAC checksum because it's a bit of a hack to
788 * actually read these. And don't want to rely on disabling this to actualy
789 * always cut off the checksum in the future
790 */
791                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
792#endif
793        },
794        .txmode = {
795                .mq_mode = ETH_DCB_NONE,
796        },
797        .rx_adv_conf = {
798                .rss_conf = {
799                        .rss_hf = RX_RSS_FLAGS,
800                },
801        },
802        .intr_conf = {
803                .lsc = 1
804        }
805};
806
807static const struct rte_eth_rxconf rx_conf = {
808        .rx_thresh = {
809                .pthresh = 8,/* RX_PTHRESH prefetch */
810                .hthresh = 8,/* RX_HTHRESH host */
811                .wthresh = 4,/* RX_WTHRESH writeback */
812        },
813        .rx_free_thresh = 0,
814        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
815};
816
817static const struct rte_eth_txconf tx_conf = {
818        .tx_thresh = {
819                /*
820                 * TX_PTHRESH prefetch
821                 * Set on the NIC, if the number of unprocessed descriptors to queued on
822                 * the card fall below this try grab at least hthresh more unprocessed
823                 * descriptors.
824                 */
825                .pthresh = 36,
826
827                /* TX_HTHRESH host
828                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
829                 */
830                .hthresh = 0,
831
832                /* TX_WTHRESH writeback
833                 * Set on the NIC, the number of sent descriptors before writing back
834                 * status to confirm the transmission. This is done more efficiently as
835                 * a bulk DMA-transfer rather than writing one at a time.
836                 * Similar to tx_free_thresh however this is applied to the NIC, where
837                 * as tx_free_thresh is when DPDK will check these. This is extended
838                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
839                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
840                 */
841                .wthresh = 4,
842        },
843
844        /* Used internally by DPDK rather than passed to the NIC. The number of
845         * packet descriptors to send before checking for any responses written
846         * back (to confirm the transmission). Default = 32 if set to 0)
847         */
848        .tx_free_thresh = 0,
849
850        /* This is the Report Status threshold, used by 10Gbit cards,
851         * This signals the card to only write back status (such as
852         * transmission successful) after this minimum number of transmit
853         * descriptors are seen. The default is 32 (if set to 0) however if set
854         * to greater than 1 TX wthresh must be set to zero, because this is kindof
855         * a replacement. See the dpdk programmers guide for more restrictions.
856         */
857        .tx_rs_thresh = 1,
858};
859
860/**
861 * A callback for a link state change (LSC).
862 *
863 * Packets may be received before this notification. In fact the DPDK IGXBE
864 * driver likes to put a delay upto 5sec before sending this.
865 *
866 * We use this to ensure the link speed is correct for our timestamp
867 * calculations. Because packets might be received before the link up we still
868 * update this when the packet is received.
869 *
870 * @param port The DPDK port
871 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
872 * @param cb_arg The dpdk_format_data_t structure associated with the format
873 */
874#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
875static int dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
876                              void *cb_arg, void *retparam UNUSED) {
877#else
878static void dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
879                              void *cb_arg) {
880#endif
881        struct dpdk_format_data_t * format_data = cb_arg;
882        struct rte_eth_link link_info;
883        if (event != RTE_ETH_EVENT_INTR_LSC) {
884                fprintf(stderr, "Received unexpected event in dpdk_lsc_callback()\n");
885                #if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
886                return -1;
887                #else
888                return;
889                #endif
890        }
891        if (port != format_data->port) {
892                fprintf(stderr, "Port does not match port in format data in dpdk_lsc_callback()\n");
893                #if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
894                return -1;
895                #else
896                return;
897                #endif
898        }
899
900        rte_eth_link_get_nowait(port, &link_info);
901
902        if (link_info.link_status)
903                format_data->link_speed = link_info.link_speed;
904        else
905                format_data->link_speed = 0;
906
907#if DEBUG
908        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
909                link_info.link_status ? "up" : "down",
910                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
911                                          "full-duplex" : "half-duplex",
912                (int) link_info.link_speed);
913#endif
914
915        /* Turns out DPDK drivers might not come back up if the link speed
916         * changes. So we reset the autoneg procedure. This is very unsafe
917         * we have have threads reading packets and we stop the port. */
918#if 0
919        if (!link_info.link_status) {
920                int ret;
921                rte_eth_dev_stop(port);
922                ret = rte_eth_dev_start(port);
923                if (ret < 0) {
924                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
925                                strerror(-ret));
926                }
927        }
928#endif
929#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
930        return 0;
931#endif
932}
933
934/** Reserve a DPDK lcore ID for a thread globally.
935 *
936 * @param real If true allocate a real lcore, otherwise allocate a core which
937 * does not exist on the local machine.
938 * @param socket the prefered NUMA socket - only used if a real core is requested
939 * @return a valid core, which can later be used with dpdk_register_lcore() or a
940 * -1 if have run out of cores.
941 *
942 * If any thread is reading or freeing packets we need to register it here
943 * due to TLS caches in the memory pools.
944 */
945static int dpdk_reserve_lcore(bool real, int socket) {
946        int new_id = -1;
947        int i;
948        struct rte_config *cfg = rte_eal_get_configuration();
949        (void) socket;
950
951        pthread_mutex_lock(&dpdk_lock);
952        /* If 'reading packets' fill in cores from 0 up and bind affinity
953         * otherwise start from the MAX core (which is also the master) and work backwards
954         * in this case physical cores on the system will not exist so we don't bind
955         * these to any particular physical core */
956        if (real) {
957#ifdef HAVE_LIBNUMA
958                for (i = 0; i < RTE_MAX_LCORE; ++i) {
959                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
960                                new_id = i;
961                                if (!lcore_config[i].detected)
962                                        new_id = -1;
963                                break;
964                        }
965                }
966#endif
967                /* Retry without the the numa restriction */
968                if (new_id == -1) {
969                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
970                                if (!rte_lcore_is_enabled(i)) {
971                                        new_id = i;
972                                        if (!lcore_config[i].detected)
973                                                fprintf(stderr, "Warning the"
974                                                        " number of 'reading' "
975                                                        "threads exceed cores\n");
976                                        break;
977                                }
978                        }
979                }
980        } else {
981                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
982                        if (!rte_lcore_is_enabled(i)) {
983                                new_id = i;
984                                break;
985                        }
986                }
987        }
988
989        if (new_id != -1) {
990                /* Enable the core in global DPDK structs */
991                cfg->lcore_role[new_id] = ROLE_RTE;
992                cfg->lcore_count++;
993        }
994
995        pthread_mutex_unlock(&dpdk_lock);
996        return new_id;
997}
998
999/** Register a thread as a lcore
1000 * @param libtrace any error is set against libtrace on exit
1001 * @param real If this is a true lcore we will bind its affinty to the
1002 * requested core.
1003 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1004 * @return 0, if successful otherwise -1 if an error occured (details are stored
1005 * in libtrace)
1006 *
1007 * @note This must be called from the thread being registered.
1008 */
1009static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1010        int ret;
1011        RTE_PER_LCORE(_lcore_id) = lcore;
1012
1013        /* Set affinity bind to corresponding core */
1014        if (real) {
1015                cpu_set_t cpuset;
1016                CPU_ZERO(&cpuset);
1017                CPU_SET(rte_lcore_id(), &cpuset);
1018                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1019                if (ret != 0) {
1020                        trace_set_err(libtrace, errno, "Warning "
1021                                      "pthread_setaffinity_np failed");
1022                        return -1;
1023                }
1024        }
1025
1026        return 0;
1027}
1028
1029/** Allocates a new dpdk packet buffer memory pool.
1030 *
1031 * @param n The number of threads
1032 * @param pkt_size The packet size we need ot store
1033 * @param socket_id The NUMA socket id
1034 * @param A new mempool, if NULL query the DPDK library for the error code
1035 * see rte_mempool_create() documentation.
1036 *
1037 * This allocates a new pool or recycles an existing memory pool.
1038 * Call dpdk_free_memory() to free the memory.
1039 * We cannot delete memory so instead we store the pools, allowing them to be
1040 * re-used.
1041 */
1042static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1043                                             unsigned pkt_size,
1044                                             int socket_id) {
1045        struct rte_mempool *ret;
1046        size_t j,k;
1047        char name[MEMPOOL_NAME_LEN];
1048
1049        /* Add on packet size overheads */
1050        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1051
1052        pthread_mutex_lock(&dpdk_lock);
1053
1054        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1055                /* Best guess go for zero */
1056                socket_id = 0;
1057        }
1058
1059        /* Find a valid pool */
1060        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1061                if (mem_pools[socket_id][j]->size >= n &&
1062                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1063                        break;
1064                }
1065        }
1066
1067        /* Find the end (+1) of the list */
1068        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1069
1070        if (mem_pools[socket_id][j]) {
1071                ret = mem_pools[socket_id][j];
1072                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1073                mem_pools[socket_id][k-1] = NULL;
1074                mem_pools[socket_id][j] = NULL;
1075        } else {
1076                static uint32_t test = 10;
1077                test++;
1078                snprintf(name, MEMPOOL_NAME_LEN,
1079                         "libtrace_pool_%"PRIu32, test);
1080
1081                ret = rte_mempool_create(name, n, pkt_size,
1082                                         128, sizeof(struct rte_pktmbuf_pool_private),
1083                                         rte_pktmbuf_pool_init, NULL,
1084                                         rte_pktmbuf_init, NULL,
1085                                         socket_id, 0);
1086        }
1087
1088        pthread_mutex_unlock(&dpdk_lock);
1089        return ret;
1090}
1091
1092/** Stores the memory against the DPDK library.
1093 *
1094 * @param mempool The mempool to free
1095 * @param socket_id The NUMA socket this mempool was allocated upon.
1096 *
1097 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1098 * store the memory shared globally against the format.
1099 */
1100static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1101        size_t i;
1102        pthread_mutex_lock(&dpdk_lock);
1103
1104        /* We should have all entries back in the mempool */
1105        rte_mempool_audit(mempool);
1106        if (!rte_mempool_full(mempool)) {
1107                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1108                        "free all packets before finishing a trace\n",
1109                        rte_mempool_avail_count(mempool), mempool->size);
1110        }
1111
1112        /* Find the end (+1) of the list */
1113        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1114
1115        if (i >= RTE_MAX_LCORE) {
1116                fprintf(stderr, "Too many memory pools, dropping this one\n");
1117        } else {
1118                mem_pools[socket_id][i] = mempool;
1119        }
1120
1121        pthread_mutex_unlock(&dpdk_lock);
1122}
1123
1124/* Attach memory to the port and start (or restart) the port/s.
1125 */
1126static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1127                              char *err, int errlen, uint16_t rx_queues) {
1128        int ret, i;
1129        struct rte_eth_link link_info; /* Wait for link */
1130        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1131
1132        /* Already started */
1133        if (format_data->paused == DPDK_RUNNING)
1134                return 0;
1135
1136        /* First time started we need to alloc our memory, doing this here
1137         * rather than in environment setup because we don't have snaplen then */
1138        if (format_data->paused == DPDK_NEVER_STARTED) {
1139                if (format_data->snaplen == 0) {
1140                        format_data->snaplen = RX_MBUF_SIZE;
1141                        port_conf.rxmode.jumbo_frame = 0;
1142                        port_conf.rxmode.max_rx_pkt_len = 0;
1143                } else {
1144                        double expn;
1145
1146                        /* Use jumbo frames */
1147                        port_conf.rxmode.jumbo_frame = 1;
1148                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1149
1150                        /* Use less buffers if we're supporting jumbo frames
1151                         * otherwise we won't be able to allocate memory.
1152                         */
1153                        if (format_data->snaplen > 1500) {
1154                                format_data->nb_rx_buf /= 2;
1155                        }
1156
1157                        /* snaplen should be rounded up to next power of two
1158                         * to ensure enough memory is allocated for each
1159                         * mbuf :(
1160                         */
1161                        expn = ceil(log2((double)(format_data->snaplen)));
1162                        format_data->snaplen = pow(2, (int)expn);
1163                }
1164
1165#if GET_MAC_CRC_CHECKSUM
1166                /* This is additional overhead so make sure we allow space for this */
1167                format_data->snaplen += ETHER_CRC_LEN;
1168#endif
1169#if HAS_HW_TIMESTAMPS_82580
1170                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1171#endif
1172
1173                /* Create the mbuf pool, which is the place packets are allocated
1174                 * from - There is no free function (I cannot see one).
1175                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1176                 * allocate however that extra 1 packet is not used.
1177                 * (I assume <= vs < error some where in DPDK code)
1178                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1179                 * so that will fill the new buffer and wait until slots in the
1180                 * ring become available.
1181                 */
1182#if DEBUG
1183                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1184#endif
1185                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1186                                                              format_data->snaplen,
1187                                                              format_data->nic_numa_node);
1188
1189                if (format_data->pktmbuf_pool == NULL) {
1190                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1191                                 "pool failed: %s", strerror(rte_errno));
1192                        return -1;
1193                }
1194        }
1195
1196        /* Generate the hash key, based on the device */
1197        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1198        // In new versions DPDK we can query the size
1199#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1200        struct rte_eth_dev_info dev_info;
1201        rte_eth_dev_info_get(format_data->port, &dev_info);
1202        rss_size = dev_info.hash_key_size;
1203#endif
1204        if (rss_size != 0) {
1205                format_data->rss_key = malloc(rss_size);
1206                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1207                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1208                } else {
1209                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1210                }
1211                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1212#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1213                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1214#endif
1215        } else {
1216                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
1217        }
1218
1219        /* ----------- Now do the setup for the port mapping ------------ */
1220        /* Order of calls must be
1221         * rte_eth_dev_configure()
1222         * rte_eth_tx_queue_setup()
1223         * rte_eth_rx_queue_setup()
1224         * rte_eth_dev_start()
1225         * other rte_eth calls
1226         */
1227
1228        /* This must be called first before another *eth* function
1229         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1230        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1231        if (ret < 0) {
1232                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1233                         " %"PRIu8" : %s", format_data->port,
1234                         strerror(-ret));
1235                return -1;
1236        }
1237#if DEBUG
1238        fprintf(stderr, "Doing dev configure\n");
1239#endif
1240        /* Initialise the TX queue a minimum value if using this port for
1241         * receiving. Otherwise a larger size if writing packets.
1242         */
1243        ret = rte_eth_tx_queue_setup(format_data->port,
1244                                     0 /* queue XXX */,
1245                                     format_data->nb_tx_buf,
1246                                     SOCKET_ID_ANY,
1247                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1248        if (ret < 0) {
1249                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1250                         " on port %d : %s", (int)format_data->port,
1251                         strerror(-ret));
1252                return -1;
1253        }
1254
1255        /* Attach memory to our RX queues */
1256        for (i=0; i < rx_queues; i++) {
1257                dpdk_per_stream_t *stream;
1258#if DEBUG
1259                fprintf(stderr, "Configuring queue %d\n", i);
1260#endif
1261
1262                /* Add storage for the stream */
1263                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1264                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1265                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1266                stream->queue_id = i;
1267
1268                if (stream->lcore == -1)
1269                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1270
1271                if (stream->lcore == -1) {
1272                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1273                                 ". Too many threads?");
1274                        return -1;
1275                }
1276
1277                if (stream->mempool == NULL) {
1278                        stream->mempool = dpdk_alloc_memory(
1279                                                  format_data->nb_rx_buf*2,
1280                                                  format_data->snaplen,
1281                                                  rte_lcore_to_socket_id(stream->lcore));
1282
1283                        if (stream->mempool == NULL) {
1284                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1285                                         "pool failed: %s", strerror(rte_errno));
1286                                return -1;
1287                        }
1288                }
1289
1290                /* Initialise the RX queue with some packets from memory */
1291                ret = rte_eth_rx_queue_setup(format_data->port,
1292                                             stream->queue_id,
1293                                             format_data->nb_rx_buf,
1294                                             format_data->nic_numa_node,
1295                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1296                                             stream->mempool);
1297                if (ret < 0) {
1298                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1299                                 " RX queue on port %d : %s",
1300                                 (int)format_data->port,
1301                                 strerror(-ret));
1302                        return -1;
1303                }
1304        }
1305
1306#if DEBUG
1307        fprintf(stderr, "Doing start device\n");
1308#endif
1309        rte_eth_stats_reset(format_data->port);
1310        /* Start device */
1311        ret = rte_eth_dev_start(format_data->port);
1312        if (ret < 0) {
1313                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1314                         strerror(-ret));
1315                return -1;
1316        }
1317
1318        /* Default promiscuous to on */
1319        if (format_data->promisc == -1)
1320                format_data->promisc = 1;
1321
1322        if (format_data->promisc == 1)
1323                rte_eth_promiscuous_enable(format_data->port);
1324        else
1325                rte_eth_promiscuous_disable(format_data->port);
1326
1327        /* We have now successfully started/unpased */
1328        format_data->paused = DPDK_RUNNING;
1329
1330
1331        /* Register a callback for link state changes */
1332        ret = rte_eth_dev_callback_register(format_data->port,
1333                                            RTE_ETH_EVENT_INTR_LSC,
1334                                            dpdk_lsc_callback,
1335                                            format_data);
1336#if DEBUG
1337        if (ret)
1338                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1339                        ret, strerror(-ret));
1340#endif
1341
1342        /* Get the current link status */
1343        rte_eth_link_get_nowait(format_data->port, &link_info);
1344        format_data->link_speed = link_info.link_speed;
1345#if DEBUG
1346        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1347                (int) link_info.link_duplex, (int) link_info.link_speed);
1348#endif
1349
1350        return 0;
1351}
1352
1353int dpdk_start_input (libtrace_t *libtrace) {
1354        char err[500];
1355        err[0] = 0;
1356
1357        /* Make sure we don't reserve an extra thread for this */
1358        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1359
1360        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1361                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1362                free(libtrace->format_data);
1363                libtrace->format_data = NULL;
1364                return -1;
1365        }
1366        return 0;
1367}
1368
1369static inline size_t dpdk_get_max_rx_queues (portid_t port_id) {
1370        struct rte_eth_dev_info dev_info;
1371        rte_eth_dev_info_get(port_id, &dev_info);
1372        return dev_info.max_rx_queues;
1373}
1374
1375static inline size_t dpdk_processor_count () {
1376        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1377        if (nb_cpu <= 0)
1378                return 1;
1379        else
1380                return (size_t) nb_cpu;
1381}
1382
1383int dpdk_pstart_input (libtrace_t *libtrace) {
1384        char err[500];
1385        int i=0, phys_cores=0;
1386        int tot = libtrace->perpkt_thread_count;
1387        libtrace_list_node_t *n;
1388        err[0] = 0;
1389
1390        if (rte_lcore_id() != rte_get_master_lcore())
1391                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1392                        " from the master DPDK thread!\n");
1393
1394        /* If the master is not on the last thread we move it there */
1395        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1396                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1397                        return -1;
1398        }
1399
1400        /* Don't exceed the number of cores in the system/detected by dpdk
1401         * We don't have to force this but performance wont be good if we don't */
1402        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1403                if (lcore_config[i].detected) {
1404                        if (rte_lcore_is_enabled(i)) {
1405#if DEBUG
1406                                fprintf(stderr, "Found core %d already in use!\n", i);
1407#endif
1408                        } else {
1409                                phys_cores++;
1410                        }
1411                }
1412        }
1413        /* If we are restarting we have already allocated some threads as such
1414         * we add these back to the count for this calculation */
1415        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1416                dpdk_per_stream_t * stream = n->data;
1417                if (stream->lcore != -1)
1418                        phys_cores++;
1419        }
1420
1421        tot = MIN(libtrace->perpkt_thread_count,
1422                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1423        tot = MIN(tot, phys_cores);
1424
1425#if DEBUG
1426        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1427                libtrace->perpkt_thread_count, phys_cores);
1428#endif
1429
1430        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1431                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1432                free(libtrace->format_data);
1433                libtrace->format_data = NULL;
1434                return -1;
1435        }
1436
1437        /* Make sure we only start the number that we should */
1438        libtrace->perpkt_thread_count = tot;
1439        return 0;
1440}
1441
1442/**
1443 * Register a thread with the DPDK system,
1444 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1445 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1446 * gives it.
1447 *
1448 * We then allow a mapper thread to be started on every real core as DPDK would,
1449 * we also bind these to the corresponding CPU cores.
1450 *
1451 * @param libtrace A pointer to the trace
1452 * @param reading True if the thread will be used to read packets, i.e. will
1453 *                call pread_packet(), false if thread used to process packet
1454 *                in any other manner including statistics functions.
1455 */
1456int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1457{
1458#if DEBUG
1459        char name[99];
1460        name[0] = 0;
1461#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1462        pthread_getname_np(pthread_self(),
1463                           name, sizeof(name));
1464#endif
1465#endif
1466        if (reading) {
1467                dpdk_per_stream_t *stream;
1468                /* Attach our thread */
1469                if(t->type == THREAD_PERPKT) {
1470                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1471                        if (t->format_data == NULL) {
1472                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1473                                              "Too many threads registered");
1474                                return -1;
1475                        }
1476                } else {
1477                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1478                }
1479                stream = t->format_data;
1480#if DEBUG
1481                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1482#endif
1483                return dpdk_register_lcore(libtrace, true, stream->lcore);
1484        } else {
1485                int lcore = dpdk_reserve_lcore(reading, 0);
1486                if (lcore == -1) {
1487                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1488                                      " for DPDK");
1489                        return -1;
1490                }
1491#if DEBUG
1492                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1493#endif
1494                return dpdk_register_lcore(libtrace, false, lcore);
1495        }
1496
1497        return 0;
1498}
1499
1500/**
1501 * Unregister a thread with the DPDK system.
1502 *
1503 * Only previously registered threads should be calling this just before
1504 * they are destroyed.
1505 */
1506void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1507{
1508        struct rte_config *cfg = rte_eal_get_configuration();
1509
1510        if (rte_lcore_id() >= RTE_MAX_LCORE) {
1511                fprintf(stderr, "Expected core id less than or equal to RTE_MAX_LCORE in "
1512                        "dpdk_punregister_thread()\n");
1513                return;
1514        }
1515        pthread_mutex_lock(&dpdk_lock);
1516        /* Skip if master */
1517        if (rte_lcore_id() == rte_get_master_lcore()) {
1518                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1519                pthread_mutex_unlock(&dpdk_lock);
1520                return;
1521        }
1522
1523        /* Disable this core in global DPDK structs */
1524        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1525        cfg->lcore_count--;
1526        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1527        if (cfg->lcore_count < 1) {
1528                fprintf(stderr, "You cannot unregister the master lcore in dpdk_punregister_thread()\n");
1529                return;
1530        }
1531        pthread_mutex_unlock(&dpdk_lock);
1532        return;
1533}
1534
1535static int dpdk_start_output(libtrace_out_t *libtrace)
1536{
1537        char err[500];
1538        err[0] = 0;
1539
1540        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1541                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1542                free(libtrace->format_data);
1543                libtrace->format_data = NULL;
1544                return -1;
1545        }
1546        return 0;
1547}
1548
1549int dpdk_pause_input(libtrace_t * libtrace) {
1550        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1551        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1552        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1553#if DEBUG
1554                fprintf(stderr, "Pausing DPDK port\n");
1555#endif
1556                rte_eth_dev_stop(FORMAT(libtrace)->port);
1557                FORMAT(libtrace)->paused = DPDK_PAUSED;
1558                /* Empty the queue of packets */
1559                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1560                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1561                }
1562                FORMAT(libtrace)->burst_offset = 0;
1563                FORMAT(libtrace)->burst_size = 0;
1564
1565                for (; tmp != NULL; tmp = tmp->next) {
1566                        dpdk_per_stream_t *stream = tmp->data;
1567                        stream->ts_last_sys = 0;
1568#if HAS_HW_TIMESTAMPS_82580
1569                        stream->ts_first_sys = 0;
1570#endif
1571                }
1572
1573        }
1574        return 0;
1575}
1576
1577static int dpdk_write_packet(libtrace_out_t *trace,
1578                             libtrace_packet_t *packet){
1579
1580        /* Check dpdk can write this type of packet */
1581        if (!dpdk_can_write(packet)) {
1582                return 0;
1583        }
1584
1585        struct rte_mbuf* m_buff[1];
1586
1587        int wirelen = trace_get_wire_length(packet);
1588        int caplen = trace_get_capture_length(packet);
1589
1590        /* Check for a checksum and remove it */
1591        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1592            wirelen == caplen)
1593                caplen -= ETHER_CRC_LEN;
1594
1595        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1596        if (m_buff[0] == NULL) {
1597                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1598                return -1;
1599        } else {
1600                int ret;
1601                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1602                do {
1603                        ret = rte_eth_tx_burst(FORMAT(trace)->port, 0 /*queue TODO*/, m_buff, 1);
1604                } while (ret != 1);
1605        }
1606
1607        return 0;
1608}
1609
1610int dpdk_fin_input(libtrace_t * libtrace) {
1611        libtrace_list_node_t * n;
1612        /* Free our memory structures */
1613        if (libtrace->format_data != NULL) {
1614
1615                if (FORMAT(libtrace)->port != 0xFF)
1616                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1617                                                        RTE_ETH_EVENT_INTR_LSC,
1618                                                        dpdk_lsc_callback,
1619                                                        FORMAT(libtrace));
1620                /* Close the device completely, device cannot be restarted */
1621                rte_eth_dev_close(FORMAT(libtrace)->port);
1622
1623                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1624                                 FORMAT(libtrace)->nic_numa_node);
1625
1626                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1627                        dpdk_per_stream_t * stream = n->data;
1628                        if (stream->mempool)
1629                                dpdk_free_memory(stream->mempool,
1630                                                 rte_lcore_to_socket_id(stream->lcore));
1631                }
1632
1633                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1634                /* filter here if we used it */
1635                if (FORMAT(libtrace)->rss_key)
1636                        free(FORMAT(libtrace)->rss_key);
1637                free(libtrace->format_data);
1638        }
1639
1640        return 0;
1641}
1642
1643
1644static int dpdk_fin_output(libtrace_out_t * libtrace) {
1645        /* Free our memory structures */
1646        if (libtrace->format_data != NULL) {
1647                /* Close the device completely, device cannot be restarted */
1648                if (FORMAT(libtrace)->port != 0xFF)
1649                        rte_eth_dev_close(FORMAT(libtrace)->port);
1650                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1651                /* filter here if we used it */
1652                free(libtrace->format_data);
1653        }
1654
1655        return 0;
1656}
1657
1658/**
1659 * Get the start of the additional header that we added to a packet.
1660 */
1661static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1662        if (!packet) {
1663                fprintf(stderr, "NULL packet passed into dpdk_addt_hdr()\n");
1664                return NULL;
1665        }
1666        if (!packet->buffer) {
1667                fprintf(stderr, "NULL packet buffer passed into dpdk_addt_hdr()\n");
1668                return NULL;
1669        }
1670        /* Our header sits straight after the mbuf header */
1671        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1672}
1673
1674static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1675        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1676        return hdr->cap_len;
1677}
1678
1679static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1680        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1681        if (size > hdr->cap_len) {
1682                /* Cannot make a packet bigger */
1683                return trace_get_capture_length(packet);
1684        }
1685
1686        /* Reset the cached capture length first*/
1687        packet->cached.capture_length = -1;
1688        hdr->cap_len = (uint32_t) size;
1689        return trace_get_capture_length(packet);
1690}
1691
1692static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1693        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1694        int org_cap_size; /* The original capture size */
1695        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1696                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1697                               sizeof(struct hw_timestamp_82580);
1698        } else {
1699                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1700        }
1701        if (hdr->flags & INCLUDES_CHECKSUM) {
1702                return org_cap_size;
1703        } else {
1704                /* DPDK packets are always TRACE_TYPE_ETH packets */
1705                return org_cap_size + ETHER_CRC_LEN;
1706        }
1707}
1708
1709int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1710        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1711        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1712                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1713                                sizeof(struct hw_timestamp_82580);
1714        else
1715                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1716}
1717
1718int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1719                               libtrace_packet_t *packet, void *buffer,
1720                               libtrace_rt_types_t rt_type, uint32_t flags) {
1721        if (!packet) {
1722                fprintf(stderr, "NULL packet passed into dpdk_prepare_packet()\n");
1723                return TRACE_ERR_NULL_PACKET;
1724        }
1725        if (packet->buffer != buffer &&
1726            packet->buf_control == TRACE_CTRL_PACKET) {
1727                free(packet->buffer);
1728        }
1729
1730        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1731                packet->buf_control = TRACE_CTRL_PACKET;
1732        else
1733                packet->buf_control = TRACE_CTRL_EXTERNAL;
1734
1735        packet->buffer = buffer;
1736        packet->header = buffer;
1737
1738        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1739        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1740        packet->type = rt_type;
1741        return 0;
1742}
1743
1744/**
1745 * Given a packet size and a link speed, computes the
1746 * time to transmit in nanoseconds.
1747 *
1748 * @param format_data The dpdk format data from which we get the link speed
1749 *        and if unset updates it in a thread safe manner
1750 * @param pkt_size The size of the packet in bytes
1751 * @return The wire time in nanoseconds
1752 */
1753static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1754        uint32_t wire_time;
1755        /* 20 extra bytes of interframe gap and preamble */
1756# if GET_MAC_CRC_CHECKSUM
1757        wire_time = ((pkt_size + 20) * 8000);
1758# else
1759        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1760# endif
1761
1762        /* Division is really slow and introduces a pipeline stall
1763         * The compiler will optimise this into magical multiplication and shifting
1764         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1765         */
1766retry_calc_wiretime:
1767        switch (format_data->link_speed) {
1768        case ETH_SPEED_NUM_40G:
1769                wire_time /=  ETH_SPEED_NUM_40G;
1770                break;
1771        case ETH_SPEED_NUM_20G:
1772                wire_time /= ETH_SPEED_NUM_20G;
1773                break;
1774        case ETH_SPEED_NUM_10G:
1775                wire_time /= ETH_SPEED_NUM_10G;
1776                break;
1777        case ETH_SPEED_NUM_1G:
1778                wire_time /= ETH_SPEED_NUM_1G;
1779                break;
1780        case 0:
1781                {
1782                /* Maybe the link was down originally, but now it should be up */
1783                struct rte_eth_link link = {0};
1784                rte_eth_link_get_nowait(format_data->port, &link);
1785                if (link.link_status && link.link_speed) {
1786                        format_data->link_speed = link.link_speed;
1787#ifdef DEBUG
1788                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1789#endif
1790                        goto retry_calc_wiretime;
1791                }
1792                /* We don't know the link speed, make sure numbers are counting up */
1793                wire_time = 1;
1794                break;
1795                }
1796        default:
1797                wire_time /= format_data->link_speed;
1798        }
1799        return wire_time;
1800}
1801
1802/**
1803 * Does any extra preperation to all captured packets
1804 * This includes adding our extra header to it with the timestamp,
1805 * and any snapping
1806 *
1807 * @param format_data The DPDK format data
1808 * @param plc The DPDK per lcore format data
1809 * @param pkts An array of size nb_pkts of DPDK packets
1810 */
1811static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1812                                   struct dpdk_per_stream_t *plc,
1813                                   struct rte_mbuf **pkts,
1814                                   size_t nb_pkts) {
1815        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1816        struct dpdk_addt_hdr *hdr;
1817        size_t i;
1818        uint64_t cur_sys_time_ns;
1819#if HAS_HW_TIMESTAMPS_82580
1820        struct hw_timestamp_82580 *hw_ts;
1821        uint64_t estimated_wraps;
1822#else
1823
1824#endif
1825
1826#if USE_CLOCK_GETTIME
1827        struct timespec cur_sys_time = {0};
1828        /* This looks terrible and I feel bad doing it. But it's OK
1829         * on new kernels, because this is a fast vsyscall */
1830        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1831        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1832#else
1833        struct timeval cur_sys_time = {0};
1834        /* Also a fast vsyscall */
1835        gettimeofday(&cur_sys_time, NULL);
1836        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1837#endif
1838
1839        /* The system clock is not perfect so when running
1840         * at linerate we could timestamp a packet in the past.
1841         * To avoid this we munge the timestamp to appear 1ns
1842         * after the previous packet. We should eventually catch up
1843         * to system time since a 64byte packet on a 10G link takes 67ns.
1844         *
1845         * Note with parallel readers timestamping packets
1846         * with duplicate stamps or out of order is unavoidable without
1847         * hardware timestamping from the NIC.
1848         */
1849#if !HAS_HW_TIMESTAMPS_82580
1850        if (plc->ts_last_sys >= cur_sys_time_ns) {
1851                cur_sys_time_ns = plc->ts_last_sys + 1;
1852        }
1853#endif
1854
1855        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1856        for (i = 0 ; i < nb_pkts ; ++i) {
1857
1858                /* We put our header straight after the dpdk header */
1859                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1860                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1861
1862#if GET_MAC_CRC_CHECKSUM
1863                /* Add back in the CRC sum */
1864                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1865                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1866                hdr->flags |= INCLUDES_CHECKSUM;
1867#endif
1868
1869                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1870
1871#if HAS_HW_TIMESTAMPS_82580
1872                /* The timestamp is sitting before our packet and is included in pkt_len */
1873                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1874                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1875                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1876
1877                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1878                 *
1879                 *        +----------+---+   +--------------+
1880                 *  82580 |    24    | 8 |   |      32      |
1881                 *        +----------+---+   +--------------+
1882                 *          reserved  \______ 40 bits _____/
1883                 *
1884                 * The 40 bit 82580 SYSTIM overflows every
1885                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1886                 *
1887                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1888                 * Endian (for the full 64 bits) i.e. picture is mirrored
1889                 */
1890
1891                /* Despite what the documentation says this is in Little
1892                 * Endian byteorder. Mask the reserved section out.
1893                 */
1894                hdr->timestamp = le64toh(hw_ts->timestamp) &
1895                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1896
1897                if (unlikely(plc->ts_first_sys == 0)) {
1898                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1899                        plc->ts_last_sys = plc->ts_first_sys;
1900                }
1901
1902                /* This will have serious problems if packets aren't read quickly
1903                 * that is within a couple of seconds because our clock cycles every
1904                 * 18 seconds */
1905                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1906                                  / (1ull<<TS_NBITS_82580);
1907
1908                /* Estimated_wraps gives the number of times the counter should have
1909                 * wrapped (however depending on value last time it could have wrapped
1910                 * twice more (if hw clock is close to its max value) or once less (allowing
1911                 * for a bit of variance between hw and sys clock). But if the clock
1912                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1913                if (unlikely(estimated_wraps >= 2)) {
1914                        /* 2 or more wrap arounds add all but the very last wrap */
1915                        plc->wrap_count += estimated_wraps - 1;
1916                }
1917
1918                /* Set the timestamp to the lowest possible value we're considering */
1919                hdr->timestamp += plc->ts_first_sys +
1920                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1921
1922                /* In most runs only the first if() will need evaluating - i.e our
1923                 * estimate is correct. */
1924                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1925                                              hdr->timestamp, MAXSKEW_82580))) {
1926                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1927                        plc->wrap_count++;
1928                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1929                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1930                                             hdr->timestamp, MAXSKEW_82580)) {
1931                                /* Failed to match estimated_wraps */
1932                                plc->wrap_count++;
1933                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1934                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1935                                                     hdr->timestamp, MAXSKEW_82580)) {
1936                                        if (estimated_wraps == 0) {
1937                                                /* 0 case Failed to match estimated_wraps+2 */
1938                                                printf("WARNING - Hardware Timestamp failed to"
1939                                                       " match using systemtime!\n");
1940                                                hdr->timestamp = cur_sys_time_ns;
1941                                        } else {
1942                                                /* Failed to match estimated_wraps+1 */
1943                                                plc->wrap_count++;
1944                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1945                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1946                                                                     hdr->timestamp, MAXSKEW_82580)) {
1947                                                        /* Failed to match estimated_wraps+2 */
1948                                                        printf("WARNING - Hardware Timestamp failed to"
1949                                                               " match using systemtime!!\n");
1950                                                }
1951                                        }
1952                                }
1953                        }
1954                }
1955#else
1956
1957                hdr->timestamp = cur_sys_time_ns;
1958                /* Offset the next packet by the wire time of previous */
1959                calculate_wire_time(format_data, hdr->cap_len);
1960
1961#endif
1962        }
1963
1964        plc->ts_last_sys = cur_sys_time_ns;
1965        return;
1966}
1967
1968
1969static void dpdk_fin_packet(libtrace_packet_t *packet)
1970{
1971        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1972                rte_pktmbuf_free(packet->buffer);
1973                packet->buffer = NULL;
1974        }
1975}
1976
1977/** Reads at least one packet or returns an error
1978 */
1979int dpdk_read_packet_stream (libtrace_t *libtrace,
1980                                           dpdk_per_stream_t *stream,
1981                                           libtrace_message_queue_t *mesg,
1982                                           struct rte_mbuf* pkts_burst[],
1983                                           size_t nb_packets) {
1984        size_t nb_rx; /* Number of rx packets we've recevied */
1985        while (1) {
1986                /* Poll for a batch of packets */
1987                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1988                                         stream->queue_id, pkts_burst, nb_packets);
1989                if (nb_rx > 0) {
1990                        /* Got some packets - otherwise we keep spining */
1991                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1992                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1993                        return nb_rx;
1994                }
1995                /* Check the message queue this could be less than 0 */
1996                if (mesg && libtrace_message_queue_count(mesg) > 0)
1997                        return READ_MESSAGE;
1998                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
1999                        return nb_rx;
2000                /* Wait a while, polling on memory degrades performance
2001                 * This relieves the pressure on memory allowing the NIC to DMA */
2002                rte_delay_us(10);
2003        }
2004
2005        /* We'll never get here - but if we did it would be bad */
2006        return READ_ERROR;
2007}
2008
2009static int dpdk_pread_packets (libtrace_t *libtrace,
2010                                    libtrace_thread_t *t,
2011                                    libtrace_packet_t **packets,
2012                                    size_t nb_packets) {
2013        int nb_rx; /* Number of rx packets we've recevied */
2014        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2015        int i;
2016        dpdk_per_stream_t *stream = t->format_data;
2017        struct dpdk_addt_hdr * hdr;
2018
2019        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2020                                         pkts_burst, nb_packets);
2021
2022        if (nb_rx > 0) {
2023                for (i = 0; i < nb_rx; ++i) {
2024                        if (packets[i]->buffer != NULL) {
2025                                /* The packet should always be finished */
2026                                if (packets[i]->buf_control != TRACE_CTRL_PACKET) {
2027                                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Expected packet buffer "
2028                                                "to be empty in dpdk_pread_packets()\n");
2029                                        return -1;
2030                                }
2031                                free(packets[i]->buffer);
2032                        }
2033                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2034                        packets[i]->type = TRACE_RT_DATA_DPDK;
2035                        packets[i]->buffer = pkts_burst[i];
2036                        packets[i]->trace = libtrace;
2037                        packets[i]->error = 1;
2038                        hdr = (struct dpdk_addt_hdr *) 
2039                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
2040                        packets[i]->order = hdr->timestamp;
2041                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2042                }
2043        }
2044
2045        return nb_rx;
2046}
2047
2048int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2049        int nb_rx; /* Number of rx packets we've received */
2050        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2051
2052        /* Free the last packet buffer */
2053        if (packet->buffer != NULL) {
2054                /* The packet should always be finished */
2055                if (packet->buf_control != TRACE_CTRL_PACKET) {
2056                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Expected packet buffer to be "
2057                                "empty in dpdk_read_packet()\n");
2058                        return -1;
2059                }
2060                free(packet->buffer);
2061                packet->buffer = NULL;
2062        }
2063
2064        packet->buf_control = TRACE_CTRL_EXTERNAL;
2065        packet->type = TRACE_RT_DATA_DPDK;
2066
2067        /* Check if we already have some packets buffered */
2068        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2069                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2070                packet->trace = libtrace;
2071                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2072                return 1; // TODO should be bytes read, which essentially useless anyway
2073        }
2074
2075        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2076                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2077
2078        if (nb_rx > 0) {
2079                FORMAT(libtrace)->burst_size = nb_rx;
2080                FORMAT(libtrace)->burst_offset = 1;
2081                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2082                packet->trace = libtrace;
2083                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2084                return 1;
2085        }
2086        return nb_rx;
2087}
2088
2089static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2090        struct timeval tv;
2091        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2092
2093        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2094        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2095        return tv;
2096}
2097
2098static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2099        struct timespec ts;
2100        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2101
2102        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2103        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2104        return ts;
2105}
2106
2107static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2108        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2109}
2110
2111static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2112        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2113        return (libtrace_direction_t) hdr->direction;
2114}
2115
2116static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2117        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2118        hdr->direction = (uint8_t) direction;
2119        return (libtrace_direction_t) hdr->direction;
2120}
2121
2122void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2123        struct rte_eth_stats dev_stats = {0};
2124
2125        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2126                return;
2127
2128        /* Grab the current stats */
2129        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2130
2131        stats->captured_valid = true;
2132        stats->captured = dev_stats.ipackets;
2133
2134        stats->dropped_valid = true;
2135        stats->dropped = dev_stats.imissed;
2136
2137#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2138        /* DPDK commit 86057c fixes ensures missed does not get counted as
2139         * errors */
2140        stats->errors_valid = true;
2141        stats->errors = dev_stats.ierrors;
2142#else
2143        /* DPDK errors includes drops */
2144        stats->errors_valid = true;
2145        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2146#endif
2147        stats->received_valid = true;
2148        stats->received = dev_stats.ipackets + dev_stats.imissed;
2149
2150}
2151
2152/* Attempts to read a packet in a non-blocking fashion. If one is not
2153 * available a SLEEP event is returned. We do not have the ability to
2154 * create a select()able file descriptor in DPDK.
2155 */
2156libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2157                                            libtrace_packet_t *packet) {
2158        libtrace_eventobj_t event = {0,0,0.0,0};
2159        size_t nb_rx; /* Number of received packets we've read */
2160
2161        do {
2162
2163                /* No packets waiting in our buffer? Try and read some more */
2164                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2165                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2166                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2167                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2168                        if (nb_rx > 0) {
2169                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2170                                                FORMAT(trace)->burst_pkts, nb_rx);
2171                                FORMAT(trace)->burst_size = nb_rx;
2172                                FORMAT(trace)->burst_offset = 0;
2173                        }
2174                }
2175
2176                /* Now do we have packets waiting? */
2177                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2178                        /* Free the last packet buffer */
2179                        if (packet->buffer != NULL) {
2180                                /* The packet should always be finished */
2181                                if (packet->buf_control != TRACE_CTRL_PACKET) {
2182                                        trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Expected packet "
2183                                                "buffer to be empty in dpdk_trace_event()\n");
2184                                        event.type = TRACE_EVENT_TERMINATE;
2185                                        return event;
2186                                }
2187                                free(packet->buffer);
2188                                packet->buffer = NULL;
2189                        }
2190
2191                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2192                        packet->type = TRACE_RT_DATA_DPDK;
2193                        event.type = TRACE_EVENT_PACKET;
2194                        packet->buffer = FORMAT(trace)->burst_pkts[
2195                                             FORMAT(trace)->burst_offset++];
2196                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2197                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2198
2199                        /* XXX - Check this passes the filter trace_read_packet normally
2200                         * does this for us but this wont */
2201                        if (trace->filter) {
2202                                if (!trace_apply_filter(trace->filter, packet)) {
2203                                        /* Failed the filter so we loop for another packet */
2204                                        trace->filtered_packets ++;
2205                                        continue;
2206                                }
2207                        }
2208                        trace->accepted_packets ++;
2209                } else {
2210                        /* We only want to sleep for a very short time - we are non-blocking */
2211                        event.type = TRACE_EVENT_SLEEP;
2212                        event.seconds = 0.0001;
2213                        event.size = 0;
2214                }
2215
2216                /* If we get here we have our event */
2217                break;
2218        } while (1);
2219
2220        return event;
2221}
2222
2223static void dpdk_help(void) {
2224        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2225        printf("Supported input URIs:\n");
2226        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2227        printf("\tThe -<coreid> is optional \n");
2228        printf("\t e.g. dpdk:0000:01:00.1\n");
2229        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2230        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2231        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2232        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2233        printf("\n");
2234        printf("Supported output URIs:\n");
2235        printf("\tSame format as the input URI.\n");
2236        printf("\t e.g. dpdk:0000:01:00.1\n");
2237        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2238        printf("\n");
2239}
2240
2241static struct libtrace_format_t dpdk = {
2242        "dpdk",
2243        "$Id$",
2244        TRACE_FORMAT_DPDK,
2245        NULL,                               /* probe filename */
2246        NULL,                               /* probe magic */
2247        dpdk_init_input,                    /* init_input */
2248        dpdk_config_input,                  /* config_input */
2249        dpdk_start_input,                   /* start_input */
2250        dpdk_pause_input,                   /* pause_input */
2251        dpdk_init_output,                   /* init_output */
2252        NULL,                               /* config_output */
2253        dpdk_start_output,                  /* start_ouput */
2254        dpdk_fin_input,                     /* fin_input */
2255        dpdk_fin_output,                    /* fin_output */
2256        dpdk_read_packet,                   /* read_packet */
2257        dpdk_prepare_packet,                /* prepare_packet */
2258        dpdk_fin_packet,                    /* fin_packet */
2259        dpdk_write_packet,                  /* write_packet */
2260        NULL,                               /* flush_output */
2261        dpdk_get_link_type,                 /* get_link_type */
2262        dpdk_get_direction,                 /* get_direction */
2263        dpdk_set_direction,                 /* set_direction */
2264        NULL,                               /* get_erf_timestamp */
2265        dpdk_get_timeval,                   /* get_timeval */
2266        dpdk_get_timespec,                  /* get_timespec */
2267        NULL,                               /* get_seconds */
2268        NULL,                               /* seek_erf */
2269        NULL,                               /* seek_timeval */
2270        NULL,                               /* seek_seconds */
2271        NULL,                               /* get_meta_section */
2272        dpdk_get_capture_length,            /* get_capture_length */
2273        dpdk_get_wire_length,               /* get_wire_length */
2274        dpdk_get_framing_length,            /* get_framing_length */
2275        dpdk_set_capture_length,            /* set_capture_length */
2276        NULL,                               /* get_received_packets */
2277        NULL,                               /* get_filtered_packets */
2278        NULL,                               /* get_dropped_packets */
2279        dpdk_get_stats,                     /* get_statistics */
2280        NULL,                               /* get_fd */
2281        dpdk_trace_event,                   /* trace_event */
2282        dpdk_help,                          /* help */
2283        NULL,                               /* next pointer */
2284        {true, 8},                          /* Live, NICs typically have 8 threads */
2285        dpdk_pstart_input,                  /* pstart_input */
2286        dpdk_pread_packets,                 /* pread_packets */
2287        dpdk_pause_input,                   /* ppause */
2288        dpdk_fin_input,                     /* p_fin */
2289        dpdk_pregister_thread,              /* pregister_thread */
2290        dpdk_punregister_thread,            /* punregister_thread */
2291        NULL                                /* get thread stats */
2292};
2293
2294void dpdk_constructor(void) {
2295        register_format(&dpdk);
2296}
Note: See TracBrowser for help on using the repository browser.