source: lib/format_dpdk.c @ c94f107

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since c94f107 was c94f107, checked in by Richard Sanger <rsanger@…>, 5 years ago

Return DPDK library version from libtrace help

  • Property mode set to 100644
File size: 76.1 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 *
26 * Kit capture format.
27 *
28 * Intel Data Plane Development Kit is a LIVE capture format.
29 *
30 * This format also supports writing which will write packets out to the
31 * network as a form of packet replay. This should not be confused with the
32 * RT protocol which is intended to transfer captured packet records between
33 * RT-speaking programs.
34 */
35
36#define _GNU_SOURCE
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include "hash_toeplitz.h"
44
45#ifdef HAVE_INTTYPES_H
46#  include <inttypes.h>
47#else
48# error "Can't find inttypes.h"
49#endif
50
51#include <stdlib.h>
52#include <assert.h>
53#include <unistd.h>
54#include <endian.h>
55#include <string.h>
56
57#if HAVE_LIBNUMA
58#include <numa.h>
59#endif
60
/* We can deal with any minor differences by checking the RTE VERSION.
 * Typically DPDK backports some fixes (typically for building against
 * newer kernels) to the older version of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * DPDK 16.04 or newer is recommended.
 * However 1.6 and newer are still likely supported.
 */
74#include <rte_eal.h>
75#include <rte_version.h>
76#ifndef RTE_VERSION_NUM
77#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
78#endif
79#ifndef RTE_VER_PATCH_RELEASE
80#       define RTE_VER_PATCH_RELEASE 0
81#endif
82#ifndef RTE_VERSION
83#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
84        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
85#endif
86
87/* 1.6.0r2 :
88 *      rte_eal_pci_set_blacklist() is removed
89 *      device_list is renamed to pci_device_list
90 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
91 *      as such we do apply the whitelist before rte_eal_init.
92 *      This also works correctly with DPDK 1.6.0r2.
93 *
94 * Replaced by:
95 *      rte_devargs (we can simply whitelist)
96 */
97#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
98#       define DPDK_USE_BLACKLIST 1
99#else
100#       define DPDK_USE_BLACKLIST 0
101#endif
102
103/*
104 * 1.7.0 :
105 *      rte_pmd_init_all is removed
106 *
107 * Replaced by:
108 *      Nothing, no longer needed
109 */
110#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
111#       define DPDK_USE_PMD_INIT 1
112#else
113#       define DPDK_USE_PMD_INIT 0
114#endif
115
116/* 1.7.0-rc3 :
117 *
118 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
119 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
120 * it twice.
121 */
122#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
123#       define DPDK_USE_PCI_PROBE 1
124#else
125#       define DPDK_USE_PCI_PROBE 0
126#endif
127
128/* 1.8.0-rc1 :
129 * LOG LEVEL is a command line option which overrides what
130 * we previously set it to.
131 */
132#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
133#       define DPDK_USE_LOG_LEVEL 1
134#else
135#       define DPDK_USE_LOG_LEVEL 0
136#endif
137
138/* 1.8.0-rc2
139 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
140 * this uses the default values, which are better tuned per device
141 * See issue #26
142 */
143#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
144#       define DPDK_USE_NULL_QUEUE_CONFIG 1
145#else
146#       define DPDK_USE_NULL_QUEUE_CONFIG 0
147#endif
148
149/* 2.0.0-rc1
150 * Unifies RSS hash between cards
151 */
152#if RTE_VERSION >= RTE_VERSION_NUM(2, 0, 0, 1)
153#       define RX_RSS_FLAGS (ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | \
154                             ETH_RSS_SCTP)
155#else
156#       define RX_RSS_FLAGS (ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | \
157                             ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP |\
158                             ETH_RSS_IPV6_UDP)
159#endif
160
161/* v16.07-rc1 - deprecated
162 * rte_mempool_avail_count to replace rte_mempool_count
163 * rte_mempool_in_use_count to replace rte_mempool_free_count
164 */
165#if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
166#define rte_mempool_avail_count rte_mempool_count
167#define rte_mempool_in_use_count rte_mempool_free_count
168#endif
169
170#include <rte_per_lcore.h>
171#include <rte_debug.h>
172#include <rte_errno.h>
173#include <rte_common.h>
174#include <rte_log.h>
175#include <rte_memcpy.h>
176#include <rte_prefetch.h>
177#include <rte_branch_prediction.h>
178#include <rte_pci.h>
179#include <rte_ether.h>
180#include <rte_ethdev.h>
181#include <rte_ring.h>
182#include <rte_mempool.h>
183#include <rte_mbuf.h>
184#include <rte_launch.h>
185#include <rte_lcore.h>
186#include <rte_per_lcore.h>
187#include <rte_cycles.h>
188#include <pthread.h>
189#ifdef __FreeBSD__
190#include <pthread_np.h>
191#endif
192
193/* 16.04-rc3 ETH_LINK_SPEED_X are replaced with ETH_SPEED_NUM_X.
194 * ETH_LINK_SPEED_ are reused as flags, ugly.
195 * We use the new way in this code.
196 */
197#ifndef ETH_SPEED_NUM_1G
198        #define ETH_SPEED_NUM_1G ETH_LINK_SPEED_1000
199        #define ETH_SPEED_NUM_10G ETH_LINK_SPEED_10G
200        #define ETH_SPEED_NUM_20G ETH_LINK_SPEED_20G
201        #define ETH_SPEED_NUM_40G ETH_LINK_SPEED_40G
202#endif
203
204/* The default size of memory buffers to use - This is the max size of standard
205 * ethernet packet less the size of the MAC CHECKSUM */
206#define RX_MBUF_SIZE 1514
207
208/* The minimum number of memory buffers per queue tx or rx. Based on
209 * the requirement of the memory pool with 128 per thread buffers, needing
210 * at least 128*1.5 = 192 buffers. Our code allocates 128*2 to be safe.
211 */
212#define MIN_NB_BUF 128
213
214/* Number of receive memory buffers to use
215 * By default this is limited by driver to 4k and must be a multiple of 128.
216 * A modification can be made to the driver to remove this limit.
217 * This can be increased in the driver and here.
218 * Should be at least MIN_NB_BUF.
219 * We choose 2K rather than 4K because it enables the usage of sse vector
220 * drivers which are significantly faster than using the larger buffer.
221 */
222#define NB_RX_MBUF (4096/2)
223
/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
227#define NB_TX_MBUF 1024
228
229/* The size of the PCI blacklist needs to be big enough to contain
230 * every PCI device address (listed by lspci every bus:device.function tuple).
231 */
232#define BLACK_LIST_SIZE 50
233
234/* The maximum number of characters the mempool name can be */
235#define MEMPOOL_NAME_LEN 20
236
237/* For single threaded libtrace we read packets as a batch/burst
238 * this is the maximum size of said burst */
239#define BURST_SIZE 32
240
/* Accessor and conversion macros.
 *
 * Every macro argument is parenthesised so that expressions such as
 * MBUF(pkts + 1) or TS_TO_NS(*ts_ptr) expand with the intended precedence,
 * and FORMAT_DATA_HEAD() is wrapped so it behaves as a single operand.
 */

/* View an opaque buffer pointer as a struct rte_mbuf pointer */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Cast the format_data pointer of x to the DPDK format data */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Cast the format_data pointer of x to the per-lcore data */
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Head node of the per_stream list, and the stream stored in that node */
#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval / struct timespec value to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
254
255#if RTE_PKTMBUF_HEADROOM != 128
256#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
257         "any libtrace instance processing these packet must be have the" \
258         "same RTE_PKTMBUF_HEADROOM set"
259#endif
260
261/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
262 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
263 *
264 * Make sure you understand what these are doing before enabling them.
265 * They might make traces incompatible with other builds etc.
266 *
267 * These are also included to show how to do somethings which aren't
268 * obvious in the DPDK documentation.
269 */
270
271/* Print verbose messages to stderr */
272#define DEBUG 0
273
274/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
275 * only turn on if you know clock_gettime is a vsyscall on your system
276 * otherwise could be a large overhead. Again gettimeofday() should be
277 * vsyscall also if it's not you should seriously consider updating your
278 * kernel.
279 */
280#ifdef HAVE_CLOCK_GETTIME
281/* You can turn this on (set to 1) to prefer clock_gettime */
282#define USE_CLOCK_GETTIME 1
283#else
284/* DON'T CHANGE THIS !!! */
285#define USE_CLOCK_GETTIME 0
286#endif
287
288/* This is fairly safe to turn on - currently there appears to be a 'bug'
289 * in DPDK that will remove the checksum by making the packet appear 4bytes
290 * smaller than what it really is. Most formats don't include the checksum
291 * hence writing out a port such as int: ring: and dpdk: assumes there
292 * is no checksum and will attempt to write the checksum as part of the
293 * packet
294 */
295#define GET_MAC_CRC_CHECKSUM 0
296
297/* This requires a modification of the pmd drivers (inside Intel DPDK)
298 * TODO this requires updating (packet sizes are wrong TS most likely also)
299 */
300#define HAS_HW_TIMESTAMPS_82580 0
301
302#if HAS_HW_TIMESTAMPS_82580
303# define TS_NBITS_82580     40
304/* The maximum on the +ve or -ve side that we can be, make it half way */
305# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
306#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
307#endif
308
/* File-wide mutex, statically initialised.
 * NOTE(review): the code that takes this lock is outside this view -
 * presumably it serialises the process-wide DPDK set-up; confirm against
 * the callers. */
