source: lib/format_dpdk.c @ 32ee9b2

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 32ee9b2 was 32ee9b2, checked in by Shane Alcock <salcock@…>, 3 years ago

Add new trace_flush_output() to public API

Can be used to force a libtrace output to dump any buffered output
to disk immediately.

Note that if the file is compressed or the output trace format
requires a trailer, the flushed file will still not be properly
readable afterwards as this will not result in any trailers
being written. You'll still have to close the file for that.

Mainly this is useful for ensuring that output file sizes grow
over time in situations where the amount of output is relatively
small, rather than staying stuck at 0 bytes until we either reach
1MB of output or the file is closed. For instance, you could have
a timer that calls trace_flush_output() every 30 seconds so that
the output file size will grow if any packets were written in the
last 30 seconds.

  • Property mode set to 100644
File size: 70.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44#include "format_dpdk.h"
45
46#ifdef HAVE_INTTYPES_H
47#  include <inttypes.h>
48#else
49# error "Can't find inttypes.h"
50#endif
51
52#include <stdlib.h>
53#include <assert.h>
54#include <unistd.h>
55#include <endian.h>
56#include <string.h>
57#include <math.h>
58
59#if HAVE_LIBNUMA
60#include <numa.h>
61#endif
62
63#define MBUF(x) ((struct rte_mbuf *) x)
64/* Get the original placement of the packet data */
65#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
66#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
67#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
68
69#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
70#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
71
72#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
73                        (uint64_t) tv.tv_usec*1000ull)
74#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
75                        (uint64_t) ts.tv_nsec)
76
77#if RTE_PKTMBUF_HEADROOM != 128
78#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
79         "any libtrace instance processing these packet must be have the" \
80         "same RTE_PKTMBUF_HEADROOM set"
81#endif
82
83
84static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
85/* Memory pools Per NUMA node */
86static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
87
88/* Used by both input and output however some fields are not used
89 * for output */
90struct dpdk_format_data_t {
91        int8_t promisc; /* promiscuous mode - RX only */
92        uint8_t paused; /* See paused_state */
93        portid_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
94        portid_t nb_ports; /* Total number of usable ports on system should be 1 */
95        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
96        int snaplen; /* The snap length for the capture - RX only */
97        /* We always have to setup both rx and tx queues even if we don't want them */
98        int nb_rx_buf; /* The number of packet buffers in the rx ring */
99        int nb_tx_buf; /* The number of packet buffers in the tx ring */
100        int nic_numa_node; /* The NUMA node that the NIC is attached to */
101        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
102#if DPDK_USE_BLACKLIST
103        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
104        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
105#endif
106        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
107        enum hasher_types hasher_type;
108        uint8_t *rss_key;
109        /* To improve single-threaded performance we always batch reading
110         * packets, in a burst, otherwise the parallel library does this for us */
111        struct rte_mbuf* burst_pkts[BURST_SIZE];
112        int burst_size; /* The total number read in the burst */
113        int burst_offset; /* The offset we are into the burst */
114
115        /* Our parallel streams */
116        libtrace_list_t *per_stream;
117};
118
119enum dpdk_addt_hdr_flags {
120        INCLUDES_CHECKSUM = 0x1,
121        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
122};
123
124/**
125 * A structure placed in front of the packet where we can store
126 * additional information about the given packet.
127 * +--------------------------+
128 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
129 * +--------------------------+
130 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
131 * +--------------------------+
132 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
133 * +--------------------------+
134 * *   hw_timestamp_82580     * 16 bytes Optional
135 * +--------------------------+
136 * |       Packet data        | Variable Size
137 * |                          |
138 */
139struct dpdk_addt_hdr {
140        uint64_t timestamp;
141        uint8_t flags;
142        uint8_t direction;
143        uint8_t reserved1;
144        uint8_t reserved2;
145        uint32_t cap_len; /* The size to say the capture is */
146};
147
148/**
149 * We want to blacklist all devices except those on the whitelist
150 * (I say list, but yes it is only the one).
151 *
152 * The default behaviour of rte_pci_probe() will map every possible device
153 * to its DPDK driver. The DPDK driver will take the ethernet device
154 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
155 *
156 * So blacklist all devices except the one that we wish to use so that
157 * the others can still be used as standard ethernet ports.
158 *
159 * @return 0 if successful, otherwise -1 on error.
160 */
161#if DPDK_USE_BLACKLIST
162static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
163{
164        struct rte_pci_device *dev = NULL;
165        format_data->nb_blacklist = 0;
166
167        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
168
169        TAILQ_FOREACH(dev, &device_list, next) {
170        if (whitelist != NULL && whitelist->domain == dev->addr.domain
171            && whitelist->bus == dev->addr.bus
172            && whitelist->devid == dev->addr.devid
173            && whitelist->function == dev->addr.function)
174            continue;
175                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
176                                / sizeof (format_data->blacklist[0])) {
177                        fprintf(stderr, "Warning: too many devices to blacklist consider"
178                                        " increasing BLACK_LIST_SIZE");
179                        break;
180                }
181                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
182                ++format_data->nb_blacklist;
183        }
184
185        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
186        return 0;
187}
188#else /* DPDK_USE_BLACKLIST */
189#include <rte_devargs.h>
190static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
191{
192        char pci_str[20] = {0};
193        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
194                 whitelist->domain,
195                 whitelist->bus,
196                 whitelist->devid,
197                 whitelist->function);
198        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
199                return -1;
200        }
201        return 0;
202}
203#endif
204
205/**
206 * Parse the URI format as a pci address
207 * Fills in addr, note core is optional and is unchanged if
208 * a value for it is not provided.
209 *
210 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
211 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
212 */
213static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
214        int matches;
215        assert(str);
216#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
217        matches = sscanf(str, "%8"SCNx32":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
218                         &addr->domain, &addr->bus, &addr->devid,
219                         &addr->function, core);
220#else
221        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
222                         &addr->domain, &addr->bus, &addr->devid,
223                         &addr->function, core);
224#endif
225        if (matches >= 4) {
226                return 0;
227        } else {
228                return -1;
229        }
230}
231
232/**
233 * Convert a pci address to the numa node it is
234 * connected to.
235 *
236 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
237 * so we can call it before DPDK
238 *
239 * @return -1 if unknown otherwise a number 0 or higher of the numa node
240 */
241static int pci_to_numa(struct rte_pci_addr * dev_addr) {
242        char path[50] = {0};
243        FILE *file;
244
245        /* Read from the system */
246        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
247                 dev_addr->domain,
248                 dev_addr->bus,
249                 dev_addr->devid,
250                 dev_addr->function);
251
252        if((file = fopen(path, "r")) != NULL) {
253                int numa_node = -1;
254                fscanf(file, "%d", &numa_node);
255                fclose(file);
256                return numa_node;
257        }
258        return -1;
259}
260
261#if DEBUG
262/* For debugging */
263static inline void dump_configuration()
264{
265        struct rte_config * global_config;
266        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
267
268        if (nb_cpu <= 0) {
269                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
270                       " Falling back to the first core.");
271                nb_cpu = 1; /* fallback to just 1 core */
272        }
273        if (nb_cpu > RTE_MAX_LCORE)
274                nb_cpu = RTE_MAX_LCORE;
275
276        global_config = rte_eal_get_configuration();
277
278        if (global_config != NULL) {
279                int i;
280                fprintf(stderr, "Intel DPDK setup\n"
281                        "---Version      : %s\n"
282                        "---Master LCore : %"PRIu32"\n"
283                        "---LCore Count  : %"PRIu32"\n",
284                        rte_version(),
285                        global_config->master_lcore, global_config->lcore_count);
286
287                for (i = 0 ; i < nb_cpu; i++) {
288                        fprintf(stderr, "   ---Core %d : %s\n", i,
289                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
290                }
291
292                const char * proc_type;
293                switch (global_config->process_type) {
294                case RTE_PROC_AUTO:
295                        proc_type = "auto";
296                        break;
297                case RTE_PROC_PRIMARY:
298                        proc_type = "primary";
299                        break;
300                case RTE_PROC_SECONDARY:
301                        proc_type = "secondary";
302                        break;
303                case RTE_PROC_INVALID:
304                        proc_type = "invalid";
305                        break;
306                default:
307                        proc_type = "something worse than invalid!!";
308                }
309                fprintf(stderr, "---Process Type : %s\n", proc_type);
310        }
311
312}
313#endif
314
315/**
316 * Expects to be called from the master lcore and moves it to the given dpdk id
317 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
318 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
319 *               and not already in use.
320 * @return 0 is successful otherwise -1 on error.
321 */
322static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
323        struct rte_config *cfg = rte_eal_get_configuration();
324        cpu_set_t cpuset;
325        int i;
326
327        assert (core < RTE_MAX_LCORE);
328        assert (rte_get_master_lcore() == rte_lcore_id());
329
330        if (core == rte_lcore_id())
331                return 0;
332
333        /* Make sure we are not overwriting someone else */
334        assert(!rte_lcore_is_enabled(core));
335
336        /* Move the core */
337        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
338        cfg->lcore_role[core] = ROLE_RTE;
339        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
340        rte_eal_get_configuration()->master_lcore = core;
341        RTE_PER_LCORE(_lcore_id) = core;
342
343        /* Now change the affinity, either mapped to a single core or all accepted */
344        CPU_ZERO(&cpuset);
345
346        if (lcore_config[core].detected) {
347                CPU_SET(core, &cpuset);
348        } else {
349                for (i = 0; i < RTE_MAX_LCORE; ++i) {
350                        if (lcore_config[i].detected)
351                                CPU_SET(i, &cpuset);
352                }
353        }
354
355        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
356        if (i != 0) {
357                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
358                return -1;
359        }
360        return 0;
361}
362
363/**
364 * XXX This is very bad XXX
365 * But we have to do something to allow getopts nesting
366 * Luckly normally the format is last so it doesn't matter
367 * DPDK only supports modern systems so hopefully this
368 * will continue to work
369 */
370struct saved_getopts {
371        char *optarg;
372        int optind;
373        int opterr;
374        int optopt;
375};
376
377static void save_getopts(struct saved_getopts *opts) {
378        opts->optarg = optarg;
379        opts->optind = optind;
380        opts->opterr = opterr;
381        opts->optopt = optopt;
382}
383
384static void restore_getopts(struct saved_getopts *opts) {
385        optarg = opts->optarg;
386        optind = opts->optind;
387        opterr = opts->opterr;
388        optopt = opts->optopt;
389}
390
391static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
392                                        char * err, int errlen) {
393        int ret; /* Returned error codes */
394        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
395        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
396        char mem_map[20] = {0}; /* The memory name */
397        long nb_cpu; /* The number of CPUs in the system */
398        long my_cpu; /* The CPU number we want to bind to */
399        int i;
400        struct rte_config *cfg = rte_eal_get_configuration();
401        struct saved_getopts save_opts;
402
403        /* This initialises the Environment Abstraction Layer (EAL)
404         * If we had slave workers these are put into WAITING state
405         *
406         * Basically binds this thread to a fixed core, which we choose as
407         * the last core on the machine (assuming fewer interrupts mapped here).
408         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
409         * "-n" the number of memory channels into the CPU (hardware specific)
410         *      - Most likely to be half the number of ram slots in your machine.
411         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
412         * Controls where in memory packets are stored such that they are spread
413         * across the channels. We just use 1 to be safe.
414         *
415         * Using unique file prefixes mean separate memory is used, unlinking
416         * the two processes. However be careful we still cannot access a
417         * port that already in use.
418         */
419        char* argv[] = {"libtrace",
420                        "-c", cpu_number,
421                        "-n", "1",
422                        "--proc-type", "auto",
423                        "--file-prefix", mem_map,
424                        "-m", "512",
425#if DPDK_USE_LOG_LEVEL
426#       if DEBUG
427                        "--log-level", "8", /* RTE_LOG_DEBUG */
428#       else
429                        "--log-level", "5", /* RTE_LOG_WARNING */
430#       endif
431#endif
432                        NULL};
433        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
434
435#if DEBUG
436        rte_log_set_global_level(RTE_LOG_DEBUG);
437#else
438        rte_log_set_global_level(RTE_LOG_WARNING);
439#endif
440
441        /* Get the number of cpu cores in the system and use the last core
442         * on the correct numa node */
443        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
444        if (nb_cpu <= 0) {
445                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
446                       " Falling back to the first core.");
447                nb_cpu = 1; /* fallback to the first core */
448        }
449        if (nb_cpu > RTE_MAX_LCORE)
450                nb_cpu = RTE_MAX_LCORE;
451
452        my_cpu = -1;
453        /* This allows the user to specify the core - we would try to do this
454         * automatically but it's hard to tell that this is secondary
455         * before running rte_eal_init(...). Currently we are limited to 1
456         * instance per core due to the way memory is allocated. */
457        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
458                snprintf(err, errlen, "Failed to parse URI");
459                return -1;
460        }
461
462#if HAVE_LIBNUMA
463        format_data->nic_numa_node = pci_to_numa(&use_addr);
464        if (my_cpu < 0) {
465#if DEBUG
466                /* If we can assign to a core on the same numa node */
467                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
468#endif
469                if(format_data->nic_numa_node >= 0) {
470                        int max_node_cpu = -1;
471                        struct bitmask *mask = numa_allocate_cpumask();
472                        assert(mask);
473                        numa_node_to_cpus(format_data->nic_numa_node, mask);
474                        for (i = 0 ; i < nb_cpu; ++i) {
475                                if (numa_bitmask_isbitset(mask,i))
476                                        max_node_cpu = i+1;
477                        }
478                        my_cpu = max_node_cpu;
479                }
480        }
481#endif
482        if (my_cpu < 0) {
483                my_cpu = nb_cpu;
484        }
485
486
487        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
488                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
489
490        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
491                snprintf(err, errlen,
492                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
493                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
494                return -1;
495        }
496
497        /* Make our mask with all cores turned on this is so that DPDK
498         * gets all CPU info in older versions */
499        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
500        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
501
502#if !DPDK_USE_BLACKLIST
503        /* Black list all ports besides the one that we want to use */
504        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
505                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
506                         " are you sure the address is correct?: %s", strerror(-ret));
507                return -1;
508        }
509#endif
510
511        /* Give the memory map a unique name */
512        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
513        /* rte_eal_init it makes a call to getopt so we need to reset the
514         * global optind variable of getopt otherwise this fails */
515        save_getopts(&save_opts);
516        optind = 1;
517        if ((ret = rte_eal_init(argc, argv)) < 0) {
518                snprintf(err, errlen,
519                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
520                return -1;
521        }
522        restore_getopts(&save_opts);
523        // These are still running but will never do anything with DPDK v1.7 we
524        // should remove this XXX in the future
525        for(i = 0; i < RTE_MAX_LCORE; ++i) {
526                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
527                        cfg->lcore_role[i] = ROLE_OFF;
528                        cfg->lcore_count--;
529                }
530        }
531        // Only the master should be running
532        assert(cfg->lcore_count == 1);
533
534        // TODO XXX TODO
535        dpdk_move_master_lcore(NULL, my_cpu-1);
536
537#if DEBUG
538        dump_configuration();
539#endif
540
541#if DPDK_USE_PMD_INIT
542        /* This registers all available NICs with Intel DPDK
543         * These are not loaded until rte_eal_pci_probe() is called.
544         */
545        if ((ret = rte_pmd_init_all()) < 0) {
546                snprintf(err, errlen,
547                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
548                return -1;
549        }
550#endif
551
552#if DPDK_USE_BLACKLIST
553        /* Blacklist all ports besides the one that we want to use */
554        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
555                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
556                         " are you sure the address is correct?: %s", strerror(-ret));
557                return -1;
558        }
559#endif
560
561#if DPDK_USE_PCI_PROBE
562        /* This loads DPDK drivers against all ports that are not blacklisted */
563        if ((ret = rte_eal_pci_probe()) < 0) {
564                snprintf(err, errlen,
565                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
566                return -1;
567        }
568#endif
569
570        format_data->nb_ports = rte_eth_dev_count();
571
572        if (format_data->nb_ports != 1) {
573                snprintf(err, errlen,
574                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
575                         format_data->nb_ports);
576                return -1;
577        }
578
579        return 0;
580}
581
582int dpdk_init_input (libtrace_t *libtrace) {
583        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
584        char err[500];
585        err[0] = 0;
586
587        libtrace->format_data = (struct dpdk_format_data_t *)
588                                malloc(sizeof(struct dpdk_format_data_t));
589        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
590        FORMAT(libtrace)->nb_ports = 0;
591        FORMAT(libtrace)->snaplen = 0; /* Use default */
592        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
593        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
594        FORMAT(libtrace)->nic_numa_node = -1;
595        FORMAT(libtrace)->promisc = -1;
596        FORMAT(libtrace)->pktmbuf_pool = NULL;
597#if DPDK_USE_BLACKLIST
598        FORMAT(libtrace)->nb_blacklist = 0;
599#endif
600        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
601        FORMAT(libtrace)->mempool_name[0] = 0;
602        memset(FORMAT(libtrace)->burst_pkts, 0,
603               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
604        FORMAT(libtrace)->burst_size = 0;
605        FORMAT(libtrace)->burst_offset = 0;
606        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
607        FORMAT(libtrace)->rss_key = NULL;
608
609        /* Make our first stream */
610        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
611        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
612
613        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
614                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
615                free(libtrace->format_data);
616                libtrace->format_data = NULL;
617                return -1;
618        }
619        return 0;
620}
621
622static int dpdk_init_output(libtrace_out_t *libtrace)
623{
624        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
625        char err[500];
626        err[0] = 0;
627
628        libtrace->format_data = (struct dpdk_format_data_t *)
629                                malloc(sizeof(struct dpdk_format_data_t));
630        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
631        FORMAT(libtrace)->nb_ports = 0;
632        FORMAT(libtrace)->snaplen = 0; /* Use default */
633        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
634        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
635        FORMAT(libtrace)->nic_numa_node = -1;
636        FORMAT(libtrace)->promisc = -1;
637        FORMAT(libtrace)->pktmbuf_pool = NULL;
638#if DPDK_USE_BLACKLIST
639        FORMAT(libtrace)->nb_blacklist = 0;
640#endif
641        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
642        FORMAT(libtrace)->mempool_name[0] = 0;
643        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
644        FORMAT(libtrace)->burst_size = 0;
645        FORMAT(libtrace)->burst_offset = 0;
646
647        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
648        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
649
650        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
651                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
652                free(libtrace->format_data);
653                libtrace->format_data = NULL;
654                return -1;
655        }
656        return 0;
657}
658
659/**
660 * Note here snaplen excludes the MAC checksum. Packets over
661 * the requested snaplen will be dropped. (Excluding MAC checksum)
662 *
663 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
664 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
665 * is set the maximum size of the returned packet would be 1518 otherwise
666 * 1514 would be the largest size possibly returned.
667 *
668 */
669int dpdk_config_input (libtrace_t *libtrace,
670                              trace_option_t option,
671                              void *data) {
672        switch (option) {
673        case TRACE_OPTION_SNAPLEN:
674                /* Only support changing snaplen before a call to start is
675                 * made */
676                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
677                        FORMAT(libtrace)->snaplen=*(int*)data;
678                else
679                        return -1;
680                return 0;
681        case TRACE_OPTION_PROMISC:
682                FORMAT(libtrace)->promisc=*(int*)data;
683                return 0;
684        case TRACE_OPTION_HASHER:
685                switch (*((enum hasher_types *) data))
686                {
687                case HASHER_BALANCE:
688                case HASHER_UNIDIRECTIONAL:
689                case HASHER_BIDIRECTIONAL:
690                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
691                        if (FORMAT(libtrace)->rss_key)
692                                free(FORMAT(libtrace)->rss_key);
693                        FORMAT(libtrace)->rss_key = NULL;
694                        return 0;
695                case HASHER_CUSTOM:
696                        // Let libtrace do this
697                        return -1;
698                }
699                break;
700        case TRACE_OPTION_FILTER:
701                /* TODO filtering */
702        case TRACE_OPTION_META_FREQ:
703        case TRACE_OPTION_EVENT_REALTIME:
704                break;
705        /* Avoid default: so that future options will cause a warning
706         * here to remind us to implement it, or flag it as
707         * unimplementable
708         */
709        }
710
711        /* Don't set an error - trace_config will try to deal with the
712         * option and will set an error if it fails */
713        return -1;
714}
715
716/* Can set jumbo frames/ or limit the size of a frame by setting both
717 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
718 *
719 */
720static struct rte_eth_conf port_conf = {
721        .rxmode = {
722                .mq_mode = ETH_RSS,
723                .split_hdr_size = 0,
724                .header_split   = 0, /**< Header Split disabled */
725                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
726                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
727                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
728                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
729#if GET_MAC_CRC_CHECKSUM
730/* So it appears that if hw_strip_crc is turned off the driver will still
731 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
732 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
733 * So lets just add it back on when we receive the packet.
734 */
735                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
736#else
737/* By default strip the MAC checksum because it's a bit of a hack to
738 * actually read these. And don't want to rely on disabling this to actualy
739 * always cut off the checksum in the future
740 */
741                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
742#endif
743        },
744        .txmode = {
745                .mq_mode = ETH_DCB_NONE,
746        },
747        .rx_adv_conf = {
748                .rss_conf = {
749                        .rss_hf = RX_RSS_FLAGS,
750                },
751        },
752        .intr_conf = {
753                .lsc = 1
754        }
755};
756
757static const struct rte_eth_rxconf rx_conf = {
758        .rx_thresh = {
759                .pthresh = 8,/* RX_PTHRESH prefetch */
760                .hthresh = 8,/* RX_HTHRESH host */
761                .wthresh = 4,/* RX_WTHRESH writeback */
762        },
763        .rx_free_thresh = 0,
764        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
765};
766
767static const struct rte_eth_txconf tx_conf = {
768        .tx_thresh = {
769                /*
770                 * TX_PTHRESH prefetch
771                 * Set on the NIC, if the number of unprocessed descriptors to queued on
772                 * the card fall below this try grab at least hthresh more unprocessed
773                 * descriptors.
774                 */
775                .pthresh = 36,
776
777                /* TX_HTHRESH host
778                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
779                 */
780                .hthresh = 0,
781
782                /* TX_WTHRESH writeback
783                 * Set on the NIC, the number of sent descriptors before writing back
784                 * status to confirm the transmission. This is done more efficiently as
785                 * a bulk DMA-transfer rather than writing one at a time.
786                 * Similar to tx_free_thresh however this is applied to the NIC, where
787                 * as tx_free_thresh is when DPDK will check these. This is extended
788                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
789                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
790                 */
791                .wthresh = 4,
792        },
793
794        /* Used internally by DPDK rather than passed to the NIC. The number of
795         * packet descriptors to send before checking for any responses written
796         * back (to confirm the transmission). Default = 32 if set to 0)
797         */
798        .tx_free_thresh = 0,
799
800        /* This is the Report Status threshold, used by 10Gbit cards,
801         * This signals the card to only write back status (such as
802         * transmission successful) after this minimum number of transmit
803         * descriptors are seen. The default is 32 (if set to 0) however if set
804         * to greater than 1 TX wthresh must be set to zero, because this is kindof
805         * a replacement. See the dpdk programmers guide for more restrictions.
806         */
807        .tx_rs_thresh = 1,
808};
809
810/**
811 * A callback for a link state change (LSC).
812 *
813 * Packets may be received before this notification. In fact the DPDK IGXBE
814 * driver likes to put a delay upto 5sec before sending this.
815 *
816 * We use this to ensure the link speed is correct for our timestamp
817 * calculations. Because packets might be received before the link up we still
818 * update this when the packet is received.
819 *
820 * @param port The DPDK port
821 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
822 * @param cb_arg The dpdk_format_data_t structure associated with the format
823 */
824#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
825static int dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
826                              void *cb_arg, void *retparam UNUSED) {
827#else
828static void dpdk_lsc_callback(portid_t port, enum rte_eth_event_type event,
829                              void *cb_arg) {
830#endif
831        struct dpdk_format_data_t * format_data = cb_arg;
832        struct rte_eth_link link_info;
833        assert(event == RTE_ETH_EVENT_INTR_LSC);
834        assert(port == format_data->port);
835
836        rte_eth_link_get_nowait(port, &link_info);
837
838        if (link_info.link_status)
839                format_data->link_speed = link_info.link_speed;
840        else
841                format_data->link_speed = 0;
842
843#if DEBUG
844        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
845                link_info.link_status ? "up" : "down",
846                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
847                                          "full-duplex" : "half-duplex",
848                (int) link_info.link_speed);
849#endif
850
851        /* Turns out DPDK drivers might not come back up if the link speed
852         * changes. So we reset the autoneg procedure. This is very unsafe
853         * we have have threads reading packets and we stop the port. */
854#if 0
855        if (!link_info.link_status) {
856                int ret;
857                rte_eth_dev_stop(port);
858                ret = rte_eth_dev_start(port);
859                if (ret < 0) {
860                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
861                                strerror(-ret));
862                }
863        }
864#endif
865#if RTE_VERSION >= RTE_VERSION_NUM(17, 8, 0, 1)
866        return 0;
867#endif
868}
869
870/** Reserve a DPDK lcore ID for a thread globally.
871 *
872 * @param real If true allocate a real lcore, otherwise allocate a core which
873 * does not exist on the local machine.
874 * @param socket the prefered NUMA socket - only used if a real core is requested
875 * @return a valid core, which can later be used with dpdk_register_lcore() or a
876 * -1 if have run out of cores.
877 *
878 * If any thread is reading or freeing packets we need to register it here
879 * due to TLS caches in the memory pools.
880 */
881static int dpdk_reserve_lcore(bool real, int socket) {
882        int new_id = -1;
883        int i;
884        struct rte_config *cfg = rte_eal_get_configuration();
885        (void) socket;
886
887        pthread_mutex_lock(&dpdk_lock);
888        /* If 'reading packets' fill in cores from 0 up and bind affinity
889         * otherwise start from the MAX core (which is also the master) and work backwards
890         * in this case physical cores on the system will not exist so we don't bind
891         * these to any particular physical core */
892        if (real) {
893#if HAVE_LIBNUMA
894                for (i = 0; i < RTE_MAX_LCORE; ++i) {
895                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
896                                new_id = i;
897                                if (!lcore_config[i].detected)
898                                        new_id = -1;
899                                break;
900                        }
901                }
902#endif
903                /* Retry without the the numa restriction */
904                if (new_id == -1) {
905                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
906                                if (!rte_lcore_is_enabled(i)) {
907                                        new_id = i;
908                                        if (!lcore_config[i].detected)
909                                                fprintf(stderr, "Warning the"
910                                                        " number of 'reading' "
911                                                        "threads exceed cores\n");
912                                        break;
913                                }
914                        }
915                }
916        } else {
917                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
918                        if (!rte_lcore_is_enabled(i)) {
919                                new_id = i;
920                                break;
921                        }
922                }
923        }
924
925        if (new_id != -1) {
926                /* Enable the core in global DPDK structs */
927                cfg->lcore_role[new_id] = ROLE_RTE;
928                cfg->lcore_count++;
929        }
930
931        pthread_mutex_unlock(&dpdk_lock);
932        return new_id;
933}
934
935/** Register a thread as a lcore
936 * @param libtrace any error is set against libtrace on exit
937 * @param real If this is a true lcore we will bind its affinty to the
938 * requested core.
939 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
940 * @return 0, if successful otherwise -1 if an error occured (details are stored
941 * in libtrace)
942 *
943 * @note This must be called from the thread being registered.
944 */
945static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
946        int ret;
947        RTE_PER_LCORE(_lcore_id) = lcore;
948
949        /* Set affinity bind to corresponding core */
950        if (real) {
951                cpu_set_t cpuset;
952                CPU_ZERO(&cpuset);
953                CPU_SET(rte_lcore_id(), &cpuset);
954                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
955                if (ret != 0) {
956                        trace_set_err(libtrace, errno, "Warning "
957                                      "pthread_setaffinity_np failed");
958                        return -1;
959                }
960        }
961
962        return 0;
963}
964
965/** Allocates a new dpdk packet buffer memory pool.
966 *
967 * @param n The number of threads
968 * @param pkt_size The packet size we need ot store
969 * @param socket_id The NUMA socket id
970 * @param A new mempool, if NULL query the DPDK library for the error code
971 * see rte_mempool_create() documentation.
972 *
973 * This allocates a new pool or recycles an existing memory pool.
974 * Call dpdk_free_memory() to free the memory.
975 * We cannot delete memory so instead we store the pools, allowing them to be
976 * re-used.
977 */
978static struct rte_mempool *dpdk_alloc_memory(unsigned n,
979                                             unsigned pkt_size,
980                                             int socket_id) {
981        struct rte_mempool *ret;
982        size_t j,k;
983        char name[MEMPOOL_NAME_LEN];
984
985        /* Add on packet size overheads */
986        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
987
988        pthread_mutex_lock(&dpdk_lock);
989
990        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
991                /* Best guess go for zero */
992                socket_id = 0;
993        }
994
995        /* Find a valid pool */
996        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
997                if (mem_pools[socket_id][j]->size >= n &&
998                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
999                        break;
1000                }
1001        }
1002
1003        /* Find the end (+1) of the list */
1004        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1005
1006        if (mem_pools[socket_id][j]) {
1007                ret = mem_pools[socket_id][j];
1008                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1009                mem_pools[socket_id][k-1] = NULL;
1010                mem_pools[socket_id][j] = NULL;
1011        } else {
1012                static uint32_t test = 10;
1013                test++;
1014                snprintf(name, MEMPOOL_NAME_LEN,
1015                         "libtrace_pool_%"PRIu32, test);
1016
1017                ret = rte_mempool_create(name, n, pkt_size,
1018                                         128, sizeof(struct rte_pktmbuf_pool_private),
1019                                         rte_pktmbuf_pool_init, NULL,
1020                                         rte_pktmbuf_init, NULL,
1021                                         socket_id, 0);
1022        }
1023
1024        pthread_mutex_unlock(&dpdk_lock);
1025        return ret;
1026}
1027
1028/** Stores the memory against the DPDK library.
1029 *
1030 * @param mempool The mempool to free
1031 * @param socket_id The NUMA socket this mempool was allocated upon.
1032 *
1033 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1034 * store the memory shared globally against the format.
1035 */
1036static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1037        size_t i;
1038        pthread_mutex_lock(&dpdk_lock);
1039
1040        /* We should have all entries back in the mempool */
1041        rte_mempool_audit(mempool);
1042        if (!rte_mempool_full(mempool)) {
1043                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1044                        "free all packets before finishing a trace\n",
1045                        rte_mempool_avail_count(mempool), mempool->size);
1046        }
1047
1048        /* Find the end (+1) of the list */
1049        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1050
1051        if (i >= RTE_MAX_LCORE) {
1052                fprintf(stderr, "Too many memory pools, dropping this one\n");
1053        } else {
1054                mem_pools[socket_id][i] = mempool;
1055        }
1056
1057        pthread_mutex_unlock(&dpdk_lock);
1058}
1059
1060/* Attach memory to the port and start (or restart) the port/s.
1061 */
1062static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1063                              char *err, int errlen, uint16_t rx_queues) {
1064        int ret, i;
1065        struct rte_eth_link link_info; /* Wait for link */
1066        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1067
1068        /* Already started */
1069        if (format_data->paused == DPDK_RUNNING)
1070                return 0;
1071
1072        /* First time started we need to alloc our memory, doing this here
1073         * rather than in environment setup because we don't have snaplen then */
1074        if (format_data->paused == DPDK_NEVER_STARTED) {
1075                if (format_data->snaplen == 0) {
1076                        format_data->snaplen = RX_MBUF_SIZE;
1077                        port_conf.rxmode.jumbo_frame = 0;
1078                        port_conf.rxmode.max_rx_pkt_len = 0;
1079                } else {
1080                        double expn;
1081
1082                        /* Use jumbo frames */
1083                        port_conf.rxmode.jumbo_frame = 1;
1084                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1085
1086                        /* Use less buffers if we're supporting jumbo frames
1087                         * otherwise we won't be able to allocate memory.
1088                         */
1089                        if (format_data->snaplen > 1500) {
1090                                format_data->nb_rx_buf /= 2;
1091                        }
1092
1093                        /* snaplen should be rounded up to next power of two
1094                         * to ensure enough memory is allocated for each
1095                         * mbuf :(
1096                         */
1097                        expn = ceil(log2((double)(format_data->snaplen)));
1098                        format_data->snaplen = pow(2, (int)expn);
1099                }
1100
1101#if GET_MAC_CRC_CHECKSUM
1102                /* This is additional overhead so make sure we allow space for this */
1103                format_data->snaplen += ETHER_CRC_LEN;
1104#endif
1105#if HAS_HW_TIMESTAMPS_82580
1106                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1107#endif
1108
1109                /* Create the mbuf pool, which is the place packets are allocated
1110                 * from - There is no free function (I cannot see one).
1111                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1112                 * allocate however that extra 1 packet is not used.
1113                 * (I assume <= vs < error some where in DPDK code)
1114                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1115                 * so that will fill the new buffer and wait until slots in the
1116                 * ring become available.
1117                 */
1118#if DEBUG
1119                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1120#endif
1121                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1122                                                              format_data->snaplen,
1123                                                              format_data->nic_numa_node);
1124
1125                if (format_data->pktmbuf_pool == NULL) {
1126                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1127                                 "pool failed: %s", strerror(rte_errno));
1128                        return -1;
1129                }
1130        }
1131
1132        /* Generate the hash key, based on the device */
1133        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
1134        // In new versions DPDK we can query the size
1135#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
1136        struct rte_eth_dev_info dev_info;
1137        rte_eth_dev_info_get(format_data->port, &dev_info);
1138        rss_size = dev_info.hash_key_size;
1139#endif
1140        if (rss_size != 0) {
1141                format_data->rss_key = malloc(rss_size);
1142                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
1143                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
1144                } else {
1145                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
1146                }
1147                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1148#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
1149                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
1150#endif
1151        } else {
1152                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
1153        }
1154
1155        /* ----------- Now do the setup for the port mapping ------------ */
1156        /* Order of calls must be
1157         * rte_eth_dev_configure()
1158         * rte_eth_tx_queue_setup()
1159         * rte_eth_rx_queue_setup()
1160         * rte_eth_dev_start()
1161         * other rte_eth calls
1162         */
1163
1164        /* This must be called first before another *eth* function
1165         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1166        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1167        if (ret < 0) {
1168                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1169                         " %"PRIu8" : %s", format_data->port,
1170                         strerror(-ret));
1171                return -1;
1172        }
1173#if DEBUG
1174        fprintf(stderr, "Doing dev configure\n");
1175#endif
1176        /* Initialise the TX queue a minimum value if using this port for
1177         * receiving. Otherwise a larger size if writing packets.
1178         */
1179        ret = rte_eth_tx_queue_setup(format_data->port,
1180                                     0 /* queue XXX */,
1181                                     format_data->nb_tx_buf,
1182                                     SOCKET_ID_ANY,
1183                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1184        if (ret < 0) {
1185                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1186                         " on port %d : %s", (int)format_data->port,
1187                         strerror(-ret));
1188                return -1;
1189        }
1190
1191        /* Attach memory to our RX queues */
1192        for (i=0; i < rx_queues; i++) {
1193                dpdk_per_stream_t *stream;
1194#if DEBUG
1195                fprintf(stderr, "Configuring queue %d\n", i);
1196#endif
1197
1198                /* Add storage for the stream */
1199                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1200                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1201                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1202                stream->queue_id = i;
1203
1204                if (stream->lcore == -1)
1205                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1206
1207                if (stream->lcore == -1) {
1208                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1209                                 ". Too many threads?");
1210                        return -1;
1211                }
1212
1213                if (stream->mempool == NULL) {
1214                        stream->mempool = dpdk_alloc_memory(
1215                                                  format_data->nb_rx_buf*2,
1216                                                  format_data->snaplen,
1217                                                  rte_lcore_to_socket_id(stream->lcore));
1218
1219                        if (stream->mempool == NULL) {
1220                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1221                                         "pool failed: %s", strerror(rte_errno));
1222                                return -1;
1223                        }
1224                }
1225
1226                /* Initialise the RX queue with some packets from memory */
1227                ret = rte_eth_rx_queue_setup(format_data->port,
1228                                             stream->queue_id,
1229                                             format_data->nb_rx_buf,
1230                                             format_data->nic_numa_node,
1231                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1232                                             stream->mempool);
1233                if (ret < 0) {
1234                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1235                                 " RX queue on port %d : %s",
1236                                 (int)format_data->port,
1237                                 strerror(-ret));
1238                        return -1;
1239                }
1240        }
1241
1242#if DEBUG
1243        fprintf(stderr, "Doing start device\n");
1244#endif
1245        rte_eth_stats_reset(format_data->port);
1246        /* Start device */
1247        ret = rte_eth_dev_start(format_data->port);
1248        if (ret < 0) {
1249                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1250                         strerror(-ret));
1251                return -1;
1252        }
1253
1254        /* Default promiscuous to on */
1255        if (format_data->promisc == -1)
1256                format_data->promisc = 1;
1257
1258        if (format_data->promisc == 1)
1259                rte_eth_promiscuous_enable(format_data->port);
1260        else
1261                rte_eth_promiscuous_disable(format_data->port);
1262
1263        /* We have now successfully started/unpased */
1264        format_data->paused = DPDK_RUNNING;
1265
1266
1267        /* Register a callback for link state changes */
1268        ret = rte_eth_dev_callback_register(format_data->port,
1269                                            RTE_ETH_EVENT_INTR_LSC,
1270                                            dpdk_lsc_callback,
1271                                            format_data);
1272#if DEBUG
1273        if (ret)
1274                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1275                        ret, strerror(-ret));
1276#endif
1277
1278        /* Get the current link status */
1279        rte_eth_link_get_nowait(format_data->port, &link_info);
1280        format_data->link_speed = link_info.link_speed;
1281#if DEBUG
1282        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1283                (int) link_info.link_duplex, (int) link_info.link_speed);
1284#endif
1285
1286        return 0;
1287}
1288
1289int dpdk_start_input (libtrace_t *libtrace) {
1290        char err[500];
1291        err[0] = 0;
1292
1293        /* Make sure we don't reserve an extra thread for this */
1294        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1295
1296        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1297                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1298                free(libtrace->format_data);
1299                libtrace->format_data = NULL;
1300                return -1;
1301        }
1302        return 0;
1303}
1304
1305static inline size_t dpdk_get_max_rx_queues (portid_t port_id) {
1306        struct rte_eth_dev_info dev_info;
1307        rte_eth_dev_info_get(port_id, &dev_info);
1308        return dev_info.max_rx_queues;
1309}
1310
1311static inline size_t dpdk_processor_count () {
1312        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1313        if (nb_cpu <= 0)
1314                return 1;
1315        else
1316                return (size_t) nb_cpu;
1317}
1318
1319int dpdk_pstart_input (libtrace_t *libtrace) {
1320        char err[500];
1321        int i=0, phys_cores=0;
1322        int tot = libtrace->perpkt_thread_count;
1323        libtrace_list_node_t *n;
1324        err[0] = 0;
1325
1326        if (rte_lcore_id() != rte_get_master_lcore())
1327                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1328                        " from the master DPDK thread!\n");
1329
1330        /* If the master is not on the last thread we move it there */
1331        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1332                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1333                        return -1;
1334        }
1335
1336        /* Don't exceed the number of cores in the system/detected by dpdk
1337         * We don't have to force this but performance wont be good if we don't */
1338        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1339                if (lcore_config[i].detected) {
1340                        if (rte_lcore_is_enabled(i)) {
1341#if DEBUG
1342                                fprintf(stderr, "Found core %d already in use!\n", i);
1343#endif
1344                        } else {
1345                                phys_cores++;
1346                        }
1347                }
1348        }
1349        /* If we are restarting we have already allocated some threads as such
1350         * we add these back to the count for this calculation */
1351        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1352                dpdk_per_stream_t * stream = n->data;
1353                if (stream->lcore != -1)
1354                        phys_cores++;
1355        }
1356
1357        tot = MIN(libtrace->perpkt_thread_count,
1358                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1359        tot = MIN(tot, phys_cores);
1360
1361#if DEBUG
1362        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1363                libtrace->perpkt_thread_count, phys_cores);
1364#endif
1365
1366        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1367                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1368                free(libtrace->format_data);
1369                libtrace->format_data = NULL;
1370                return -1;
1371        }
1372
1373        /* Make sure we only start the number that we should */
1374        libtrace->perpkt_thread_count = tot;
1375        return 0;
1376}
1377
1378/**
1379 * Register a thread with the DPDK system,
1380 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1381 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1382 * gives it.
1383 *
1384 * We then allow a mapper thread to be started on every real core as DPDK would,
1385 * we also bind these to the corresponding CPU cores.
1386 *
1387 * @param libtrace A pointer to the trace
1388 * @param reading True if the thread will be used to read packets, i.e. will
1389 *                call pread_packet(), false if thread used to process packet
1390 *                in any other manner including statistics functions.
1391 */
1392int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1393{
1394#if DEBUG
1395        char name[99];
1396        name[0] = 0;
1397#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1398        pthread_getname_np(pthread_self(),
1399                           name, sizeof(name));
1400#endif
1401#endif
1402        if (reading) {
1403                dpdk_per_stream_t *stream;
1404                /* Attach our thread */
1405                if(t->type == THREAD_PERPKT) {
1406                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1407                        if (t->format_data == NULL) {
1408                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1409                                              "Too many threads registered");
1410                                return -1;
1411                        }
1412                } else {
1413                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1414                }
1415                stream = t->format_data;
1416#if DEBUG
1417                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1418#endif
1419                return dpdk_register_lcore(libtrace, true, stream->lcore);
1420        } else {
1421                int lcore = dpdk_reserve_lcore(reading, 0);
1422                if (lcore == -1) {
1423                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1424                                      " for DPDK");
1425                        return -1;
1426                }
1427#if DEBUG
1428                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1429#endif
1430                return dpdk_register_lcore(libtrace, false, lcore);
1431        }
1432
1433        return 0;
1434}
1435
1436/**
1437 * Unregister a thread with the DPDK system.
1438 *
1439 * Only previously registered threads should be calling this just before
1440 * they are destroyed.
1441 */
1442void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1443{
1444        struct rte_config *cfg = rte_eal_get_configuration();
1445
1446        assert(rte_lcore_id() < RTE_MAX_LCORE);
1447        pthread_mutex_lock(&dpdk_lock);
1448        /* Skip if master */
1449        if (rte_lcore_id() == rte_get_master_lcore()) {
1450                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1451                pthread_mutex_unlock(&dpdk_lock);
1452                return;
1453        }
1454
1455        /* Disable this core in global DPDK structs */
1456        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1457        cfg->lcore_count--;
1458        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1459        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1460        pthread_mutex_unlock(&dpdk_lock);
1461        return;
1462}
1463
1464static int dpdk_start_output(libtrace_out_t *libtrace)
1465{
1466        char err[500];
1467        err[0] = 0;
1468
1469        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1470                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1471                free(libtrace->format_data);
1472                libtrace->format_data = NULL;
1473                return -1;
1474        }
1475        return 0;
1476}
1477
1478int dpdk_pause_input(libtrace_t * libtrace) {
1479        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1480        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1481        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1482#if DEBUG
1483                fprintf(stderr, "Pausing DPDK port\n");
1484#endif
1485                rte_eth_dev_stop(FORMAT(libtrace)->port);
1486                FORMAT(libtrace)->paused = DPDK_PAUSED;
1487                /* Empty the queue of packets */
1488                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1489                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1490                }
1491                FORMAT(libtrace)->burst_offset = 0;
1492                FORMAT(libtrace)->burst_size = 0;
1493
1494                for (; tmp != NULL; tmp = tmp->next) {
1495                        dpdk_per_stream_t *stream = tmp->data;
1496                        stream->ts_last_sys = 0;
1497#if HAS_HW_TIMESTAMPS_82580
1498                        stream->ts_first_sys = 0;
1499#endif
1500                }
1501
1502        }
1503        return 0;
1504}
1505
1506static int dpdk_write_packet(libtrace_out_t *trace,
1507                             libtrace_packet_t *packet){
1508        struct rte_mbuf* m_buff[1];
1509
1510        int wirelen = trace_get_wire_length(packet);
1511        int caplen = trace_get_capture_length(packet);
1512
1513        /* Check for a checksum and remove it */
1514        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1515            wirelen == caplen)
1516                caplen -= ETHER_CRC_LEN;
1517
1518        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1519        if (m_buff[0] == NULL) {
1520                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1521                return -1;
1522        } else {
1523                int ret;
1524                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1525                do {
1526                        ret = rte_eth_tx_burst(FORMAT(trace)->port, 0 /*queue TODO*/, m_buff, 1);
1527                } while (ret != 1);
1528        }
1529
1530        return 0;
1531}
1532
1533int dpdk_fin_input(libtrace_t * libtrace) {
1534        libtrace_list_node_t * n;
1535        /* Free our memory structures */
1536        if (libtrace->format_data != NULL) {
1537
1538                if (FORMAT(libtrace)->port != 0xFF)
1539                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1540                                                        RTE_ETH_EVENT_INTR_LSC,
1541                                                        dpdk_lsc_callback,
1542                                                        FORMAT(libtrace));
1543                /* Close the device completely, device cannot be restarted */
1544                rte_eth_dev_close(FORMAT(libtrace)->port);
1545
1546                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1547                                 FORMAT(libtrace)->nic_numa_node);
1548
1549                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1550                        dpdk_per_stream_t * stream = n->data;
1551                        if (stream->mempool)
1552                                dpdk_free_memory(stream->mempool,
1553                                                 rte_lcore_to_socket_id(stream->lcore));
1554                }
1555
1556                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1557                /* filter here if we used it */
1558                if (FORMAT(libtrace)->rss_key)
1559                        free(FORMAT(libtrace)->rss_key);
1560                free(libtrace->format_data);
1561        }
1562
1563        return 0;
1564}
1565
1566
1567static int dpdk_fin_output(libtrace_out_t * libtrace) {
1568        /* Free our memory structures */
1569        if (libtrace->format_data != NULL) {
1570                /* Close the device completely, device cannot be restarted */
1571                if (FORMAT(libtrace)->port != 0xFF)
1572                        rte_eth_dev_close(FORMAT(libtrace)->port);
1573                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1574                /* filter here if we used it */
1575                free(libtrace->format_data);
1576        }
1577
1578        return 0;
1579}
1580
1581/**
1582 * Get the start of the additional header that we added to a packet.
1583 */
1584static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1585        assert(packet);
1586        assert(packet->buffer);
1587        /* Our header sits straight after the mbuf header */
1588        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1589}
1590
1591static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1592        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1593        return hdr->cap_len;
1594}
1595
1596static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1597        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1598        if (size > hdr->cap_len) {
1599                /* Cannot make a packet bigger */
1600                return trace_get_capture_length(packet);
1601        }
1602
1603        /* Reset the cached capture length first*/
1604        packet->capture_length = -1;
1605        hdr->cap_len = (uint32_t) size;
1606        return trace_get_capture_length(packet);
1607}
1608
1609static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1610        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1611        int org_cap_size; /* The original capture size */
1612        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1613                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1614                               sizeof(struct hw_timestamp_82580);
1615        } else {
1616                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1617        }
1618        if (hdr->flags & INCLUDES_CHECKSUM) {
1619                return org_cap_size;
1620        } else {
1621                /* DPDK packets are always TRACE_TYPE_ETH packets */
1622                return org_cap_size + ETHER_CRC_LEN;
1623        }
1624}
1625
1626int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1627        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1628        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1629                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1630                                sizeof(struct hw_timestamp_82580);
1631        else
1632                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1633}
1634
1635int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1636                               libtrace_packet_t *packet, void *buffer,
1637                               libtrace_rt_types_t rt_type, uint32_t flags) {
1638        assert(packet);
1639        if (packet->buffer != buffer &&
1640            packet->buf_control == TRACE_CTRL_PACKET) {
1641                free(packet->buffer);
1642        }
1643
1644        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1645                packet->buf_control = TRACE_CTRL_PACKET;
1646        else
1647                packet->buf_control = TRACE_CTRL_EXTERNAL;
1648
1649        packet->buffer = buffer;
1650        packet->header = buffer;
1651
1652        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1653        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1654        packet->type = rt_type;
1655        return 0;
1656}
1657
1658/**
1659 * Given a packet size and a link speed, computes the
1660 * time to transmit in nanoseconds.
1661 *
1662 * @param format_data The dpdk format data from which we get the link speed
1663 *        and if unset updates it in a thread safe manner
1664 * @param pkt_size The size of the packet in bytes
1665 * @return The wire time in nanoseconds
1666 */
1667static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1668        uint32_t wire_time;
1669        /* 20 extra bytes of interframe gap and preamble */
1670# if GET_MAC_CRC_CHECKSUM
1671        wire_time = ((pkt_size + 20) * 8000);
1672# else
1673        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1674# endif
1675
1676        /* Division is really slow and introduces a pipeline stall
1677         * The compiler will optimise this into magical multiplication and shifting
1678         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1679         */
1680retry_calc_wiretime:
1681        switch (format_data->link_speed) {
1682        case ETH_SPEED_NUM_40G:
1683                wire_time /=  ETH_SPEED_NUM_40G;
1684                break;
1685        case ETH_SPEED_NUM_20G:
1686                wire_time /= ETH_SPEED_NUM_20G;
1687                break;
1688        case ETH_SPEED_NUM_10G:
1689                wire_time /= ETH_SPEED_NUM_10G;
1690                break;
1691        case ETH_SPEED_NUM_1G:
1692                wire_time /= ETH_SPEED_NUM_1G;
1693                break;
1694        case 0:
1695                {
1696                /* Maybe the link was down originally, but now it should be up */
1697                struct rte_eth_link link = {0};
1698                rte_eth_link_get_nowait(format_data->port, &link);
1699                if (link.link_status && link.link_speed) {
1700                        format_data->link_speed = link.link_speed;
1701#ifdef DEBUG
1702                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1703#endif
1704                        goto retry_calc_wiretime;
1705                }
1706                /* We don't know the link speed, make sure numbers are counting up */
1707                wire_time = 1;
1708                break;
1709                }
1710        default:
1711                wire_time /= format_data->link_speed;
1712        }
1713        return wire_time;
1714}
1715
1716/**
1717 * Does any extra preperation to all captured packets
1718 * This includes adding our extra header to it with the timestamp,
1719 * and any snapping
1720 *
1721 * @param format_data The DPDK format data
1722 * @param plc The DPDK per lcore format data
1723 * @param pkts An array of size nb_pkts of DPDK packets
1724 */
1725static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1726                                   struct dpdk_per_stream_t *plc,
1727                                   struct rte_mbuf **pkts,
1728                                   size_t nb_pkts) {
1729        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1730        struct dpdk_addt_hdr *hdr;
1731        size_t i;
1732        uint64_t cur_sys_time_ns;
1733#if HAS_HW_TIMESTAMPS_82580
1734        struct hw_timestamp_82580 *hw_ts;
1735        uint64_t estimated_wraps;
1736#else
1737
1738#endif
1739
1740#if USE_CLOCK_GETTIME
1741        struct timespec cur_sys_time = {0};
1742        /* This looks terrible and I feel bad doing it. But it's OK
1743         * on new kernels, because this is a fast vsyscall */
1744        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1745        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1746#else
1747        struct timeval cur_sys_time = {0};
1748        /* Also a fast vsyscall */
1749        gettimeofday(&cur_sys_time, NULL);
1750        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1751#endif
1752
1753        /* The system clock is not perfect so when running
1754         * at linerate we could timestamp a packet in the past.
1755         * To avoid this we munge the timestamp to appear 1ns
1756         * after the previous packet. We should eventually catch up
1757         * to system time since a 64byte packet on a 10G link takes 67ns.
1758         *
1759         * Note with parallel readers timestamping packets
1760         * with duplicate stamps or out of order is unavoidable without
1761         * hardware timestamping from the NIC.
1762         */
1763#if !HAS_HW_TIMESTAMPS_82580
1764        if (plc->ts_last_sys >= cur_sys_time_ns) {
1765                cur_sys_time_ns = plc->ts_last_sys + 1;
1766        }
1767#endif
1768
1769        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1770        for (i = 0 ; i < nb_pkts ; ++i) {
1771
1772                /* We put our header straight after the dpdk header */
1773                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1774                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1775
1776#if GET_MAC_CRC_CHECKSUM
1777                /* Add back in the CRC sum */
1778                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1779                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1780                hdr->flags |= INCLUDES_CHECKSUM;
1781#endif
1782
1783                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1784
1785#if HAS_HW_TIMESTAMPS_82580
1786                /* The timestamp is sitting before our packet and is included in pkt_len */
1787                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1788                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1789                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1790
1791                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1792                 *
1793                 *        +----------+---+   +--------------+
1794                 *  82580 |    24    | 8 |   |      32      |
1795                 *        +----------+---+   +--------------+
1796                 *          reserved  \______ 40 bits _____/
1797                 *
1798                 * The 40 bit 82580 SYSTIM overflows every
1799                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1800                 *
1801                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1802                 * Endian (for the full 64 bits) i.e. picture is mirrored
1803                 */
1804
1805                /* Despite what the documentation says this is in Little
1806                 * Endian byteorder. Mask the reserved section out.
1807                 */
1808                hdr->timestamp = le64toh(hw_ts->timestamp) &
1809                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1810
1811                if (unlikely(plc->ts_first_sys == 0)) {
1812                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1813                        plc->ts_last_sys = plc->ts_first_sys;
1814                }
1815
1816                /* This will have serious problems if packets aren't read quickly
1817                 * that is within a couple of seconds because our clock cycles every
1818                 * 18 seconds */
1819                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1820                                  / (1ull<<TS_NBITS_82580);
1821
1822                /* Estimated_wraps gives the number of times the counter should have
1823                 * wrapped (however depending on value last time it could have wrapped
1824                 * twice more (if hw clock is close to its max value) or once less (allowing
1825                 * for a bit of variance between hw and sys clock). But if the clock
1826                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1827                if (unlikely(estimated_wraps >= 2)) {
1828                        /* 2 or more wrap arounds add all but the very last wrap */
1829                        plc->wrap_count += estimated_wraps - 1;
1830                }
1831
1832                /* Set the timestamp to the lowest possible value we're considering */
1833                hdr->timestamp += plc->ts_first_sys +
1834                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1835
1836                /* In most runs only the first if() will need evaluating - i.e our
1837                 * estimate is correct. */
1838                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1839                                              hdr->timestamp, MAXSKEW_82580))) {
1840                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1841                        plc->wrap_count++;
1842                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1843                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1844                                             hdr->timestamp, MAXSKEW_82580)) {
1845                                /* Failed to match estimated_wraps */
1846                                plc->wrap_count++;
1847                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1848                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1849                                                     hdr->timestamp, MAXSKEW_82580)) {
1850                                        if (estimated_wraps == 0) {
1851                                                /* 0 case Failed to match estimated_wraps+2 */
1852                                                printf("WARNING - Hardware Timestamp failed to"
1853                                                       " match using systemtime!\n");
1854                                                hdr->timestamp = cur_sys_time_ns;
1855                                        } else {
1856                                                /* Failed to match estimated_wraps+1 */
1857                                                plc->wrap_count++;
1858                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1859                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1860                                                                     hdr->timestamp, MAXSKEW_82580)) {
1861                                                        /* Failed to match estimated_wraps+2 */
1862                                                        printf("WARNING - Hardware Timestamp failed to"
1863                                                               " match using systemtime!!\n");
1864                                                }
1865                                        }
1866                                }
1867                        }
1868                }
1869#else
1870
1871                hdr->timestamp = cur_sys_time_ns;
1872                /* Offset the next packet by the wire time of previous */
1873                calculate_wire_time(format_data, hdr->cap_len);
1874
1875#endif
1876        }
1877
1878        plc->ts_last_sys = cur_sys_time_ns;
1879        return;
1880}
1881
1882
1883static void dpdk_fin_packet(libtrace_packet_t *packet)
1884{
1885        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1886                rte_pktmbuf_free(packet->buffer);
1887                packet->buffer = NULL;
1888        }
1889}
1890
1891/** Reads at least one packet or returns an error
1892 */
1893int dpdk_read_packet_stream (libtrace_t *libtrace,
1894                                           dpdk_per_stream_t *stream,
1895                                           libtrace_message_queue_t *mesg,
1896                                           struct rte_mbuf* pkts_burst[],
1897                                           size_t nb_packets) {
1898        size_t nb_rx; /* Number of rx packets we've recevied */
1899        while (1) {
1900                /* Poll for a batch of packets */
1901                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1902                                         stream->queue_id, pkts_burst, nb_packets);
1903                if (nb_rx > 0) {
1904                        /* Got some packets - otherwise we keep spining */
1905                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1906                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1907                        return nb_rx;
1908                }
1909                /* Check the message queue this could be less than 0 */
1910                if (mesg && libtrace_message_queue_count(mesg) > 0)
1911                        return READ_MESSAGE;
1912                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
1913                        return nb_rx;
1914                /* Wait a while, polling on memory degrades performance
1915                 * This relieves the pressure on memory allowing the NIC to DMA */
1916                rte_delay_us(10);
1917        }
1918
1919        /* We'll never get here - but if we did it would be bad */
1920        return READ_ERROR;
1921}
1922
1923static int dpdk_pread_packets (libtrace_t *libtrace,
1924                                    libtrace_thread_t *t,
1925                                    libtrace_packet_t **packets,
1926                                    size_t nb_packets) {
1927        int nb_rx; /* Number of rx packets we've recevied */
1928        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
1929        int i;
1930        dpdk_per_stream_t *stream = t->format_data;
1931        struct dpdk_addt_hdr * hdr;
1932
1933        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
1934                                         pkts_burst, nb_packets);
1935
1936        if (nb_rx > 0) {
1937                for (i = 0; i < nb_rx; ++i) {
1938                        if (packets[i]->buffer != NULL) {
1939                                /* The packet should always be finished */
1940                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
1941                                free(packets[i]->buffer);
1942                        }
1943                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
1944                        packets[i]->type = TRACE_RT_DATA_DPDK;
1945                        packets[i]->buffer = pkts_burst[i];
1946                        packets[i]->trace = libtrace;
1947                        packets[i]->error = 1;
1948                        hdr = (struct dpdk_addt_hdr *) 
1949                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
1950                        packets[i]->order = hdr->timestamp;
1951                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
1952                }
1953        }
1954
1955        return nb_rx;
1956}
1957
1958int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1959        int nb_rx; /* Number of rx packets we've received */
1960        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
1961
1962        /* Free the last packet buffer */
1963        if (packet->buffer != NULL) {
1964                /* The packet should always be finished */
1965                assert(packet->buf_control == TRACE_CTRL_PACKET);
1966                free(packet->buffer);
1967                packet->buffer = NULL;
1968        }
1969
1970        packet->buf_control = TRACE_CTRL_EXTERNAL;
1971        packet->type = TRACE_RT_DATA_DPDK;
1972
1973        /* Check if we already have some packets buffered */
1974        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
1975                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
1976                packet->trace = libtrace;
1977                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1978                return 1; // TODO should be bytes read, which essentially useless anyway
1979        }
1980
1981        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
1982                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
1983
1984        if (nb_rx > 0) {
1985                FORMAT(libtrace)->burst_size = nb_rx;
1986                FORMAT(libtrace)->burst_offset = 1;
1987                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
1988                packet->trace = libtrace;
1989                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1990                return 1;
1991        }
1992        return nb_rx;
1993}
1994
1995static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1996        struct timeval tv;
1997        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1998
1999        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2000        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2001        return tv;
2002}
2003
2004static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2005        struct timespec ts;
2006        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2007
2008        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2009        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2010        return ts;
2011}
2012
2013static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2014        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2015}
2016
2017static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2018        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2019        return (libtrace_direction_t) hdr->direction;
2020}
2021
2022static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2023        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2024        hdr->direction = (uint8_t) direction;
2025        return (libtrace_direction_t) hdr->direction;
2026}
2027
2028void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2029        struct rte_eth_stats dev_stats = {0};
2030
2031        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2032                return;
2033
2034        /* Grab the current stats */
2035        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2036
2037        stats->captured_valid = true;
2038        stats->captured = dev_stats.ipackets;
2039
2040        stats->dropped_valid = true;
2041        stats->dropped = dev_stats.imissed;
2042
2043#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2044        /* DPDK commit 86057c fixes ensures missed does not get counted as
2045         * errors */
2046        stats->errors_valid = true;
2047        stats->errors = dev_stats.ierrors;
2048#else
2049        /* DPDK errors includes drops */
2050        stats->errors_valid = true;
2051        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2052#endif
2053        stats->received_valid = true;
2054        stats->received = dev_stats.ipackets + dev_stats.imissed;
2055
2056}
2057
2058/* Attempts to read a packet in a non-blocking fashion. If one is not
2059 * available a SLEEP event is returned. We do not have the ability to
2060 * create a select()able file descriptor in DPDK.
2061 */
2062libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2063                                            libtrace_packet_t *packet) {
2064        libtrace_eventobj_t event = {0,0,0.0,0};
2065        size_t nb_rx; /* Number of received packets we've read */
2066
2067        do {
2068
2069                /* No packets waiting in our buffer? Try and read some more */
2070                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2071                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2072                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2073                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2074                        if (nb_rx > 0) {
2075                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2076                                                FORMAT(trace)->burst_pkts, nb_rx);
2077                                FORMAT(trace)->burst_size = nb_rx;
2078                                FORMAT(trace)->burst_offset = 0;
2079                        }
2080                }
2081
2082                /* Now do we have packets waiting? */
2083                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2084                        /* Free the last packet buffer */
2085                        if (packet->buffer != NULL) {
2086                                /* The packet should always be finished */
2087                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2088                                free(packet->buffer);
2089                                packet->buffer = NULL;
2090                        }
2091
2092                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2093                        packet->type = TRACE_RT_DATA_DPDK;
2094                        event.type = TRACE_EVENT_PACKET;
2095                        packet->buffer = FORMAT(trace)->burst_pkts[
2096                                             FORMAT(trace)->burst_offset++];
2097                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2098                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2099
2100                        /* XXX - Check this passes the filter trace_read_packet normally
2101                         * does this for us but this wont */
2102                        if (trace->filter) {
2103                                if (!trace_apply_filter(trace->filter, packet)) {
2104                                        /* Failed the filter so we loop for another packet */
2105                                        trace->filtered_packets ++;
2106                                        continue;
2107                                }
2108                        }
2109                        trace->accepted_packets ++;
2110                } else {
2111                        /* We only want to sleep for a very short time - we are non-blocking */
2112                        event.type = TRACE_EVENT_SLEEP;
2113                        event.seconds = 0.0001;
2114                        event.size = 0;
2115                }
2116
2117                /* If we get here we have our event */
2118                break;
2119        } while (1);
2120
2121        return event;
2122}
2123
2124static void dpdk_help(void) {
2125        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2126        printf("Supported input URIs:\n");
2127        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2128        printf("\tThe -<coreid> is optional \n");
2129        printf("\t e.g. dpdk:0000:01:00.1\n");
2130        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2131        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2132        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2133        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2134        printf("\n");
2135        printf("Supported output URIs:\n");
2136        printf("\tSame format as the input URI.\n");
2137        printf("\t e.g. dpdk:0000:01:00.1\n");
2138        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2139        printf("\n");
2140}
2141
2142static struct libtrace_format_t dpdk = {
2143        "dpdk",
2144        "$Id$",
2145        TRACE_FORMAT_DPDK,
2146        NULL,                               /* probe filename */
2147        NULL,                               /* probe magic */
2148        dpdk_init_input,                    /* init_input */
2149        dpdk_config_input,                  /* config_input */
2150        dpdk_start_input,                   /* start_input */
2151        dpdk_pause_input,                   /* pause_input */
2152        dpdk_init_output,                   /* init_output */
2153        NULL,                               /* config_output */
2154        dpdk_start_output,                  /* start_ouput */
2155        dpdk_fin_input,                     /* fin_input */
2156        dpdk_fin_output,                    /* fin_output */
2157        dpdk_read_packet,                   /* read_packet */
2158        dpdk_prepare_packet,                /* prepare_packet */
2159        dpdk_fin_packet,                    /* fin_packet */
2160        dpdk_write_packet,                  /* write_packet */
2161        NULL,                               /* flush_output */
2162        dpdk_get_link_type,                 /* get_link_type */
2163        dpdk_get_direction,                 /* get_direction */
2164        dpdk_set_direction,                 /* set_direction */
2165        NULL,                               /* get_erf_timestamp */
2166        dpdk_get_timeval,                   /* get_timeval */
2167        dpdk_get_timespec,                  /* get_timespec */
2168        NULL,                               /* get_seconds */
2169        NULL,                               /* seek_erf */
2170        NULL,                               /* seek_timeval */
2171        NULL,                               /* seek_seconds */
2172        dpdk_get_capture_length,            /* get_capture_length */
2173        dpdk_get_wire_length,               /* get_wire_length */
2174        dpdk_get_framing_length,            /* get_framing_length */
2175        dpdk_set_capture_length,            /* set_capture_length */
2176        NULL,                               /* get_received_packets */
2177        NULL,                               /* get_filtered_packets */
2178        NULL,                               /* get_dropped_packets */
2179        dpdk_get_stats,                     /* get_statistics */
2180        NULL,                               /* get_fd */
2181        dpdk_trace_event,                   /* trace_event */
2182        dpdk_help,                          /* help */
2183        NULL,                               /* next pointer */
2184        {true, 8},                          /* Live, NICs typically have 8 threads */
2185        dpdk_pstart_input,                  /* pstart_input */
2186        dpdk_pread_packets,                 /* pread_packets */
2187        dpdk_pause_input,                   /* ppause */
2188        dpdk_fin_input,                     /* p_fin */
2189        dpdk_pregister_thread,              /* pregister_thread */
2190        dpdk_punregister_thread,            /* punregister_thread */
2191        NULL                                /* get thread stats */
2192};
2193
2194void dpdk_constructor(void) {
2195        register_format(&dpdk);
2196}
Note: See TracBrowser for help on using the repository browser.