static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
/* Memory pools Per NUMA node.
 * The table is 4 x RTE_MAX_LCORE and starts out all NULL.
 * NOTE(review): presumably indexed [numa_node][lcore], which would cap the
 * number of NUMA nodes at 4 - verify where the table is indexed. */
static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
312
313/* As per Intel 82580 specification - mismatch in 82580 datasheet
314 * it states ts is stored in Big Endian, however its actually Little */
/* Two consecutive 64-bit words: a reserved word followed by the timestamp
 * (16 bytes in total). */
struct hw_timestamp_82580 {
        /* First word, reserved */
        uint64_t reserved;
        /* Second word: little endian, only the lower 40 bits are valid */
        uint64_t timestamp;
};
319
/* Values stored in dpdk_format_data_t::paused. */
enum paused_state {
        /* Initial value assigned when the format data is created */
        DPDK_NEVER_STARTED = 0,
        DPDK_RUNNING       = 1,
        DPDK_PAUSED        = 2,
};
325
/* Per-stream state; one instance is stored in each node of the
 * dpdk_format_data_t::per_stream list. The field order must stay in sync
 * with the positional initialiser DPDK_EMPTY_STREAM. */
struct dpdk_per_stream_t
{
        uint16_t queue_id; /* Set to -1 by DPDK_EMPTY_STREAM (wraps to 0xFFFF in this unsigned field) */
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
        struct rte_mempool *mempool; /* NULL in an empty stream */
        int lcore; /* -1 in an empty stream */
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);
338
339#if HAS_HW_TIMESTAMPS_82580
340#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
341#else
342#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
343#endif
344
345typedef struct dpdk_per_stream_t dpdk_per_stream_t;
346
347/* Used by both input and output however some fields are not used
348 * for output */
/* Per-trace DPDK state, reached through the FORMAT() macro. */
struct dpdk_format_data_t {
        int8_t promisc; /* promiscuous mode - RX only; initialised to -1 by dpdk_init_input */
        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
        uint8_t paused; /* See paused_state */
        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
        int snaplen; /* The snap length for the capture - RX only; 0 selects the default */
        /* We always have to setup both rx and tx queues even if we don't want them */
        int nb_rx_buf; /* The number of packet buffers in the rx ring */
        int nb_tx_buf; /* The number of packet buffers in the tx ring */
        int nic_numa_node; /* The NUMA node that the NIC is attached to, -1 if unknown */
        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of entries in blacklist that are valid */
#endif
        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
        uint8_t rss_key[40]; /* The RSS key (40 bytes) */
        /* To improve single-threaded performance we always batch reading
         * packets, in a burst, otherwise the parallel library does this for us */
        struct rte_mbuf* burst_pkts[BURST_SIZE];
        int burst_size; /* The total number read in the burst */
        int burst_offset; /* The offset we are into the burst */

        /* Our parallel streams: a list of dpdk_per_stream_t */
        libtrace_list_t *per_stream;
};
376
/* Single-bit flag values; presumably OR-ed into dpdk_addt_hdr::flags
 * (the writer of that field is outside this view). */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM     = 1 << 0,
        INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
381
382/**
383 * A structure placed in front of the packet where we can store
384 * additional information about the given packet.
385 * +--------------------------+
386 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
387 * +--------------------------+
388 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
389 * +--------------------------+
390 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
391 * +--------------------------+
392 * *   hw_timestamp_82580     * 16 bytes Optional
393 * +--------------------------+
394 * |       Packet data        | Variable Size
395 * |                          |
396 */
struct dpdk_addt_hdr {
        /* 64-bit timestamp of the packet */
        uint64_t timestamp;
        /* Four single-byte fields packed after the timestamp */
        uint8_t flags;     /* presumably dpdk_addt_hdr_flags bits */
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        /* The size to say the capture is */
        uint32_t cap_len;
};
405
406/**
407 * We want to blacklist all devices except those on the whitelist
408 * (I say list, but yes it is only the one).
409 *
410 * The default behaviour of rte_pci_probe() will map every possible device
411 * to its DPDK driver. The DPDK driver will take the ethernet device
412 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
413 *
414 * So blacklist all devices except the one that we wish to use so that
415 * the others can still be used as standard ethernet ports.
416 *
417 * @return 0 if successful, otherwise -1 on error.
418 */
419#if DPDK_USE_BLACKLIST
420static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
421{
422        struct rte_pci_device *dev = NULL;
423        format_data->nb_blacklist = 0;
424
425        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
426
427        TAILQ_FOREACH(dev, &device_list, next) {
428        if (whitelist != NULL && whitelist->domain == dev->addr.domain
429            && whitelist->bus == dev->addr.bus
430            && whitelist->devid == dev->addr.devid
431            && whitelist->function == dev->addr.function)
432            continue;
433                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
434                                / sizeof (format_data->blacklist[0])) {
435                        fprintf(stderr, "Warning: too many devices to blacklist consider"
436                                        " increasing BLACK_LIST_SIZE");
437                        break;
438                }
439                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
440                ++format_data->nb_blacklist;
441        }
442
443        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
444        return 0;
445}
446#else /* DPDK_USE_BLACKLIST */
447#include <rte_devargs.h>
448static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
449{
450        char pci_str[20] = {0};
451        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
452                 whitelist->domain,
453                 whitelist->bus,
454                 whitelist->devid,
455                 whitelist->function);
456        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
457                return -1;
458        }
459        return 0;
460}
461#endif
462
463/**
464 * Parse the URI format as a pci address
465 * Fills in addr, note core is optional and is unchanged if
466 * a value for it is not provided.
467 *
468 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
469 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
470 */
471static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
472        int matches;
473        assert(str);
474        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
475                         &addr->domain, &addr->bus, &addr->devid,
476                         &addr->function, core);
477        if (matches >= 4) {
478                return 0;
479        } else {
480                return -1;
481        }
482}
483
484/**
485 * Convert a pci address to the numa node it is
486 * connected to.
487 *
488 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
489 * so we can call it before DPDK
490 *
491 * @return -1 if unknown otherwise a number 0 or higher of the numa node
492 */
493static int pci_to_numa(struct rte_pci_addr * dev_addr) {
494        char path[50] = {0};
495        FILE *file;
496
497        /* Read from the system */
498        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
499                 dev_addr->domain,
500                 dev_addr->bus,
501                 dev_addr->devid,
502                 dev_addr->function);
503
504        if((file = fopen(path, "r")) != NULL) {
505                int numa_node = -1;
506                fscanf(file, "%d", &numa_node);
507                fclose(file);
508                return numa_node;
509        }
510        return -1;
511}
512
513#if DEBUG
514/* For debugging */
515static inline void dump_configuration()
516{
517        struct rte_config * global_config;
518        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
519
520        if (nb_cpu <= 0) {
521                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
522                       " Falling back to the first core.");
523                nb_cpu = 1; /* fallback to just 1 core */
524        }
525        if (nb_cpu > RTE_MAX_LCORE)
526                nb_cpu = RTE_MAX_LCORE;
527
528        global_config = rte_eal_get_configuration();
529
530        if (global_config != NULL) {
531                int i;
532                fprintf(stderr, "Intel DPDK setup\n"
533                        "---Version      : %s\n"
534                        "---Master LCore : %"PRIu32"\n"
535                        "---LCore Count  : %"PRIu32"\n",
536                        rte_version(),
537                        global_config->master_lcore, global_config->lcore_count);
538
539                for (i = 0 ; i < nb_cpu; i++) {
540                        fprintf(stderr, "   ---Core %d : %s\n", i,
541                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
542                }
543
544                const char * proc_type;
545                switch (global_config->process_type) {
546                case RTE_PROC_AUTO:
547                        proc_type = "auto";
548                        break;
549                case RTE_PROC_PRIMARY:
550                        proc_type = "primary";
551                        break;
552                case RTE_PROC_SECONDARY:
553                        proc_type = "secondary";
554                        break;
555                case RTE_PROC_INVALID:
556                        proc_type = "invalid";
557                        break;
558                default:
559                        proc_type = "something worse than invalid!!";
560                }
561                fprintf(stderr, "---Process Type : %s\n", proc_type);
562        }
563
564}
565#endif
566
/**
 * Expects to be called from the master lcore and moves it to the given dpdk id
 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
 *               and not already in use.
 * @return 0 if successful, otherwise -1 on error.
 */
574static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
575        struct rte_config *cfg = rte_eal_get_configuration();
576        cpu_set_t cpuset;
577        int i;
578
579        assert (core < RTE_MAX_LCORE);
580        assert (rte_get_master_lcore() == rte_lcore_id());
581
582        if (core == rte_lcore_id())
583                return 0;
584
585        /* Make sure we are not overwriting someone else */
586        assert(!rte_lcore_is_enabled(core));
587
588        /* Move the core */
589        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
590        cfg->lcore_role[core] = ROLE_RTE;
591        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
592        rte_eal_get_configuration()->master_lcore = core;
593        RTE_PER_LCORE(_lcore_id) = core;
594
595        /* Now change the affinity, either mapped to a single core or all accepted */
596        CPU_ZERO(&cpuset);
597
598        if (lcore_config[core].detected) {
599                CPU_SET(core, &cpuset);
600        } else {
601                for (i = 0; i < RTE_MAX_LCORE; ++i) {
602                        if (lcore_config[i].detected)
603                                CPU_SET(i, &cpuset);
604                }
605        }
606
607        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
608        if (i != 0) {
609                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
610                return -1;
611        }
612        return 0;
613}
614
/**
 * XXX This is very bad XXX
 * But we have to do something to allow getopt nesting.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 */
/* Snapshot of the four getopt() globals; each member mirrors the global
 * of the same name. */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt() globals into *opts. */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write the values held in *opts back into the getopt() globals. */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts *saved = opts;

        optopt = saved->optopt;
        opterr = saved->opterr;
        optind = saved->optind;
        optarg = saved->optarg;
}
642
643static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
644                                        char * err, int errlen) {
645        int ret; /* Returned error codes */
646        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
647        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
648        char mem_map[20] = {0}; /* The memory name */
649        long nb_cpu; /* The number of CPUs in the system */
650        long my_cpu; /* The CPU number we want to bind to */
651        int i;
652        struct rte_config *cfg = rte_eal_get_configuration();
653        struct saved_getopts save_opts;
654
655        /* This initialises the Environment Abstraction Layer (EAL)
656         * If we had slave workers these are put into WAITING state
657         *
658         * Basically binds this thread to a fixed core, which we choose as
659         * the last core on the machine (assuming fewer interrupts mapped here).
660         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
661         * "-n" the number of memory channels into the CPU (hardware specific)
662         *      - Most likely to be half the number of ram slots in your machine.
663         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
664         * Controls where in memory packets are stored such that they are spread
665         * across the channels. We just use 1 to be safe.
666         *
667         * Using unique file prefixes mean separate memory is used, unlinking
668         * the two processes. However be careful we still cannot access a
669         * port that already in use.
670         */
671        char* argv[] = {"libtrace",
672                        "-c", cpu_number,
673                        "-n", "1",
674                        "--proc-type", "auto",
675                        "--file-prefix", mem_map,
676                        "-m", "512",
677#if DPDK_USE_LOG_LEVEL
678#       if DEBUG
679                        "--log-level", "8", /* RTE_LOG_DEBUG */
680#       else
681                        "--log-level", "5", /* RTE_LOG_WARNING */
682#       endif
683#endif
684                        NULL};
685        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
686
687#if DEBUG
688        rte_set_log_level(RTE_LOG_DEBUG);
689#else
690        rte_set_log_level(RTE_LOG_WARNING);
691#endif
692
693        /* Get the number of cpu cores in the system and use the last core
694         * on the correct numa node */
695        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
696        if (nb_cpu <= 0) {
697                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
698                       " Falling back to the first core.");
699                nb_cpu = 1; /* fallback to the first core */
700        }
701        if (nb_cpu > RTE_MAX_LCORE)
702                nb_cpu = RTE_MAX_LCORE;
703
704        my_cpu = -1;
705        /* This allows the user to specify the core - we would try to do this
706         * automatically but it's hard to tell that this is secondary
707         * before running rte_eal_init(...). Currently we are limited to 1
708         * instance per core due to the way memory is allocated. */
709        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
710                snprintf(err, errlen, "Failed to parse URI");
711                return -1;
712        }
713
714#if HAVE_LIBNUMA
715        format_data->nic_numa_node = pci_to_numa(&use_addr);
716        if (my_cpu < 0) {
717#if DEBUG
718                /* If we can assign to a core on the same numa node */
719                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
720#endif
721                if(format_data->nic_numa_node >= 0) {
722                        int max_node_cpu = -1;
723                        struct bitmask *mask = numa_allocate_cpumask();
724                        assert(mask);
725                        numa_node_to_cpus(format_data->nic_numa_node, mask);
726                        for (i = 0 ; i < nb_cpu; ++i) {
727                                if (numa_bitmask_isbitset(mask,i))
728                                        max_node_cpu = i+1;
729                        }
730                        my_cpu = max_node_cpu;
731                }
732        }
733#endif
734        if (my_cpu < 0) {
735                my_cpu = nb_cpu;
736        }
737
738
739        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
740                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
741
742        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
743                snprintf(err, errlen,
744                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
745                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
746                return -1;
747        }
748
749        /* Make our mask with all cores turned on this is so that DPDK
750         * gets all CPU info in older versions */
751        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
752        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
753
754#if !DPDK_USE_BLACKLIST
755        /* Black list all ports besides the one that we want to use */
756        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
757                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
758                         " are you sure the address is correct?: %s", strerror(-ret));
759                return -1;
760        }
761#endif
762
763        /* Give the memory map a unique name */
764        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
765        /* rte_eal_init it makes a call to getopt so we need to reset the
766         * global optind variable of getopt otherwise this fails */
767        save_getopts(&save_opts);
768        optind = 1;
769        if ((ret = rte_eal_init(argc, argv)) < 0) {
770                snprintf(err, errlen,
771                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
772                return -1;
773        }
774        restore_getopts(&save_opts);
775        // These are still running but will never do anything with DPDK v1.7 we
776        // should remove this XXX in the future
777        for(i = 0; i < RTE_MAX_LCORE; ++i) {
778                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
779                        cfg->lcore_role[i] = ROLE_OFF;
780                        cfg->lcore_count--;
781                }
782        }
783        // Only the master should be running
784        assert(cfg->lcore_count == 1);
785
786        // TODO XXX TODO
787        dpdk_move_master_lcore(NULL, my_cpu-1);
788
789#if DEBUG
790        dump_configuration();
791#endif
792
793#if DPDK_USE_PMD_INIT
794        /* This registers all available NICs with Intel DPDK
795         * These are not loaded until rte_eal_pci_probe() is called.
796         */
797        if ((ret = rte_pmd_init_all()) < 0) {
798                snprintf(err, errlen,
799                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
800                return -1;
801        }
802#endif
803
804#if DPDK_USE_BLACKLIST
805        /* Blacklist all ports besides the one that we want to use */
806        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
807                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
808                         " are you sure the address is correct?: %s", strerror(-ret));
809                return -1;
810        }
811#endif
812
813#if DPDK_USE_PCI_PROBE
814        /* This loads DPDK drivers against all ports that are not blacklisted */
815        if ((ret = rte_eal_pci_probe()) < 0) {
816                snprintf(err, errlen,
817                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
818                return -1;
819        }
820#endif
821
822        format_data->nb_ports = rte_eth_dev_count();
823
824        if (format_data->nb_ports != 1) {
825                snprintf(err, errlen,
826                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
827                         format_data->nb_ports);
828                return -1;
829        }
830
831        return 0;
832}
833
834static int dpdk_init_input (libtrace_t *libtrace) {
835        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
836        char err[500];
837        err[0] = 0;
838
839        libtrace->format_data = (struct dpdk_format_data_t *)
840                                malloc(sizeof(struct dpdk_format_data_t));
841        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
842        FORMAT(libtrace)->nb_ports = 0;
843        FORMAT(libtrace)->snaplen = 0; /* Use default */
844        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
845        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
846        FORMAT(libtrace)->nic_numa_node = -1;
847        FORMAT(libtrace)->promisc = -1;
848        FORMAT(libtrace)->pktmbuf_pool = NULL;
849#if DPDK_USE_BLACKLIST
850        FORMAT(libtrace)->nb_blacklist = 0;
851#endif
852        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
853        FORMAT(libtrace)->mempool_name[0] = 0;
854        memset(FORMAT(libtrace)->burst_pkts, 0,
855               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
856        FORMAT(libtrace)->burst_size = 0;
857        FORMAT(libtrace)->burst_offset = 0;
858
859        /* Make our first stream */
860        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
861        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
862
863        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
864                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
865                free(libtrace->format_data);
866                libtrace->format_data = NULL;
867                return -1;
868        }
869        return 0;
870}
871
872static int dpdk_init_output(libtrace_out_t *libtrace)
873{
874        char err[500];
875        err[0] = 0;
876
877        libtrace->format_data = (struct dpdk_format_data_t *)
878                                malloc(sizeof(struct dpdk_format_data_t));
879        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
880        FORMAT(libtrace)->nb_ports = 0;
881        FORMAT(libtrace)->snaplen = 0; /* Use default */
882        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
883        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
884        FORMAT(libtrace)->nic_numa_node = -1;
885        FORMAT(libtrace)->promisc = -1;
886        FORMAT(libtrace)->pktmbuf_pool = NULL;
887#if DPDK_USE_BLACKLIST
888        FORMAT(libtrace)->nb_blacklist = 0;
889#endif
890        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
891        FORMAT(libtrace)->mempool_name[0] = 0;
892        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
893        FORMAT(libtrace)->burst_size = 0;
894        FORMAT(libtrace)->burst_offset = 0;
895
896        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
897                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
898                free(libtrace->format_data);
899                libtrace->format_data = NULL;
900                return -1;
901        }
902        return 0;
903}
904
905/**
906 * Note here snaplen excludes the MAC checksum. Packets over
907 * the requested snaplen will be dropped. (Excluding MAC checksum)
908 *
909 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
910 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
911 * is set the maximum size of the returned packet would be 1518 otherwise
912 * 1514 would be the largest size possibly returned.
913 *
914 */
915static int dpdk_config_input (libtrace_t *libtrace,
916                              trace_option_t option,
917                              void *data) {
918        switch (option) {
919        case TRACE_OPTION_SNAPLEN:
920                /* Only support changing snaplen before a call to start is
921                 * made */
922                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
923                        FORMAT(libtrace)->snaplen=*(int*)data;
924                else
925                        return -1;
926                return 0;
927        case TRACE_OPTION_PROMISC:
928                FORMAT(libtrace)->promisc=*(int*)data;
929                return 0;
930        case TRACE_OPTION_HASHER:
931                switch (*((enum hasher_types *) data))
932                {
933                case HASHER_BALANCE:
934                case HASHER_UNIDIRECTIONAL:
935                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
936                        return 0;
937                case HASHER_BIDIRECTIONAL:
938                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
939                        return 0;
940                case HASHER_CUSTOM:
941                        // We don't support these
942                        return -1;
943                }
944                break;
945        case TRACE_OPTION_FILTER:
946                /* TODO filtering */
947        case TRACE_OPTION_META_FREQ:
948        case TRACE_OPTION_EVENT_REALTIME:
949                break;
950        /* Avoid default: so that future options will cause a warning
951         * here to remind us to implement it, or flag it as
952         * unimplementable
953         */
954        }
955
956        /* Don't set an error - trace_config will try to deal with the
957         * option and will set an error if it fails */
958        return -1;
959}
960
961/* Can set jumbo frames/ or limit the size of a frame by setting both
962 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
963 *
964 */
965static struct rte_eth_conf port_conf = {
966        .rxmode = {
967                .mq_mode = ETH_RSS,
968                .split_hdr_size = 0,
969                .header_split   = 0, /**< Header Split disabled */
970                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
971                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
972                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
973                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
974#if GET_MAC_CRC_CHECKSUM
975/* So it appears that if hw_strip_crc is turned off the driver will still
976 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
977 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
978 * So lets just add it back on when we receive the packet.
979 */
980                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
981#else
982/* By default strip the MAC checksum because it's a bit of a hack to
983 * actually read these. And don't want to rely on disabling this to actualy
984 * always cut off the checksum in the future
985 */
986                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
987#endif
988        },
989        .txmode = {
990                .mq_mode = ETH_DCB_NONE,
991        },
992        .rx_adv_conf = {
993                .rss_conf = {
994                        // .rss_key = &rss_key, // We set this per format
995                        .rss_hf = RX_RSS_FLAGS,
996                },
997        },
998        .intr_conf = {
999                .lsc = 1
1000        }
1001};
1002
1003static const struct rte_eth_rxconf rx_conf = {
1004        .rx_thresh = {
1005                .pthresh = 8,/* RX_PTHRESH prefetch */
1006                .hthresh = 8,/* RX_HTHRESH host */
1007                .wthresh = 4,/* RX_WTHRESH writeback */
1008        },
1009        .rx_free_thresh = 0,
1010        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
1011};
1012
1013static const struct rte_eth_txconf tx_conf = {
1014        .tx_thresh = {
1015                /*
1016                 * TX_PTHRESH prefetch
1017                 * Set on the NIC, if the number of unprocessed descriptors to queued on
1018                 * the card fall below this try grab at least hthresh more unprocessed
1019                 * descriptors.
1020                 */
1021                .pthresh = 36,
1022
1023                /* TX_HTHRESH host
1024                 * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
1025                 */
1026                .hthresh = 0,
1027
1028                /* TX_WTHRESH writeback
1029                 * Set on the NIC, the number of sent descriptors before writing back
1030                 * status to confirm the transmission. This is done more efficiently as
1031                 * a bulk DMA-transfer rather than writing one at a time.
1032                 * Similar to tx_free_thresh however this is applied to the NIC, where
1033                 * as tx_free_thresh is when DPDK will check these. This is extended
1034                 * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
1035                 * descriptors rather only every n'th item, reducing DMA memory bandwidth.
1036                 */
1037                .wthresh = 4,
1038        },
1039
1040        /* Used internally by DPDK rather than passed to the NIC. The number of
1041         * packet descriptors to send before checking for any responses written
1042         * back (to confirm the transmission). Default = 32 if set to 0)
1043         */
1044        .tx_free_thresh = 0,
1045
1046        /* This is the Report Status threshold, used by 10Gbit cards,
1047         * This signals the card to only write back status (such as
1048         * transmission successful) after this minimum number of transmit
1049         * descriptors are seen. The default is 32 (if set to 0) however if set
1050         * to greater than 1 TX wthresh must be set to zero, because this is kindof
1051         * a replacement. See the dpdk programmers guide for more restrictions.
1052         */
1053        .tx_rs_thresh = 1,
1054};
1055
1056/**
1057 * A callback for a link state change (LSC).
1058 *
1059 * Packets may be received before this notification. In fact the DPDK IGXBE
1060 * driver likes to put a delay upto 5sec before sending this.
1061 *
1062 * We use this to ensure the link speed is correct for our timestamp
1063 * calculations. Because packets might be received before the link up we still
1064 * update this when the packet is received.
1065 *
1066 * @param port The DPDK port
1067 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1068 * @param cb_arg The dpdk_format_data_t structure associated with the format
1069 */
1070static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1071                              void *cb_arg) {
1072        struct dpdk_format_data_t * format_data = cb_arg;
1073        struct rte_eth_link link_info;
1074        assert(event == RTE_ETH_EVENT_INTR_LSC);
1075        assert(port == format_data->port);
1076
1077        rte_eth_link_get_nowait(port, &link_info);
1078
1079        if (link_info.link_status)
1080                format_data->link_speed = link_info.link_speed;
1081        else
1082                format_data->link_speed = 0;
1083
1084#if DEBUG
1085        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1086                link_info.link_status ? "up" : "down",
1087                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1088                                          "full-duplex" : "half-duplex",
1089                (int) link_info.link_speed);
1090#endif
1091
1092        /* Turns out DPDK drivers might not come back up if the link speed
1093         * changes. So we reset the autoneg procedure. This is very unsafe
1094         * we have have threads reading packets and we stop the port. */
1095#if 0
1096        if (!link_info.link_status) {
1097                int ret;
1098                rte_eth_dev_stop(port);
1099                ret = rte_eth_dev_start(port);
1100                if (ret < 0) {
1101                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1102                                strerror(-ret));
1103                }
1104        }
1105#endif
1106}
1107
1108/** Reserve a DPDK lcore ID for a thread globally.
1109 *
1110 * @param real If true allocate a real lcore, otherwise allocate a core which
1111 * does not exist on the local machine.
1112 * @param socket the prefered NUMA socket - only used if a real core is requested
1113 * @return a valid core, which can later be used with dpdk_register_lcore() or a
1114 * -1 if have run out of cores.
1115 *
1116 * If any thread is reading or freeing packets we need to register it here
1117 * due to TLS caches in the memory pools.
1118 */
1119static int dpdk_reserve_lcore(bool real, int socket) {
1120        int new_id = -1;
1121        int i;
1122        struct rte_config *cfg = rte_eal_get_configuration();
1123        (void) socket;
1124
1125        pthread_mutex_lock(&dpdk_lock);
1126        /* If 'reading packets' fill in cores from 0 up and bind affinity
1127         * otherwise start from the MAX core (which is also the master) and work backwards
1128         * in this case physical cores on the system will not exist so we don't bind
1129         * these to any particular physical core */
1130        if (real) {
1131#if HAVE_LIBNUMA
1132                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1133                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
1134                                new_id = i;
1135                                if (!lcore_config[i].detected)
1136                                        new_id = -1;
1137                                break;
1138                        }
1139                }
1140#endif
1141                /* Retry without the the numa restriction */
1142                if (new_id == -1) {
1143                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1144                                if (!rte_lcore_is_enabled(i)) {
1145                                        new_id = i;
1146                                        if (!lcore_config[i].detected)
1147                                                fprintf(stderr, "Warning the"
1148                                                        " number of 'reading' "
1149                                                        "threads exceed cores\n");
1150                                        break;
1151                                }
1152                        }
1153                }
1154        } else {
1155                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1156                        if (!rte_lcore_is_enabled(i)) {
1157                                new_id = i;
1158                                break;
1159                        }
1160                }
1161        }
1162
1163        if (new_id != -1) {
1164                /* Enable the core in global DPDK structs */
1165                cfg->lcore_role[new_id] = ROLE_RTE;
1166                cfg->lcore_count++;
1167        }
1168
1169        pthread_mutex_unlock(&dpdk_lock);
1170        return new_id;
1171}
1172
1173/** Register a thread as a lcore
1174 * @param libtrace any error is set against libtrace on exit
1175 * @param real If this is a true lcore we will bind its affinty to the
1176 * requested core.
1177 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
1178 * @return 0, if successful otherwise -1 if an error occured (details are stored
1179 * in libtrace)
1180 *
1181 * @note This must be called from the thread being registered.
1182 */
1183static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
1184        int ret;
1185        RTE_PER_LCORE(_lcore_id) = lcore;
1186
1187        /* Set affinity bind to corresponding core */
1188        if (real) {
1189                cpu_set_t cpuset;
1190                CPU_ZERO(&cpuset);
1191                CPU_SET(rte_lcore_id(), &cpuset);
1192                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1193                if (ret != 0) {
1194                        trace_set_err(libtrace, errno, "Warning "
1195                                      "pthread_setaffinity_np failed");
1196                        return -1;
1197                }
1198        }
1199
1200        return 0;
1201}
1202
1203/** Allocates a new dpdk packet buffer memory pool.
1204 *
1205 * @param n The number of threads
1206 * @param pkt_size The packet size we need ot store
1207 * @param socket_id The NUMA socket id
1208 * @param A new mempool, if NULL query the DPDK library for the error code
1209 * see rte_mempool_create() documentation.
1210 *
1211 * This allocates a new pool or recycles an existing memory pool.
1212 * Call dpdk_free_memory() to free the memory.
1213 * We cannot delete memory so instead we store the pools, allowing them to be
1214 * re-used.
1215 */
1216static struct rte_mempool *dpdk_alloc_memory(unsigned n,
1217                                             unsigned pkt_size,
1218                                             int socket_id) {
1219        struct rte_mempool *ret;
1220        size_t j,k;
1221        char name[MEMPOOL_NAME_LEN];
1222
1223        /* Add on packet size overheads */
1224        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1225
1226        pthread_mutex_lock(&dpdk_lock);
1227
1228        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
1229                /* Best guess go for zero */
1230                socket_id = 0;
1231        }
1232
1233        /* Find a valid pool */
1234        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
1235                if (mem_pools[socket_id][j]->size >= n &&
1236                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
1237                        break;
1238                }
1239        }
1240
1241        /* Find the end (+1) of the list */
1242        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
1243
1244        if (mem_pools[socket_id][j]) {
1245                ret = mem_pools[socket_id][j];
1246                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
1247                mem_pools[socket_id][k-1] = NULL;
1248                mem_pools[socket_id][j] = NULL;
1249        } else {
1250                static uint32_t test = 10;
1251                test++;
1252                snprintf(name, MEMPOOL_NAME_LEN,
1253                         "libtrace_pool_%"PRIu32, test);
1254
1255                ret = rte_mempool_create(name, n, pkt_size,
1256                                         128, sizeof(struct rte_pktmbuf_pool_private),
1257                                         rte_pktmbuf_pool_init, NULL,
1258                                         rte_pktmbuf_init, NULL,
1259                                         socket_id, 0);
1260        }
1261
1262        pthread_mutex_unlock(&dpdk_lock);
1263        return ret;
1264}
1265
1266/** Stores the memory against the DPDK library.
1267 *
1268 * @param mempool The mempool to free
1269 * @param socket_id The NUMA socket this mempool was allocated upon.
1270 *
1271 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
1272 * store the memory shared globally against the format.
1273 */
1274static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
1275        size_t i;
1276        pthread_mutex_lock(&dpdk_lock);
1277
1278        /* We should have all entries back in the mempool */
1279        rte_mempool_audit(mempool);
1280        if (!rte_mempool_full(mempool)) {
1281                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
1282                        "free all packets before finishing a trace\n",
1283                        rte_mempool_avail_count(mempool), mempool->size);
1284        }
1285
1286        /* Find the end (+1) of the list */
1287        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
1288
1289        if (i >= RTE_MAX_LCORE) {
1290                fprintf(stderr, "Too many memory pools, dropping this one\n");
1291        } else {
1292                mem_pools[socket_id][i] = mempool;
1293        }
1294
1295        pthread_mutex_unlock(&dpdk_lock);
1296}
1297
1298/* Attach memory to the port and start (or restart) the port/s.
1299 */
1300static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
1301                              char *err, int errlen, uint16_t rx_queues) {
1302        int ret, i;
1303        struct rte_eth_link link_info; /* Wait for link */
1304        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
1305
1306        /* Already started */
1307        if (format_data->paused == DPDK_RUNNING)
1308                return 0;
1309
1310        /* First time started we need to alloc our memory, doing this here
1311         * rather than in environment setup because we don't have snaplen then */
1312        if (format_data->paused == DPDK_NEVER_STARTED) {
1313                if (format_data->snaplen == 0) {
1314                        format_data->snaplen = RX_MBUF_SIZE;
1315                        port_conf.rxmode.jumbo_frame = 0;
1316                        port_conf.rxmode.max_rx_pkt_len = 0;
1317                } else {
1318                        /* Use jumbo frames */
1319                        port_conf.rxmode.jumbo_frame = 1;
1320                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1321                }
1322
1323#if GET_MAC_CRC_CHECKSUM
1324                /* This is additional overhead so make sure we allow space for this */
1325                format_data->snaplen += ETHER_CRC_LEN;
1326#endif
1327#if HAS_HW_TIMESTAMPS_82580
1328                format_data->snaplen += sizeof(struct hw_timestamp_82580);
1329#endif
1330
1331                /* Create the mbuf pool, which is the place packets are allocated
1332                 * from - There is no free function (I cannot see one).
1333                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1334                 * allocate however that extra 1 packet is not used.
1335                 * (I assume <= vs < error some where in DPDK code)
1336                 * TX requires nb_tx_buffers + 1 in the case the queue is full
1337                 * so that will fill the new buffer and wait until slots in the
1338                 * ring become available.
1339                 */
1340#if DEBUG
1341                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1342#endif
1343                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
1344                                                              format_data->snaplen,
1345                                                              format_data->nic_numa_node);
1346
1347                if (format_data->pktmbuf_pool == NULL) {
1348                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1349                                 "pool failed: %s", strerror(rte_errno));
1350                        return -1;
1351                }
1352        }
1353
1354        /* ----------- Now do the setup for the port mapping ------------ */
1355        /* Order of calls must be
1356         * rte_eth_dev_configure()
1357         * rte_eth_tx_queue_setup()
1358         * rte_eth_rx_queue_setup()
1359         * rte_eth_dev_start()
1360         * other rte_eth calls
1361         */
1362
1363        /* This must be called first before another *eth* function
1364         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
1365        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1366        if (ret < 0) {
1367                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1368                         " %"PRIu8" : %s", format_data->port,
1369                         strerror(-ret));
1370                return -1;
1371        }
1372#if DEBUG
1373        fprintf(stderr, "Doing dev configure\n");
1374#endif
1375        /* Initialise the TX queue a minimum value if using this port for
1376         * receiving. Otherwise a larger size if writing packets.
1377         */
1378        ret = rte_eth_tx_queue_setup(format_data->port,
1379                                     0 /* queue XXX */,
1380                                     format_data->nb_tx_buf,
1381                                     SOCKET_ID_ANY,
1382                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
1383        if (ret < 0) {
1384                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
1385                         " on port %"PRIu8" : %s", format_data->port,
1386                         strerror(-ret));
1387                return -1;
1388        }
1389
1390        /* Attach memory to our RX queues */
1391        for (i=0; i < rx_queues; i++) {
1392                dpdk_per_stream_t *stream;
1393#if DEBUG
1394                fprintf(stderr, "Configuring queue %d\n", i);
1395#endif
1396
1397                /* Add storage for the stream */
1398                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
1399                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
1400                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
1401                stream->queue_id = i;
1402
1403                if (stream->lcore == -1)
1404                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
1405
1406                if (stream->lcore == -1) {
1407                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
1408                                 ". Too many threads?");
1409                        return -1;
1410                }
1411
1412                if (stream->mempool == NULL) {
1413                        stream->mempool = dpdk_alloc_memory(
1414                                                  format_data->nb_rx_buf*2,
1415                                                  format_data->snaplen,
1416                                                  rte_lcore_to_socket_id(stream->lcore));
1417
1418                        if (stream->mempool == NULL) {
1419                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1420                                         "pool failed: %s", strerror(rte_errno));
1421                                return -1;
1422                        }
1423                }
1424
1425                /* Initialise the RX queue with some packets from memory */
1426                ret = rte_eth_rx_queue_setup(format_data->port,
1427                                             stream->queue_id,
1428                                             format_data->nb_rx_buf,
1429                                             format_data->nic_numa_node,
1430                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
1431                                             stream->mempool);
1432                if (ret < 0) {
1433                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
1434                                 " RX queue on port %"PRIu8" : %s",
1435                                 format_data->port,
1436                                 strerror(-ret));
1437                        return -1;
1438                }
1439        }
1440
1441#if DEBUG
1442        fprintf(stderr, "Doing start device\n");
1443#endif
1444        rte_eth_stats_reset(format_data->port);
1445        /* Start device */
1446        ret = rte_eth_dev_start(format_data->port);
1447        if (ret < 0) {
1448                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1449                         strerror(-ret));
1450                return -1;
1451        }
1452
1453        /* Default promiscuous to on */
1454        if (format_data->promisc == -1)
1455                format_data->promisc = 1;
1456
1457        if (format_data->promisc == 1)
1458                rte_eth_promiscuous_enable(format_data->port);
1459        else
1460                rte_eth_promiscuous_disable(format_data->port);
1461
1462        /* We have now successfully started/unpased */
1463        format_data->paused = DPDK_RUNNING;
1464
1465
1466        /* Register a callback for link state changes */
1467        ret = rte_eth_dev_callback_register(format_data->port,
1468                                            RTE_ETH_EVENT_INTR_LSC,
1469                                            dpdk_lsc_callback,
1470                                            format_data);
1471#if DEBUG
1472        if (ret)
1473                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1474                        ret, strerror(-ret));
1475#endif
1476
1477        /* Get the current link status */
1478        rte_eth_link_get_nowait(format_data->port, &link_info);
1479        format_data->link_speed = link_info.link_speed;
1480#if DEBUG
1481        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1482                (int) link_info.link_duplex, (int) link_info.link_speed);
1483#endif
1484
1485        return 0;
1486}
1487
1488static int dpdk_start_input (libtrace_t *libtrace) {
1489        char err[500];
1490        err[0] = 0;
1491
1492        /* Make sure we don't reserve an extra thread for this */
1493        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
1494
1495        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1496                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1497                free(libtrace->format_data);
1498                libtrace->format_data = NULL;
1499                return -1;
1500        }
1501        return 0;
1502}
1503
1504static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1505        struct rte_eth_dev_info dev_info;
1506        rte_eth_dev_info_get(port_id, &dev_info);
1507        return dev_info.max_rx_queues;
1508}
1509
/* Returns the number of processors currently online as reported by
 * sysconf(_SC_NPROCESSORS_ONLN), or 1 if that query fails or reports a
 * non-positive count. */
static inline size_t dpdk_processor_count (void) {
        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
        if (nb_cpu <= 0)
                return 1;
        else
                return (size_t) nb_cpu;
}
1517
1518static int dpdk_pstart_input (libtrace_t *libtrace) {
1519        char err[500];
1520        int i=0, phys_cores=0;
1521        int tot = libtrace->perpkt_thread_count;
1522        libtrace_list_node_t *n;
1523        err[0] = 0;
1524
1525        if (rte_lcore_id() != rte_get_master_lcore())
1526                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1527                        " from the master DPDK thread!\n");
1528
1529        /* If the master is not on the last thread we move it there */
1530        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1531                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1532                        return -1;
1533        }
1534
1535        /* Don't exceed the number of cores in the system/detected by dpdk
1536         * We don't have to force this but performance wont be good if we don't */
1537        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1538                if (lcore_config[i].detected) {
1539                        if (rte_lcore_is_enabled(i)) {
1540#if DEBUG
1541                                fprintf(stderr, "Found core %d already in use!\n", i);
1542#endif
1543                        } else {
1544                                phys_cores++;
1545                        }
1546                }
1547        }
1548        /* If we are restarting we have already allocated some threads as such
1549         * we add these back to the count for this calculation */
1550        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
1551                dpdk_per_stream_t * stream = n->data;
1552                if (stream->lcore != -1)
1553                        phys_cores++;
1554        }
1555
1556        tot = MIN(libtrace->perpkt_thread_count,
1557                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1558        tot = MIN(tot, phys_cores);
1559
1560#if DEBUG
1561        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1562                libtrace->perpkt_thread_count, phys_cores);
1563#endif
1564
1565        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1566                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1567                free(libtrace->format_data);
1568                libtrace->format_data = NULL;
1569                return -1;
1570        }
1571
1572        /* Make sure we only start the number that we should */
1573        libtrace->perpkt_thread_count = tot;
1574        return 0;
1575}
1576
1577/**
1578 * Register a thread with the DPDK system,
1579 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1580 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1581 * gives it.
1582 *
1583 * We then allow a mapper thread to be started on every real core as DPDK would,
1584 * we also bind these to the corresponding CPU cores.
1585 *
1586 * @param libtrace A pointer to the trace
1587 * @param reading True if the thread will be used to read packets, i.e. will
1588 *                call pread_packet(), false if thread used to process packet
1589 *                in any other manner including statistics functions.
1590 */
1591static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1592{
1593#if DEBUG
1594        char name[99];
1595        name[0] = 0;
1596#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
1597        pthread_getname_np(pthread_self(),
1598                           name, sizeof(name));
1599#endif
1600#endif
1601        if (reading) {
1602                dpdk_per_stream_t *stream;
1603                /* Attach our thread */
1604                if(t->type == THREAD_PERPKT) {
1605                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1606                        if (t->format_data == NULL) {
1607                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1608                                              "Too many threads registered");
1609                                return -1;
1610                        }
1611                } else {
1612                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1613                }
1614                stream = t->format_data;
1615#if DEBUG
1616                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
1617#endif
1618                return dpdk_register_lcore(libtrace, true, stream->lcore);
1619        } else {
1620                int lcore = dpdk_reserve_lcore(reading, 0);
1621                if (lcore == -1) {
1622                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1623                                      " for DPDK");
1624                        return -1;
1625                }
1626#if DEBUG
1627                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
1628#endif
1629                return dpdk_register_lcore(libtrace, false, lcore);
1630        }
1631
1632        return 0;
1633}
1634
1635/**
1636 * Unregister a thread with the DPDK system.
1637 *
1638 * Only previously registered threads should be calling this just before
1639 * they are destroyed.
1640 */
1641static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1642{
1643        struct rte_config *cfg = rte_eal_get_configuration();
1644
1645        assert(rte_lcore_id() < RTE_MAX_LCORE);
1646        pthread_mutex_lock(&dpdk_lock);
1647        /* Skip if master */
1648        if (rte_lcore_id() == rte_get_master_lcore()) {
1649                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1650                pthread_mutex_unlock(&dpdk_lock);
1651                return;
1652        }
1653
1654        /* Disable this core in global DPDK structs */
1655        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1656        cfg->lcore_count--;
1657        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1658        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1659        pthread_mutex_unlock(&dpdk_lock);
1660        return;
1661}
1662
1663static int dpdk_start_output(libtrace_out_t *libtrace)
1664{
1665        char err[500];
1666        err[0] = 0;
1667
1668        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1669                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1670                free(libtrace->format_data);
1671                libtrace->format_data = NULL;
1672                return -1;
1673        }
1674        return 0;
1675}
1676
1677static int dpdk_pause_input(libtrace_t * libtrace) {
1678        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1679        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1680        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1681#if DEBUG
1682                fprintf(stderr, "Pausing DPDK port\n");
1683#endif
1684                rte_eth_dev_stop(FORMAT(libtrace)->port);
1685                FORMAT(libtrace)->paused = DPDK_PAUSED;
1686                /* Empty the queue of packets */
1687                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1688                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1689                }
1690                FORMAT(libtrace)->burst_offset = 0;
1691                FORMAT(libtrace)->burst_size = 0;
1692
1693                for (; tmp != NULL; tmp = tmp->next) {
1694                        dpdk_per_stream_t *stream = tmp->data;
1695                        stream->ts_last_sys = 0;
1696#if HAS_HW_TIMESTAMPS_82580
1697                        stream->ts_first_sys = 0;
1698#endif
1699                }
1700
1701        }
1702        return 0;
1703}
1704
1705static int dpdk_write_packet(libtrace_out_t *trace,
1706                             libtrace_packet_t *packet){
1707        struct rte_mbuf* m_buff[1];
1708
1709        int wirelen = trace_get_wire_length(packet);
1710        int caplen = trace_get_capture_length(packet);
1711
1712        /* Check for a checksum and remove it */
1713        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1714            wirelen == caplen)
1715                caplen -= ETHER_CRC_LEN;
1716
1717        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1718        if (m_buff[0] == NULL) {
1719                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1720                return -1;
1721        } else {
1722                int ret;
1723                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1724                do {
1725                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1726                } while (ret != 1);
1727        }
1728
1729        return 0;
1730}
1731
1732static int dpdk_fin_input(libtrace_t * libtrace) {
1733        libtrace_list_node_t * n;
1734        /* Free our memory structures */
1735        if (libtrace->format_data != NULL) {
1736
1737                if (FORMAT(libtrace)->port != 0xFF)
1738                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1739                                                        RTE_ETH_EVENT_INTR_LSC,
1740                                                        dpdk_lsc_callback,
1741                                                        FORMAT(libtrace));
1742                /* Close the device completely, device cannot be restarted */
1743                rte_eth_dev_close(FORMAT(libtrace)->port);
1744
1745                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
1746                                 FORMAT(libtrace)->nic_numa_node);
1747
1748                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
1749                        dpdk_per_stream_t * stream = n->data;
1750                        if (stream->mempool)
1751                                dpdk_free_memory(stream->mempool,
1752                                                 rte_lcore_to_socket_id(stream->lcore));
1753                }
1754
1755                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1756                /* filter here if we used it */
1757                free(libtrace->format_data);
1758        }
1759
1760        return 0;
1761}
1762
1763
1764static int dpdk_fin_output(libtrace_out_t * libtrace) {
1765        /* Free our memory structures */
1766        if (libtrace->format_data != NULL) {
1767                /* Close the device completely, device cannot be restarted */
1768                if (FORMAT(libtrace)->port != 0xFF)
1769                        rte_eth_dev_close(FORMAT(libtrace)->port);
1770                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1771                /* filter here if we used it */
1772                free(libtrace->format_data);
1773        }
1774
1775        return 0;
1776}
1777
1778/**
1779 * Get the start of the additional header that we added to a packet.
1780 */
1781static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1782        assert(packet);
1783        assert(packet->buffer);
1784        /* Our header sits straight after the mbuf header */
1785        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1786}
1787
1788static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1789        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1790        return hdr->cap_len;
1791}
1792
1793static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1794        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1795        if (size > hdr->cap_len) {
1796                /* Cannot make a packet bigger */
1797                return trace_get_capture_length(packet);
1798        }
1799
1800        /* Reset the cached capture length first*/
1801        packet->capture_length = -1;
1802        hdr->cap_len = (uint32_t) size;
1803        return trace_get_capture_length(packet);
1804}
1805
1806static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1807        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1808        int org_cap_size; /* The original capture size */
1809        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1810                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1811                               sizeof(struct hw_timestamp_82580);
1812        } else {
1813                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1814        }
1815        if (hdr->flags & INCLUDES_CHECKSUM) {
1816                return org_cap_size;
1817        } else {
1818                /* DPDK packets are always TRACE_TYPE_ETH packets */
1819                return org_cap_size + ETHER_CRC_LEN;
1820        }
1821}
1822
1823static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1824        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1825        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1826                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1827                                sizeof(struct hw_timestamp_82580);
1828        else
1829                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1830}
1831
1832static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1833                               libtrace_packet_t *packet, void *buffer,
1834                               libtrace_rt_types_t rt_type, uint32_t flags) {
1835        assert(packet);
1836        if (packet->buffer != buffer &&
1837            packet->buf_control == TRACE_CTRL_PACKET) {
1838                free(packet->buffer);
1839        }
1840
1841        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1842                packet->buf_control = TRACE_CTRL_PACKET;
1843        else
1844                packet->buf_control = TRACE_CTRL_EXTERNAL;
1845
1846        packet->buffer = buffer;
1847        packet->header = buffer;
1848
1849        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1850        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1851        packet->type = rt_type;
1852        return 0;
1853}
1854
1855/**
1856 * Given a packet size and a link speed, computes the
1857 * time to transmit in nanoseconds.
1858 *
1859 * @param format_data The dpdk format data from which we get the link speed
1860 *        and if unset updates it in a thread safe manner
1861 * @param pkt_size The size of the packet in bytes
1862 * @return The wire time in nanoseconds
1863 */
1864static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1865        uint32_t wire_time;
1866        /* 20 extra bytes of interframe gap and preamble */
1867# if GET_MAC_CRC_CHECKSUM
1868        wire_time = ((pkt_size + 20) * 8000);
1869# else
1870        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1871# endif
1872
1873        /* Division is really slow and introduces a pipeline stall
1874         * The compiler will optimise this into magical multiplication and shifting
1875         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1876         */
1877retry_calc_wiretime:
1878        switch (format_data->link_speed) {
1879        case ETH_SPEED_NUM_40G:
1880                wire_time /=  ETH_SPEED_NUM_40G;
1881                break;
1882        case ETH_SPEED_NUM_20G:
1883                wire_time /= ETH_SPEED_NUM_20G;
1884                break;
1885        case ETH_SPEED_NUM_10G:
1886                wire_time /= ETH_SPEED_NUM_10G;
1887                break;
1888        case ETH_SPEED_NUM_1G:
1889                wire_time /= ETH_SPEED_NUM_1G;
1890                break;
1891        case 0:
1892                {
1893                /* Maybe the link was down originally, but now it should be up */
1894                struct rte_eth_link link = {0};
1895                rte_eth_link_get_nowait(format_data->port, &link);
1896                if (link.link_status && link.link_speed) {
1897                        format_data->link_speed = link.link_speed;
1898#ifdef DEBUG
1899                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1900#endif
1901                        goto retry_calc_wiretime;
1902                }
1903                /* We don't know the link speed, make sure numbers are counting up */
1904                wire_time = 1;
1905                break;
1906                }
1907        default:
1908                wire_time /= format_data->link_speed;
1909        }
1910        return wire_time;
1911}
1912
1913/**
1914 * Does any extra preperation to all captured packets
1915 * This includes adding our extra header to it with the timestamp,
1916 * and any snapping
1917 *
1918 * @param format_data The DPDK format data
1919 * @param plc The DPDK per lcore format data
1920 * @param pkts An array of size nb_pkts of DPDK packets
1921 */
1922static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1923                                   struct dpdk_per_stream_t *plc,
1924                                   struct rte_mbuf **pkts,
1925                                   size_t nb_pkts) {
1926        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1927        struct dpdk_addt_hdr *hdr;
1928        size_t i;
1929        uint64_t cur_sys_time_ns;
1930#if HAS_HW_TIMESTAMPS_82580
1931        struct hw_timestamp_82580 *hw_ts;
1932        uint64_t estimated_wraps;
1933#else
1934
1935#endif
1936
1937#if USE_CLOCK_GETTIME
1938        struct timespec cur_sys_time = {0};
1939        /* This looks terrible and I feel bad doing it. But it's OK
1940         * on new kernels, because this is a fast vsyscall */
1941        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1942        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1943#else
1944        struct timeval cur_sys_time = {0};
1945        /* Also a fast vsyscall */
1946        gettimeofday(&cur_sys_time, NULL);
1947        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1948#endif
1949
1950        /* The system clock is not perfect so when running
1951         * at linerate we could timestamp a packet in the past.
1952         * To avoid this we munge the timestamp to appear 1ns
1953         * after the previous packet. We should eventually catch up
1954         * to system time since a 64byte packet on a 10G link takes 67ns.
1955         *
1956         * Note with parallel readers timestamping packets
1957         * with duplicate stamps or out of order is unavoidable without
1958         * hardware timestamping from the NIC.
1959         */
1960#if !HAS_HW_TIMESTAMPS_82580
1961        if (plc->ts_last_sys >= cur_sys_time_ns) {
1962                cur_sys_time_ns = plc->ts_last_sys + 1;
1963        }
1964#endif
1965
1966        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1967        for (i = 0 ; i < nb_pkts ; ++i) {
1968
1969                /* We put our header straight after the dpdk header */
1970                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1971                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1972
1973#if GET_MAC_CRC_CHECKSUM
1974                /* Add back in the CRC sum */
1975                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1976                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1977                hdr->flags |= INCLUDES_CHECKSUM;
1978#endif
1979
1980                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1981
1982#if HAS_HW_TIMESTAMPS_82580
1983                /* The timestamp is sitting before our packet and is included in pkt_len */
1984                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1985                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1986                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1987
1988                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1989                 *
1990                 *        +----------+---+   +--------------+
1991                 *  82580 |    24    | 8 |   |      32      |
1992                 *        +----------+---+   +--------------+
1993                 *          reserved  \______ 40 bits _____/
1994                 *
1995                 * The 40 bit 82580 SYSTIM overflows every
1996                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1997                 *
1998                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1999                 * Endian (for the full 64 bits) i.e. picture is mirrored
2000                 */
2001
2002                /* Despite what the documentation says this is in Little
2003                 * Endian byteorder. Mask the reserved section out.
2004                 */
2005                hdr->timestamp = le64toh(hw_ts->timestamp) &
2006                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
2007
2008                if (unlikely(plc->ts_first_sys == 0)) {
2009                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
2010                        plc->ts_last_sys = plc->ts_first_sys;
2011                }
2012
2013                /* This will have serious problems if packets aren't read quickly
2014                 * that is within a couple of seconds because our clock cycles every
2015                 * 18 seconds */
2016                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
2017                                  / (1ull<<TS_NBITS_82580);
2018
2019                /* Estimated_wraps gives the number of times the counter should have
2020                 * wrapped (however depending on value last time it could have wrapped
2021                 * twice more (if hw clock is close to its max value) or once less (allowing
2022                 * for a bit of variance between hw and sys clock). But if the clock
2023                 * shouldn't have wrapped once then don't allow it to go backwards in time */
2024                if (unlikely(estimated_wraps >= 2)) {
2025                        /* 2 or more wrap arounds add all but the very last wrap */
2026                        plc->wrap_count += estimated_wraps - 1;
2027                }
2028
2029                /* Set the timestamp to the lowest possible value we're considering */
2030                hdr->timestamp += plc->ts_first_sys +
2031                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
2032
2033                /* In most runs only the first if() will need evaluating - i.e our
2034                 * estimate is correct. */
2035                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
2036                                              hdr->timestamp, MAXSKEW_82580))) {
2037                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
2038                        plc->wrap_count++;
2039                        hdr->timestamp += (1ull<<TS_NBITS_82580);
2040                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
2041                                             hdr->timestamp, MAXSKEW_82580)) {
2042                                /* Failed to match estimated_wraps */
2043                                plc->wrap_count++;
2044                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2045                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2046                                                     hdr->timestamp, MAXSKEW_82580)) {
2047                                        if (estimated_wraps == 0) {
2048                                                /* 0 case Failed to match estimated_wraps+2 */
2049                                                printf("WARNING - Hardware Timestamp failed to"
2050                                                       " match using systemtime!\n");
2051                                                hdr->timestamp = cur_sys_time_ns;
2052                                        } else {
2053                                                /* Failed to match estimated_wraps+1 */
2054                                                plc->wrap_count++;
2055                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
2056                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
2057                                                                     hdr->timestamp, MAXSKEW_82580)) {
2058                                                        /* Failed to match estimated_wraps+2 */
2059                                                        printf("WARNING - Hardware Timestamp failed to"
2060                                                               " match using systemtime!!\n");
2061                                                }
2062                                        }
2063                                }
2064                        }
2065                }
2066#else
2067
2068                hdr->timestamp = cur_sys_time_ns;
2069                /* Offset the next packet by the wire time of previous */
2070                calculate_wire_time(format_data, hdr->cap_len);
2071
2072#endif
2073        }
2074
2075        plc->ts_last_sys = cur_sys_time_ns;
2076        return;
2077}
2078
2079
2080static void dpdk_fin_packet(libtrace_packet_t *packet)
2081{
2082        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2083                rte_pktmbuf_free(packet->buffer);
2084                packet->buffer = NULL;
2085        }
2086}
2087
2088/** Reads at least one packet or returns an error
2089 */
2090static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
2091                                           dpdk_per_stream_t *stream,
2092                                           libtrace_message_queue_t *mesg,
2093                                           struct rte_mbuf* pkts_burst[],
2094                                           size_t nb_packets) {
2095        size_t nb_rx; /* Number of rx packets we've recevied */
2096        while (1) {
2097                /* Poll for a batch of packets */
2098                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2099                                         stream->queue_id, pkts_burst, nb_packets);
2100                if (nb_rx > 0) {
2101                        /* Got some packets - otherwise we keep spining */
2102                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
2103                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2104                        return nb_rx;
2105                }
2106                /* Check the message queue this could be less than 0 */
2107                if (mesg && libtrace_message_queue_count(mesg) > 0)
2108                        return READ_MESSAGE;
2109                if (libtrace_halt)
2110                        return READ_EOF;
2111                /* Wait a while, polling on memory degrades performance
2112                 * This relieves the pressure on memory allowing the NIC to DMA */
2113                rte_delay_us(10);
2114        }
2115
2116        /* We'll never get here - but if we did it would be bad */
2117        return READ_ERROR;
2118}
2119
2120static int dpdk_pread_packets (libtrace_t *libtrace,
2121                                    libtrace_thread_t *t,
2122                                    libtrace_packet_t **packets,
2123                                    size_t nb_packets) {
2124        int nb_rx; /* Number of rx packets we've recevied */
2125        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2126        int i;
2127        dpdk_per_stream_t *stream = t->format_data;
2128
2129        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
2130                                         pkts_burst, nb_packets);
2131
2132        if (nb_rx > 0) {
2133                for (i = 0; i < nb_rx; ++i) {
2134                        if (packets[i]->buffer != NULL) {
2135                                /* The packet should always be finished */
2136                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
2137                                free(packets[i]->buffer);
2138                        }
2139                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2140                        packets[i]->type = TRACE_RT_DATA_DPDK;
2141                        packets[i]->buffer = pkts_burst[i];
2142                        packets[i]->trace = libtrace;
2143                        packets[i]->error = 1;
2144                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
2145                }
2146        }
2147
2148        return nb_rx;
2149}
2150
2151static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2152        int nb_rx; /* Number of rx packets we've received */
2153        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
2154
2155        /* Free the last packet buffer */
2156        if (packet->buffer != NULL) {
2157                /* The packet should always be finished */
2158                assert(packet->buf_control == TRACE_CTRL_PACKET);
2159                free(packet->buffer);
2160                packet->buffer = NULL;
2161        }
2162
2163        packet->buf_control = TRACE_CTRL_EXTERNAL;
2164        packet->type = TRACE_RT_DATA_DPDK;
2165
2166        /* Check if we already have some packets buffered */
2167        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2168                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2169                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2170                return 1; // TODO should be bytes read, which essentially useless anyway
2171        }
2172
2173        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
2174                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2175
2176        if (nb_rx > 0) {
2177                FORMAT(libtrace)->burst_size = nb_rx;
2178                FORMAT(libtrace)->burst_offset = 1;
2179                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2180                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2181                return 1;
2182        }
2183        return nb_rx;
2184}
2185
2186static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2187        struct timeval tv;
2188        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2189
2190        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2191        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2192        return tv;
2193}
2194
2195static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2196        struct timespec ts;
2197        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2198
2199        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2200        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2201        return ts;
2202}
2203
2204static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2205        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2206}
2207
2208static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2209        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2210        return (libtrace_direction_t) hdr->direction;
2211}
2212
2213static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2214        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2215        hdr->direction = (uint8_t) direction;
2216        return (libtrace_direction_t) hdr->direction;
2217}
2218
2219static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2220        struct rte_eth_stats dev_stats = {0};
2221
2222        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2223                return;
2224
2225        /* Grab the current stats */
2226        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2227
2228        stats->captured_valid = true;
2229        stats->captured = dev_stats.ipackets;
2230
2231        stats->dropped_valid = true;
2232        stats->dropped = dev_stats.imissed;
2233
2234#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
2235        /* DPDK commit 86057c fixes ensures missed does not get counted as
2236         * errors */
2237        stats->errors_valid = true;
2238        stats->errors = dev_stats.ierrors;
2239#else
2240        /* DPDK errors includes drops */
2241        stats->errors_valid = true;
2242        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2243#endif
2244        stats->received_valid = true;
2245        stats->received = dev_stats.ipackets + dev_stats.imissed;
2246
2247}
2248
2249/* Attempts to read a packet in a non-blocking fashion. If one is not
2250 * available a SLEEP event is returned. We do not have the ability to
2251 * create a select()able file descriptor in DPDK.
2252 */
2253static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2254                                            libtrace_packet_t *packet) {
2255        libtrace_eventobj_t event = {0,0,0.0,0};
2256        size_t nb_rx; /* Number of received packets we've read */
2257
2258        do {
2259
2260                /* No packets waiting in our buffer? Try and read some more */
2261                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
2262                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2263                                                 FORMAT_DATA_FIRST(trace)->queue_id,
2264                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
2265                        if (nb_rx > 0) {
2266                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
2267                                                FORMAT(trace)->burst_pkts, nb_rx);
2268                                FORMAT(trace)->burst_size = nb_rx;
2269                                FORMAT(trace)->burst_offset = 0;
2270                        }
2271                }
2272
2273                /* Now do we have packets waiting? */
2274                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
2275                        /* Free the last packet buffer */
2276                        if (packet->buffer != NULL) {
2277                                /* The packet should always be finished */
2278                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2279                                free(packet->buffer);
2280                                packet->buffer = NULL;
2281                        }
2282
2283                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2284                        packet->type = TRACE_RT_DATA_DPDK;
2285                        event.type = TRACE_EVENT_PACKET;
2286                        packet->buffer = FORMAT(trace)->burst_pkts[
2287                                             FORMAT(trace)->burst_offset++];
2288                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2289                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2290
2291                        /* XXX - Check this passes the filter trace_read_packet normally
2292                         * does this for us but this wont */
2293                        if (trace->filter) {
2294                                if (!trace_apply_filter(trace->filter, packet)) {
2295                                        /* Failed the filter so we loop for another packet */
2296                                        trace->filtered_packets ++;
2297                                        continue;
2298                                }
2299                        }
2300                        trace->accepted_packets ++;
2301                } else {
2302                        /* We only want to sleep for a very short time - we are non-blocking */
2303                        event.type = TRACE_EVENT_SLEEP;
2304                        event.seconds = 0.0001;
2305                        event.size = 0;
2306                }
2307
2308                /* If we get here we have our event */
2309                break;
2310        } while (1);
2311
2312        return event;
2313}
2314
2315static void dpdk_help(void) {
2316        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
2317        printf("Supported input URIs:\n");
2318        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2319        printf("\tThe -<coreid> is optional \n");
2320        printf("\t e.g. dpdk:0000:01:00.1\n");
2321        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2322        printf("\t By default the last CPU core is used if not otherwise specified.\n");
2323        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2324        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2325        printf("\n");
2326        printf("Supported output URIs:\n");
2327        printf("\tSame format as the input URI.\n");
2328        printf("\t e.g. dpdk:0000:01:00.1\n");
2329        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2330        printf("\n");
2331}
2332
2333static struct libtrace_format_t dpdk = {
2334        "dpdk",
2335        "$Id$",
2336        TRACE_FORMAT_DPDK,
2337        NULL,                               /* probe filename */
2338        NULL,                               /* probe magic */
2339        dpdk_init_input,                    /* init_input */
2340        dpdk_config_input,                  /* config_input */
2341        dpdk_start_input,                   /* start_input */
2342        dpdk_pause_input,                   /* pause_input */
2343        dpdk_init_output,                   /* init_output */
2344        NULL,                               /* config_output */
2345        dpdk_start_output,                  /* start_ouput */
2346        dpdk_fin_input,                     /* fin_input */
2347        dpdk_fin_output,                    /* fin_output */
2348        dpdk_read_packet,                   /* read_packet */
2349        dpdk_prepare_packet,                /* prepare_packet */
2350        dpdk_fin_packet,                    /* fin_packet */
2351        dpdk_write_packet,                  /* write_packet */
2352        dpdk_get_link_type,                 /* get_link_type */
2353        dpdk_get_direction,                 /* get_direction */
2354        dpdk_set_direction,                 /* set_direction */
2355        NULL,                               /* get_erf_timestamp */
2356        dpdk_get_timeval,                   /* get_timeval */
2357        dpdk_get_timespec,                  /* get_timespec */
2358        NULL,                               /* get_seconds */
2359        NULL,                               /* seek_erf */
2360        NULL,                               /* seek_timeval */
2361        NULL,                               /* seek_seconds */
2362        dpdk_get_capture_length,            /* get_capture_length */
2363        dpdk_get_wire_length,               /* get_wire_length */
2364        dpdk_get_framing_length,            /* get_framing_length */
2365        dpdk_set_capture_length,            /* set_capture_length */
2366        NULL,                               /* get_received_packets */
2367        NULL,                               /* get_filtered_packets */
2368        NULL,                               /* get_dropped_packets */
2369        dpdk_get_stats,                     /* get_statistics */
2370        NULL,                               /* get_fd */
2371        dpdk_trace_event,                   /* trace_event */
2372        dpdk_help,                          /* help */
2373        NULL,                               /* next pointer */
2374        {true, 8},                          /* Live, NICs typically have 8 threads */
2375        dpdk_pstart_input,                  /* pstart_input */
2376        dpdk_pread_packets,                 /* pread_packets */
2377        dpdk_pause_input,                   /* ppause */
2378        dpdk_fin_input,                     /* p_fin */
2379        dpdk_pregister_thread,              /* pregister_thread */
2380        dpdk_punregister_thread,            /* punregister_thread */
2381        NULL                                /* get thread stats */
2382};
2383
2384void dpdk_constructor(void) {
2385        register_format(&dpdk);
2386}
Note: See TracBrowser for help on using the repository browser.