source: lib/format_dpdk.c @ 12ae766

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 12ae766 was 12ae766, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Move pthread register outside of the libtrace lock.
DAG does a decent amount of work in this step, which
can be done in parallel. Instead let formats grab a
lock if they need it.

  • Property mode set to 100644
File size: 74.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#define _GNU_SOURCE
44
45#include "config.h"
46#include "libtrace.h"
47#include "libtrace_int.h"
48#include "format_helper.h"
49#include "libtrace_arphrd.h"
50#include "hash_toeplitz.h"
51
52#ifdef HAVE_INTTYPES_H
53#  include <inttypes.h>
54#else
55# error "Can't find inttypes.h"
56#endif
57
58#include <stdlib.h>
59#include <assert.h>
60#include <unistd.h>
61#include <endian.h>
62#include <string.h>
63
64#if HAVE_LIBNUMA
65#include <numa.h>
66#endif
67
/* We can deal with any minor differences by checking the RTE VERSION.
 * DPDK typically backports some fixes (usually for building against
 * newer kernels) to older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * Currently 1.5 to 1.7 is supported.
 */
80#include <rte_eal.h>
81#include <rte_version.h>
82#ifndef RTE_VERSION_NUM
83#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
84#endif
85#ifndef RTE_VER_PATCH_RELEASE
86#       define RTE_VER_PATCH_RELEASE 0
87#endif
88#ifndef RTE_VERSION
89#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
90        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
91#endif
92
93/* 1.6.0r2 :
94 *      rte_eal_pci_set_blacklist() is removed
95 *      device_list is renamed to pci_device_list
96 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
97 *      as such we do apply the whitelist before rte_eal_init.
98 *      This also works correctly with DPDK 1.6.0r2.
99 *
100 * Replaced by:
101 *      rte_devargs (we can simply whitelist)
102 */
103#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
104#       define DPDK_USE_BLACKLIST 1
105#else
106#       define DPDK_USE_BLACKLIST 0
107#endif
108
109/*
110 * 1.7.0 :
111 *      rte_pmd_init_all is removed
112 *
113 * Replaced by:
114 *      Nothing, no longer needed
115 */
116#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
117#       define DPDK_USE_PMD_INIT 1
118#else
119#       define DPDK_USE_PMD_INIT 0
120#endif
121
122#include <rte_per_lcore.h>
123#include <rte_debug.h>
124#include <rte_errno.h>
125#include <rte_common.h>
126#include <rte_log.h>
127#include <rte_memcpy.h>
128#include <rte_prefetch.h>
129#include <rte_branch_prediction.h>
130#include <rte_pci.h>
131#include <rte_ether.h>
132#include <rte_ethdev.h>
133#include <rte_ring.h>
134#include <rte_mempool.h>
135#include <rte_mbuf.h>
136#include <rte_launch.h>
137#include <rte_lcore.h>
138#include <rte_per_lcore.h>
139#include <rte_cycles.h>
140#include <pthread.h>
141
142/* The default size of memory buffers to use - This is the max size of standard
143 * ethernet packet less the size of the MAC CHECKSUM */
144#define RX_MBUF_SIZE 1514
145
146/* The minimum number of memory buffers per queue tx or rx. Search for
147 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
148 */
149#define MIN_NB_BUF 64
150
151/* Number of receive memory buffers to use
152 * By default this is limited by driver to 4k and must be a multiple of 128.
153 * A modification can be made to the driver to remove this limit.
154 * This can be increased in the driver and here.
155 * Should be at least MIN_NB_BUF.
156 */
157#define NB_RX_MBUF 4096
158
/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
162#define NB_TX_MBUF 1024
163
164/* The size of the PCI blacklist needs to be big enough to contain
165 * every PCI device address (listed by lspci every bus:device.function tuple).
166 */
167#define BLACK_LIST_SIZE 50
168
169/* The maximum number of characters the mempool name can be */
170#define MEMPOOL_NAME_LEN 20
171
172/* For single threaded libtrace we read packets as a batch/burst
173 * this is the maximum size of said burst */
174#define BURST_SIZE 50
175
176#define MBUF(x) ((struct rte_mbuf *) x)
177/* Get the original placement of the packet data */
178#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
179#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
180#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
181
182#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
183                        (uint64_t) tv.tv_usec*1000ull)
184#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
185                        (uint64_t) ts.tv_nsec)
186
187#if RTE_PKTMBUF_HEADROOM != 128
188#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
189         "any libtrace instance processing these packet must be have the" \
190         "same RTE_PKTMBUF_HEADROOM set"
191#endif
192
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
202
203/* Print verbose messages to stderr */
204#define DEBUG 0
205
/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again, gettimeofday() should be a
 * vsyscall also; if it's not you should seriously consider updating your
 * kernel.
 */
212#ifdef HAVE_LIBRT
213/* You can turn this on (set to 1) to prefer clock_gettime */
214#define USE_CLOCK_GETTIME 1
215#else
216/* DONT CHANGE THIS !!! */
217#define USE_CLOCK_GETTIME 1
218#endif
219
220/* This is fairly safe to turn on - currently there appears to be a 'bug'
221 * in DPDK that will remove the checksum by making the packet appear 4bytes
222 * smaller than what it really is. Most formats don't include the checksum
223 * hence writing out a port such as int: ring: and dpdk: assumes there
224 * is no checksum and will attempt to write the checksum as part of the
225 * packet
226 */
227#define GET_MAC_CRC_CHECKSUM 0
228
229/* This requires a modification of the pmd drivers (inside Intel DPDK)
230 * TODO this requires updating (packet sizes are wrong TS most likely also)
231 */
232#define HAS_HW_TIMESTAMPS_82580 0
233
234#if HAS_HW_TIMESTAMPS_82580
235# define TS_NBITS_82580     40
236/* The maximum on the +ve or -ve side that we can be, make it half way */
237# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
238#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
239#endif
240
/* Timestamp record layout used with the Intel 82580.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored big endian, however it is actually little endian. */
struct hw_timestamp_82580 {
    /* First 8 bytes of the record */
    uint64_t reserved;
    /* Little endian; only the lower 40 bits are valid */
    uint64_t timestamp;
};
247
/* Values stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED, /* Initial value set when the format data is created */
    DPDK_RUNNING,
    DPDK_PAUSED,
};
253
/* State kept for each lcore (see dpdk_format_data_t.per_lcore) */
struct dpdk_per_lcore_t
{
    uint16_t queue_id;
    uint8_t port;
    /* System timestamp of our most recent packet in nanoseconds */
    uint64_t ts_last_sys;
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping is only relevant to RX */

    /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_first_sys;
    /* Number of times the NIC clock has wrapped around completely */
    uint32_t wrap_count;
#endif
};
265
266/* Used by both input and output however some fields are not used
267 * for output */
268struct dpdk_format_data_t {
269    int8_t promisc; /* promiscuous mode - RX only */
270    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
271    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
272    uint8_t paused; /* See paused_state */
273    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
274    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
275    int snaplen; /* The snap length for the capture - RX only */
276    /* We always have to setup both rx and tx queues even if we don't want them */
277    int nb_rx_buf; /* The number of packet buffers in the rx ring */
278    int nb_tx_buf; /* The number of packet buffers in the tx ring */
279    int nic_numa_node; /* The NUMA node that the NIC is attached to */
280    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
281#if DPDK_USE_BLACKLIST
282    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
283        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
284#endif
285    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
286    uint8_t rss_key[40]; // This is the RSS KEY
287    /* To improve performance we always batch reading packets, in a burst */
288    struct rte_mbuf* burst_pkts[BURST_SIZE];
289    int burst_size; /* The total number read in the burst */
290    int burst_offset; /* The offset we are into the burst */
291        // DPDK normally seems to have a limit of 8 queues for a given card
292        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
293};
294
/* Bit flags; the names suggest they describe what a captured packet
 * includes (presumably stored in dpdk_addt_hdr.flags - confirm at use site) */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 0x1,
    INCLUDES_HW_TIMESTAMP = 0x2, /* Used with the 82580 driver */
};
299
/**
 * Additional per-packet header that sits in front of the packet data,
 * giving us somewhere to store extra information about the packet.
 *
 * Buffer layout:
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags;
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    /* The size to say the capture is */
    uint32_t cap_len;
};
323
324/**
325 * We want to blacklist all devices except those on the whitelist
326 * (I say list, but yes it is only the one).
327 *
328 * The default behaviour of rte_pci_probe() will map every possible device
329 * to its DPDK driver. The DPDK driver will take the ethernet device
330 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
331 *
332 * So blacklist all devices except the one that we wish to use so that
333 * the others can still be used as standard ethernet ports.
334 *
335 * @return 0 if successful, otherwise -1 on error.
336 */
337#if DPDK_USE_BLACKLIST
338static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
339{
340        struct rte_pci_device *dev = NULL;
341        format_data->nb_blacklist = 0;
342
343        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
344
345        TAILQ_FOREACH(dev, &device_list, next) {
346        if (whitelist != NULL && whitelist->domain == dev->addr.domain
347            && whitelist->bus == dev->addr.bus
348            && whitelist->devid == dev->addr.devid
349            && whitelist->function == dev->addr.function)
350            continue;
351                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
352                                / sizeof (format_data->blacklist[0])) {
353                        fprintf(stderr, "Warning: too many devices to blacklist consider"
354                                        " increasing BLACK_LIST_SIZE");
355                        break;
356                }
357                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
358                ++format_data->nb_blacklist;
359        }
360
361        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
362        return 0;
363}
364#else /* DPDK_USE_BLACKLIST */
365#include <rte_devargs.h>
366static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
367{
368        char pci_str[20] = {0};
369        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
370                 whitelist->domain,
371                 whitelist->bus,
372                 whitelist->devid,
373                 whitelist->function);
374        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
375                return -1;
376        }
377        return 0;
378}
379#endif
380
381/**
382 * Parse the URI format as a pci address
383 * Fills in addr, note core is optional and is unchanged if
384 * a value for it is not provided.
385 *
386 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
387 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
388 */
389static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
390    int matches;
391    assert(str);
392    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld", &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
393    if (matches >= 4) {
394        return 0;
395    } else {
396        return -1;
397    }
398}
399
400/**
401 * Convert a pci address to the numa node it is
402 * connected to.
403 *
404 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
405 * so we can call it before DPDK
406 *
407 * @return -1 if unknown otherwise a number 0 or higher of the numa node
408 */
409static int pci_to_numa(struct rte_pci_addr * dev_addr) {
410        char path[50] = {0};
411        FILE *file;
412
413        /* Read from the system */
414        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
415                 dev_addr->domain,
416                 dev_addr->bus,
417                 dev_addr->devid,
418                 dev_addr->function);
419
420        if((file = fopen(path, "r")) != NULL) {
421                int numa_node = -1;
422                fscanf(file, "%d", &numa_node);
423                fclose(file);
424                return numa_node;
425        }
426        return -1;
427}
428
429#if DEBUG
430/* For debugging */
431static inline void dump_configuration()
432{
433    struct rte_config * global_config;
434    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
435
436    if (nb_cpu <= 0) {
437        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
438        nb_cpu = 1; /* fallback to just 1 core */
439    }
440    if (nb_cpu > RTE_MAX_LCORE)
441        nb_cpu = RTE_MAX_LCORE;
442
443    global_config = rte_eal_get_configuration();
444
445    if (global_config != NULL) {
446        int i;
447        fprintf(stderr, "Intel DPDK setup\n"
448               "---Version      : %s\n"
449               "---Master LCore : %"PRIu32"\n"
450               "---LCore Count  : %"PRIu32"\n",
451               rte_version(),
452               global_config->master_lcore, global_config->lcore_count);
453
454        for (i = 0 ; i < nb_cpu; i++) {
455            fprintf(stderr, "   ---Core %d : %s\n", i,
456                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
457        }
458
459        const char * proc_type;
460        switch (global_config->process_type) {
461            case RTE_PROC_AUTO:
462                proc_type = "auto";
463                break;
464            case RTE_PROC_PRIMARY:
465                proc_type = "primary";
466                break;
467            case RTE_PROC_SECONDARY:
468                proc_type = "secondary";
469                break;
470            case RTE_PROC_INVALID:
471                proc_type = "invalid";
472                break;
473            default:
474                proc_type = "something worse than invalid!!";
475        }
476        fprintf(stderr, "---Process Type : %s\n", proc_type);
477    }
478
479}
480#endif
481
482/**
483 * Expects to be called from the master lcore and moves it to the given dpdk id
484 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
485 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
486 *               and not already in use.
487 * @return 0 is successful otherwise -1 on error.
488 */
489static inline int dpdk_move_master_lcore(size_t core) {
490    struct rte_config *cfg = rte_eal_get_configuration();
491    cpu_set_t cpuset;
492    int i;
493
494    assert (core < RTE_MAX_LCORE);
495    assert (rte_get_master_lcore() == rte_lcore_id());
496
497    if (core == rte_lcore_id())
498        return 0;
499
500    // Make sure we are not overwriting someone else
501    assert(!rte_lcore_is_enabled(core));
502
503    // Move the core
504    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
505    cfg->lcore_role[core] = ROLE_RTE;
506    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
507    rte_eal_get_configuration()->master_lcore = core;
508    RTE_PER_LCORE(_lcore_id) = core;
509
510    // Now change the affinity
511    CPU_ZERO(&cpuset);
512
513    if (lcore_config[core].detected) {
514        CPU_SET(core, &cpuset);
515    } else {
516        for (i = 0; i < RTE_MAX_LCORE; ++i) {
517            if (lcore_config[i].detected)
518                CPU_SET(i, &cpuset);
519        }
520    }
521
522    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
523    if (i != 0) {
524        // TODO proper libtrace style error here!!
525        fprintf(stderr, "pthread_setaffinity_np failed\n");
526        return -1;
527    }
528    return 0;
529}
530
531
/**
 * XXX This is very bad XXX
 * But we need some way to allow nested use of getopt: the four getopt
 * globals are snapshotted before the nested use and put back afterwards.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 */
struct saved_getopts {
    char *optarg;
    int optind;
    int opterr;
    int optopt;
};

/* Snapshot the current getopt globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
    *opts = (struct saved_getopts) {
        .optarg = optarg,
        .optind = optind,
        .opterr = opterr,
        .optopt = optopt,
    };
}

/* Write a snapshot taken by save_getopts() back into the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
    const struct saved_getopts snapshot = *opts;

    optopt = snapshot.optopt;
    opterr = snapshot.opterr;
    optind = snapshot.optind;
    optarg = snapshot.optarg;
}
559
560static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
561                                        char * err, int errlen) {
562    int ret; /* Returned error codes */
563    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
564    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
565    char mem_map[20] = {0}; /* The memory name */
566    long nb_cpu; /* The number of CPUs in the system */
567    long my_cpu; /* The CPU number we want to bind to */
568    int i;
569    struct rte_config *cfg = rte_eal_get_configuration();
570        struct saved_getopts save_opts;
571
572#if DEBUG
573    rte_set_log_level(RTE_LOG_DEBUG);
574#else
575    rte_set_log_level(RTE_LOG_WARNING);
576#endif
577    /*
578     * Using unique file prefixes mean separate memory is used, unlinking
579     * the two processes. However be careful we still cannot access a
580     * port that already in use.
581     *
582     * Using unique file prefixes mean separate memory is used, unlinking
583     * the two processes. However be careful we still cannot access a
584     * port that already in use.
585     */
586    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
587                "--file-prefix", mem_map, "-m", "980", NULL};
588    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
589
590    /* This initialises the Environment Abstraction Layer (EAL)
591     * If we had slave workers these are put into WAITING state
592     *
593     * Basically binds this thread to a fixed core, which we choose as
594     * the last core on the machine (assuming fewer interrupts mapped here).
595     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
596     * "-n" the number of memory channels into the CPU (hardware specific)
597     *      - Most likely to be half the number of ram slots in your machine.
598     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
599     * Controls where in memory packets are stored and should spread across
600     * the channels. We just use 1 to be safe.
601     */
602
603    /* Get the number of cpu cores in the system and use the last core
604     * on the correct numa node */
605    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
606    if (nb_cpu <= 0) {
607        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
608        nb_cpu = 1; /* fallback to the first core */
609    }
610    if (nb_cpu > RTE_MAX_LCORE)
611        nb_cpu = RTE_MAX_LCORE;
612
613    my_cpu = -1;
614    /* This allows the user to specify the core - we would try to do this
615     * automatically but it's hard to tell that this is secondary
616     * before running rte_eal_init(...). Currently we are limited to 1
617     * instance per core due to the way memory is allocated. */
618    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
619        snprintf(err, errlen, "Failed to parse URI");
620        return -1;
621    }
622
623#if HAVE_LIBNUMA
624        format_data->nic_numa_node = pci_to_numa(&use_addr);
625        if (my_cpu < 0) {
626                /* If we can assign to a core on the same numa node */
627                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
628                if(format_data->nic_numa_node >= 0) {
629                        int max_node_cpu = -1;
630                        struct bitmask *mask = numa_allocate_cpumask();
631                        assert(mask);
632                        numa_node_to_cpus(format_data->nic_numa_node, mask);
633                        for (i = 0 ; i < nb_cpu; ++i) {
634                                if (numa_bitmask_isbitset(mask,i))
635                                        max_node_cpu = i+1;
636                        }
637                        my_cpu = max_node_cpu;
638                }
639        }
640#endif
641        if (my_cpu < 0) {
642                my_cpu = nb_cpu;
643        }
644
645
646    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
647                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
648
649    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
650        snprintf(err, errlen,
651          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
652          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
653        return -1;
654    }
655
656    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
657       info older versions */
658    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
659    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
660
661#if !DPDK_USE_BLACKLIST
662    /* Black list all ports besides the one that we want to use */
663        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
664                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
665                         " are you sure the address is correct?: %s", strerror(-ret));
666                return -1;
667        }
668#endif
669
670        /* Give the memory map a unique name */
671        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
672    /* rte_eal_init it makes a call to getopt so we need to reset the
673     * global optind variable of getopt otherwise this fails */
674        save_getopts(&save_opts);
675    optind = 1;
676    if ((ret = rte_eal_init(argc, argv)) < 0) {
677        snprintf(err, errlen,
678          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
679        return -1;
680    }
681        restore_getopts(&save_opts);
682    // These are still running but will never do anything with DPDK v1.7 we
683    // should remove this XXX in the future
684    for(i = 0; i < RTE_MAX_LCORE; ++i) {
685            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
686            cfg->lcore_role[i] = ROLE_OFF;
687            cfg->lcore_count--;
688        }
689    }
690    // Only the master should be running
691    assert(cfg->lcore_count == 1);
692
693    dpdk_move_master_lcore(my_cpu-1);
694
695#if DEBUG
696    dump_configuration();
697#endif
698
699#if DPDK_USE_PMD_INIT
700    /* This registers all available NICs with Intel DPDK
701     * These are not loaded until rte_eal_pci_probe() is called.
702     */
703    if ((ret = rte_pmd_init_all()) < 0) {
704        snprintf(err, errlen,
705          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
706        return -1;
707    }
708#endif
709
710#if DPDK_USE_BLACKLIST
711    /* Black list all ports besides the one that we want to use */
712        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
713                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
714                         " are you sure the address is correct?: %s", strerror(-ret));
715                return -1;
716        }
717#endif
718
719    /* This loads DPDK drivers against all ports that are not blacklisted */
720        if ((ret = rte_eal_pci_probe()) < 0) {
721        snprintf(err, errlen,
722            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
723        return -1;
724    }
725
726    format_data->nb_ports = rte_eth_dev_count();
727
728    if (format_data->nb_ports != 1) {
729        snprintf(err, errlen,
730            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
731            format_data->nb_ports);
732        return -1;
733    }
734
735    struct rte_eth_dev_info dev_info;
736    rte_eth_dev_info_get(0, &dev_info);
737    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
738                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
739
740    return 0;
741}
742
743static int dpdk_init_input (libtrace_t *libtrace) {
744    char err[500];
745    err[0] = 0;
746
747    libtrace->format_data = (struct dpdk_format_data_t *)
748                            malloc(sizeof(struct dpdk_format_data_t));
749    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
750    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
751    FORMAT(libtrace)->nb_ports = 0;
752    FORMAT(libtrace)->snaplen = 0; /* Use default */
753    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
754    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
755    FORMAT(libtrace)->nic_numa_node = -1;
756    FORMAT(libtrace)->promisc = -1;
757    FORMAT(libtrace)->pktmbuf_pool = NULL;
758#if DPDK_USE_BLACKLIST
759    FORMAT(libtrace)->nb_blacklist = 0;
760#endif
761    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
762    FORMAT(libtrace)->mempool_name[0] = 0;
763    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
764    FORMAT(libtrace)->burst_size = 0;
765    FORMAT(libtrace)->burst_offset = 0;
766
767    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
768        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
769        free(libtrace->format_data);
770        libtrace->format_data = NULL;
771        return -1;
772    }
773    return 0;
774};
775
776static int dpdk_init_output(libtrace_out_t *libtrace)
777{
778    char err[500];
779    err[0] = 0;
780
781    libtrace->format_data = (struct dpdk_format_data_t *)
782                            malloc(sizeof(struct dpdk_format_data_t));
783    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
784    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
785    FORMAT(libtrace)->nb_ports = 0;
786    FORMAT(libtrace)->snaplen = 0; /* Use default */
787    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
788    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
789    FORMAT(libtrace)->nic_numa_node = -1;
790    FORMAT(libtrace)->promisc = -1;
791    FORMAT(libtrace)->pktmbuf_pool = NULL;
792#if DPDK_USE_BLACKLIST
793    FORMAT(libtrace)->nb_blacklist = 0;
794#endif
795    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
796    FORMAT(libtrace)->mempool_name[0] = 0;
797    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
798    FORMAT(libtrace)->burst_size = 0;
799    FORMAT(libtrace)->burst_offset = 0;
800
801    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
802        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
803        free(libtrace->format_data);
804        libtrace->format_data = NULL;
805        return -1;
806    }
807    return 0;
808};
809
810static int dpdk_pconfig_input (libtrace_t *libtrace,
811                                trace_parallel_option_t option,
812                                void *data) {
813        switch (option) {
814                case TRACE_OPTION_SET_HASHER:
815                        switch (*((enum hasher_types *) data))
816                        {
817                                case HASHER_BALANCE:
818                                case HASHER_UNIDIRECTIONAL:
819                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
820                                        return 0;
821                                case HASHER_BIDIRECTIONAL:
822                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
823                                        return 0;
824                                case HASHER_HARDWARE:
825                                case HASHER_CUSTOM:
826                                        // We don't support these
827                                        return -1;
828                        }
829        break;
830        }
831        return -1;
832}
833/**
834 * Note here snaplen excludes the MAC checksum. Packets over
835 * the requested snaplen will be dropped. (Excluding MAC checksum)
836 *
837 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
838 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
839 * is set the maximum size of the returned packet would be 1518 otherwise
840 * 1514 would be the largest size possibly returned.
841 *
842 */
843static int dpdk_config_input (libtrace_t *libtrace,
844                                        trace_option_t option,
845                                        void *data) {
846    switch (option) {
847        case TRACE_OPTION_SNAPLEN:
848            /* Only support changing snaplen before a call to start is
849             * made */
850            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
851                FORMAT(libtrace)->snaplen=*(int*)data;
852            else
853                return -1;
854            return 0;
855                case TRACE_OPTION_PROMISC:
856                        FORMAT(libtrace)->promisc=*(int*)data;
857            return 0;
858        case TRACE_OPTION_FILTER:
859            /* TODO filtering */
860            break;
861        case TRACE_OPTION_META_FREQ:
862            break;
863        case TRACE_OPTION_EVENT_REALTIME:
864            break;
865        /* Avoid default: so that future options will cause a warning
866         * here to remind us to implement it, or flag it as
867         * unimplementable
868         */
869    }
870
871        /* Don't set an error - trace_config will try to deal with the
872         * option and will set an error if it fails */
873    return -1;
874}
875
876/* Can set jumbo frames/ or limit the size of a frame by setting both
877 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
878 *
879 */
880static struct rte_eth_conf port_conf = {
881        .rxmode = {
882                .mq_mode = ETH_RSS,
883                .split_hdr_size = 0,
884                .header_split   = 0, /**< Header Split disabled */
885                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
886                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
887                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
888                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
889#if GET_MAC_CRC_CHECKSUM
890/* So it appears that if hw_strip_crc is turned off the driver will still
891 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
892 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
893 * So lets just add it back on when we receive the packet.
894 */
895                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
896#else
897/* By default strip the MAC checksum because it's a bit of a hack to
898 * actually read these. And don't want to rely on disabling this to actualy
899 * always cut off the checksum in the future
900 */
901                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
902#endif
903        },
904        .txmode = {
905                .mq_mode = ETH_DCB_NONE,
906        },
907        .rx_adv_conf = {
908                .rss_conf = {
909                        // .rss_key = &rss_key, // We set this per format
910                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
911                },
912        },
913        .intr_conf = {
914                .lsc = 1
915        }
916};
917
918static const struct rte_eth_rxconf rx_conf = {
919        .rx_thresh = {
920                .pthresh = 8,/* RX_PTHRESH prefetch */
921                .hthresh = 8,/* RX_HTHRESH host */
922                .wthresh = 4,/* RX_WTHRESH writeback */
923        },
924    .rx_free_thresh = 0,
925    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
926};
927
928static const struct rte_eth_txconf tx_conf = {
929        .tx_thresh = {
930        /**
931         * TX_PTHRESH prefetch
932         * Set on the NIC, if the number of unprocessed descriptors to queued on
933         * the card fall below this try grab at least hthresh more unprocessed
934         * descriptors.
935         */
936                .pthresh = 36,
937
938        /* TX_HTHRESH host
939         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
940         */
941                .hthresh = 0,
942
943        /* TX_WTHRESH writeback
944         * Set on the NIC, the number of sent descriptors before writing back
945         * status to confirm the transmission. This is done more efficiently as
946         * a bulk DMA-transfer rather than writing one at a time.
947         * Similar to tx_free_thresh however this is applied to the NIC, where
948         * as tx_free_thresh is when DPDK will check these. This is extended
949         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
950         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
951         */
952                .wthresh = 4,
953        },
954
955    /* Used internally by DPDK rather than passed to the NIC. The number of
956     * packet descriptors to send before checking for any responses written
957     * back (to confirm the transmission). Default = 32 if set to 0)
958     */
959        .tx_free_thresh = 0,
960
961    /* This is the Report Status threshold, used by 10Gbit cards,
962     * This signals the card to only write back status (such as
963     * transmission successful) after this minimum number of transmit
964     * descriptors are seen. The default is 32 (if set to 0) however if set
965     * to greater than 1 TX wthresh must be set to zero, because this is kindof
966     * a replacement. See the dpdk programmers guide for more restrictions.
967     */
968        .tx_rs_thresh = 1,
969};
970
971/**
972 * A callback for a link state change (LSC).
973 *
974 * Packets may be received before this notification. In fact the DPDK IGXBE
975 * driver likes to put a delay upto 5sec before sending this.
976 *
977 * We use this to ensure the link speed is correct for our timestamp
978 * calculations. Because packets might be received before the link up we still
979 * update this when the packet is received.
980 *
981 * @param port The DPDK port
982 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
983 * @param cb_arg The dpdk_format_data_t structure associated with the format
984 */
985static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
986                              void *cb_arg) {
987        struct dpdk_format_data_t * format_data = cb_arg;
988        struct rte_eth_link link_info;
989        assert(event == RTE_ETH_EVENT_INTR_LSC);
990        assert(port == format_data->port);
991
992        rte_eth_link_get_nowait(port, &link_info);
993
994        if (link_info.link_status)
995                format_data->link_speed = link_info.link_speed;
996        else
997                format_data->link_speed = 0;
998
999#if DEBUG
1000        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1001                link_info.link_status ? "up" : "down",
1002                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1003                                          "full-duplex" : "half-duplex",
1004                (int) link_info.link_speed);
1005#endif
1006
1007        /* Turns out DPDK drivers might not come back up if the link speed
1008         * changes. So we reset the autoneg procedure. This is very unsafe
1009         * we have have threads reading packets and we stop the port. */
1010#if 0
1011        if (!link_info.link_status) {
1012                int ret;
1013                rte_eth_dev_stop(port);
1014                ret = rte_eth_dev_start(port);
1015                if (ret < 0) {
1016                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1017                                strerror(-ret));
1018                }
1019        }
1020#endif
1021}
1022
1023/* Attach memory to the port and start the port or restart the port.
1024 */
1025static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
1026    int ret; /* Check return values for errors */
1027    struct rte_eth_link link_info; /* Wait for link */
1028    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
1029
1030    /* Already started */
1031    if (format_data->paused == DPDK_RUNNING)
1032        return 0;
1033
1034    /* First time started we need to alloc our memory, doing this here
1035     * rather than in environment setup because we don't have snaplen then */
1036    if (format_data->paused == DPDK_NEVER_STARTED) {
1037        if (format_data->snaplen == 0) {
1038            format_data->snaplen = RX_MBUF_SIZE;
1039            port_conf.rxmode.jumbo_frame = 0;
1040            port_conf.rxmode.max_rx_pkt_len = 0;
1041        } else {
1042            /* Use jumbo frames */
1043            port_conf.rxmode.jumbo_frame = 1;
1044            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1045        }
1046
1047        /* This is additional overhead so make sure we allow space for this */
1048#if GET_MAC_CRC_CHECKSUM
1049        format_data->snaplen += ETHER_CRC_LEN;
1050#endif
1051#if HAS_HW_TIMESTAMPS_82580
1052        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1053#endif
1054
1055        /* Create the mbuf pool, which is the place our packets are allocated
1056         * from - TODO figure out if there is is a free function (I cannot see one)
1057         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1058         * allocate however that extra 1 packet is not used.
1059         * (I assume <= vs < error some where in DPDK code)
1060         * TX requires nb_tx_buffers + 1 in the case the queue is full
1061         * so that will fill the new buffer and wait until slots in the
1062         * ring become available.
1063         */
1064#if DEBUG
1065    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1066#endif
1067    format_data->pktmbuf_pool =
1068            rte_mempool_create(format_data->mempool_name,
1069                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
1070                       format_data->snaplen + sizeof(struct rte_mbuf)
1071                                        + RTE_PKTMBUF_HEADROOM,
1072                       128, sizeof(struct rte_pktmbuf_pool_private),
1073                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1074                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
1075
1076    if (format_data->pktmbuf_pool == NULL) {
1077            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
1078                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
1079            return -1;
1080        }
1081    }
1082
1083    /* ----------- Now do the setup for the port mapping ------------ */
1084    /* Order of calls must be
1085     * rte_eth_dev_configure()
1086     * rte_eth_tx_queue_setup()
1087     * rte_eth_rx_queue_setup()
1088     * rte_eth_dev_start()
1089     * other rte_eth calls
1090     */
1091
1092
1093    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1094
1095    /* This must be called first before another *eth* function
1096     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1097    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
1098    if (ret < 0) {
1099        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1100                            " %"PRIu8" : %s", format_data->port,
1101                            strerror(-ret));
1102        return -1;
1103    }
1104    /* Initialise the TX queue a minimum value if using this port for
1105     * receiving. Otherwise a larger size if writing packets.
1106     */
1107    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1108                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1109    if (ret < 0) {
1110        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1111                            " %"PRIu8" : %s", format_data->port,
1112                            strerror(-ret));
1113        return -1;
1114    }
1115    /* Initialise the RX queue with some packets from memory */
1116    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1117                                 format_data->nb_rx_buf, cpu_numa_node,
1118                                 &rx_conf, format_data->pktmbuf_pool);
1119    if (ret < 0) {
1120        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1121                    " %"PRIu8" : %s", format_data->port,
1122                    strerror(-ret));
1123        return -1;
1124    }
1125
1126    /* Start device */
1127    ret = rte_eth_dev_start(format_data->port);
1128    if (ret < 0) {
1129        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1130                    strerror(-ret));
1131        return -1;
1132    }
1133
1134    /* Default promiscuous to on */
1135    if (format_data->promisc == -1)
1136        format_data->promisc = 1;
1137
1138    if (format_data->promisc == 1)
1139        rte_eth_promiscuous_enable(format_data->port);
1140    else
1141        rte_eth_promiscuous_disable(format_data->port);
1142
1143        /* Register a callback for link state changes */
1144        ret = rte_eth_dev_callback_register(format_data->port,
1145                                            RTE_ETH_EVENT_INTR_LSC,
1146                                            dpdk_lsc_callback,
1147                                            format_data);
1148        /* If this fails it is not a show stopper */
1149#if DEBUG
1150        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1151                ret, strerror(-ret));
1152#endif
1153
1154    /* Get the current link status */
1155    rte_eth_link_get_nowait(format_data->port, &link_info);
1156    format_data->link_speed = link_info.link_speed;
1157#if DEBUG
1158    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1159            (int) link_info.link_duplex, (int) link_info.link_speed);
1160#endif
1161    /* We have now successfully started/unpaused */
1162    format_data->paused = DPDK_RUNNING;
1163
1164    return 0;
1165}
1166
1167/* Attach memory to the port and start (or restart) the port/s.
1168 */
1169static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
1170    int ret, i; /* Check return values for errors */
1171    struct rte_eth_link link_info; /* Wait for link */
1172
1173    /* Already started */
1174    if (format_data->paused == DPDK_RUNNING)
1175        return 0;
1176
1177    /* First time started we need to alloc our memory, doing this here
1178     * rather than in environment setup because we don't have snaplen then */
1179    if (format_data->paused == DPDK_NEVER_STARTED) {
1180        if (format_data->snaplen == 0) {
1181            format_data->snaplen = RX_MBUF_SIZE;
1182            port_conf.rxmode.jumbo_frame = 0;
1183            port_conf.rxmode.max_rx_pkt_len = 0;
1184        } else {
1185            /* Use jumbo frames */
1186            port_conf.rxmode.jumbo_frame = 1;
1187            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1188        }
1189
1190        /* This is additional overhead so make sure we allow space for this */
1191#if GET_MAC_CRC_CHECKSUM
1192        format_data->snaplen += ETHER_CRC_LEN;
1193#endif
1194#if HAS_HW_TIMESTAMPS_82580
1195        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1196#endif
1197
1198        /* Create the mbuf pool, which is the place our packets are allocated
1199         * from - TODO figure out if there is a free function (I cannot see one)
1200         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1201         * allocate however that extra 1 packet is not used.
1202         * (I assume <= vs < error some where in DPDK code)
1203         * TX requires nb_tx_buffers + 1 in the case the queue is full
1204         * so that will fill the new buffer and wait until slots in the
1205         * ring become available.
1206         */
1207#if DEBUG
1208    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1209#endif
1210    format_data->pktmbuf_pool =
1211            rte_mempool_create(format_data->mempool_name,
1212                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
1213                       format_data->snaplen + sizeof(struct rte_mbuf)
1214                                        + RTE_PKTMBUF_HEADROOM,
1215                       128, sizeof(struct rte_pktmbuf_pool_private),
1216                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1217                       format_data->nic_numa_node, 0);
1218
1219        if (format_data->pktmbuf_pool == NULL) {
1220            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1221                        "pool failed: %s", strerror(rte_errno));
1222            return -1;
1223        }
1224    }
1225
1226    /* ----------- Now do the setup for the port mapping ------------ */
1227    /* Order of calls must be
1228     * rte_eth_dev_configure()
1229     * rte_eth_tx_queue_setup()
1230     * rte_eth_rx_queue_setup()
1231     * rte_eth_dev_start()
1232     * other rte_eth calls
1233     */
1234
1235    /* This must be called first before another *eth* function
1236     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1237    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1238    if (ret < 0) {
1239        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1240                            " %"PRIu8" : %s", format_data->port,
1241                            strerror(-ret));
1242        return -1;
1243    }
1244#if DEBUG
1245    fprintf(stderr, "Doing dev configure\n");
1246#endif
1247    /* Initialise the TX queue a minimum value if using this port for
1248     * receiving. Otherwise a larger size if writing packets.
1249     */
1250    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1251                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1252    if (ret < 0) {
1253        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1254                            " %"PRIu8" : %s", format_data->port,
1255                            strerror(-ret));
1256        return -1;
1257    }
1258
1259    for (i=0; i < rx_queues; i++) {
1260#if DEBUG
1261    fprintf(stderr, "Doing queue configure\n");
1262#endif
1263
1264                /* Initialise the RX queue with some packets from memory */
1265                ret = rte_eth_rx_queue_setup(format_data->port, i,
1266                                             format_data->nb_rx_buf, format_data->nic_numa_node,
1267                                             &rx_conf, format_data->pktmbuf_pool);
1268        /* Init per_thread data structures */
1269        format_data->per_lcore[i].port = format_data->port;
1270        format_data->per_lcore[i].queue_id = i;
1271
1272                if (ret < 0) {
1273                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1274                                                " %"PRIu8" : %s", format_data->port,
1275                                                strerror(-ret));
1276                        return -1;
1277                }
1278        }
1279
1280#if DEBUG
1281    fprintf(stderr, "Doing start device\n");
1282#endif
1283    /* Start device */
1284    ret = rte_eth_dev_start(format_data->port);
1285#if DEBUG
1286    fprintf(stderr, "Done start device\n");
1287#endif
1288    if (ret < 0) {
1289        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1290                    strerror(-ret));
1291        return -1;
1292    }
1293
1294
1295    /* Default promiscuous to on */
1296    if (format_data->promisc == -1)
1297        format_data->promisc = 1;
1298
1299    if (format_data->promisc == 1)
1300        rte_eth_promiscuous_enable(format_data->port);
1301    else
1302        rte_eth_promiscuous_disable(format_data->port);
1303
1304
1305    /* We have now successfully started/unpased */
1306    format_data->paused = DPDK_RUNNING;
1307
1308    // Can use remote launch for all
1309    /*RTE_LCORE_FOREACH_SLAVE(i) {
1310                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1311        }*/
1312
1313    /* Register a callback for link state changes */
1314    ret = rte_eth_dev_callback_register(format_data->port,
1315                                        RTE_ETH_EVENT_INTR_LSC,
1316                                        dpdk_lsc_callback,
1317                                        format_data);
1318    /* If this fails it is not a show stopper */
1319#if DEBUG
1320    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1321            ret, strerror(-ret));
1322#endif
1323
1324    /* Get the current link status */
1325    rte_eth_link_get_nowait(format_data->port, &link_info);
1326    format_data->link_speed = link_info.link_speed;
1327#if DEBUG
1328    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1329            (int) link_info.link_duplex, (int) link_info.link_speed);
1330        struct rte_eth_rss_reta reta_conf = {0};
1331        reta_conf.mask_lo = ~reta_conf.mask_lo;
1332        reta_conf.mask_hi = ~reta_conf.mask_hi;
1333        int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf);
1334        fprintf(stderr, "err=%d", qew);
1335        for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) {
1336                fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]);
1337        }
1338
1339#endif
1340
1341    return 0;
1342}
1343
1344static int dpdk_start_input (libtrace_t *libtrace) {
1345    char err[500];
1346    err[0] = 0;
1347
1348    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1349        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1350        free(libtrace->format_data);
1351        libtrace->format_data = NULL;
1352        return -1;
1353    }
1354    return 0;
1355}
1356
1357static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1358    struct rte_eth_dev_info dev_info;
1359    rte_eth_dev_info_get(port_id, &dev_info);
1360    return dev_info.max_rx_queues;
1361}
1362
/**
 * Get the number of processors currently online.
 *
 * @return The value of sysconf(_SC_NPROCESSORS_ONLN), or 1 if that value
 *         is unavailable (zero or negative)
 */
static inline size_t dpdk_processor_count (void) {
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    /* Always report at least one processor */
    return (nb_cpu <= 0) ? 1 : (size_t) nb_cpu;
}
1370
1371static int dpdk_pstart_input (libtrace_t *libtrace) {
1372    char err[500];
1373    int i=0, phys_cores=0;
1374    int tot = libtrace->perpkt_thread_count;
1375    err[0] = 0;
1376
1377    if (rte_lcore_id() != rte_get_master_lcore())
1378        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1379
1380    // If the master is not on the last thread we move it there
1381    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1382        // Consider error handling here
1383        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
1384    }
1385
1386    // Don't exceed the number of cores in the system/detected by dpdk
1387    // We don't have to force this but performance wont be good if we don't
1388    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1389        if (lcore_config[i].detected) {
1390            if (rte_lcore_is_enabled(i))
1391                fprintf(stderr, "Found core %d already in use!\n", i);
1392            else
1393                phys_cores++;
1394        }
1395    }
1396
1397        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1398        tot = MIN(tot, phys_cores);
1399
1400        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1401
1402    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1403        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1404        free(libtrace->format_data);
1405        libtrace->format_data = NULL;
1406        return -1;
1407    }
1408
1409    // Make sure we only start the number that we should
1410    libtrace->perpkt_thread_count = tot;
1411    return 0;
1412}
1413
1414
1415/**
1416 * Register a thread with the DPDK system,
1417 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1418 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1419 * gives it.
1420 *
1421 * We then allow a mapper thread to be started on every real core as DPDK would,
1422 * we also bind these to the corresponding CPU cores.
1423 *
1424 * @param libtrace A pointer to the trace
1425 * @param reading True if the thread will be used to read packets, i.e. will
1426 *                call pread_packet(), false if thread used to process packet
1427 *                in any other manner including statistics functions.
1428 */
1429static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1430{
1431    struct rte_config *cfg = rte_eal_get_configuration();
1432    int i;
1433    int new_id = -1;
1434
1435    // If 'reading packets' fill in cores from 0 up and bind affinity
1436    // otherwise start from the MAX core (which is also the master) and work backwards
1437    // in this case physical cores on the system will not exist so we don't bind
1438    // these to any particular physical core
1439    pthread_mutex_lock(&libtrace->libtrace_lock);
1440    if (reading) {
1441#if HAVE_LIBNUMA
1442        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1443                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
1444                                new_id = i;
1445                        if (!lcore_config[i].detected)
1446                                new_id = -1;
1447                        break;
1448                }
1449        }
1450#endif
1451        /* Retry without the the numa restriction */
1452        if (new_id == -1) {
1453                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1454                                if (!rte_lcore_is_enabled(i)) {
1455                                        new_id = i;
1456                                if (!lcore_config[i].detected)
1457                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1458                                break;
1459                        }
1460                }
1461        }
1462    } else {
1463        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1464            if (!rte_lcore_is_enabled(i)) {
1465                new_id = i;
1466                break;
1467            }
1468        }
1469    }
1470
1471    if (new_id == -1) {
1472        assert(cfg->lcore_count == RTE_MAX_LCORE);
1473        // TODO proper libtrace style error here!!
1474        fprintf(stderr, "Too many threads for DPDK!!\n");
1475        pthread_mutex_unlock(&libtrace->libtrace_lock);
1476        return -1;
1477    }
1478
1479    // Enable the core in global DPDK structs
1480    cfg->lcore_role[new_id] = ROLE_RTE;
1481    cfg->lcore_count++;
1482    // Set TLS to reflect our new number
1483    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1484    fprintf(stderr, "original id%d", rte_lcore_id());
1485    RTE_PER_LCORE(_lcore_id) = new_id;
1486        char name[99];
1487        pthread_getname_np(pthread_self(),
1488                              name, sizeof(name));
1489
1490    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
1491
1492    if (reading) {
1493        // Set affinity bind to corresponding core
1494        cpu_set_t cpuset;
1495        CPU_ZERO(&cpuset);
1496        CPU_SET(rte_lcore_id(), &cpuset);
1497        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1498        if (i != 0) {
1499            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1500            pthread_mutex_unlock(&libtrace->libtrace_lock);
1501            return -1;
1502        }
1503    }
1504
1505    // Map our TLS to the thread data
1506    if (reading) {
1507        if(t->type == THREAD_PERPKT) {
1508            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1509        } else {
1510            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1511        }
1512    }
1513    pthread_mutex_unlock(&libtrace->libtrace_lock);
1514    return 0;
1515}
1516
1517
1518/**
1519 * Unregister a thread with the DPDK system.
1520 *
1521 * Only previously registered threads should be calling this just before
1522 * they are destroyed.
1523 */
1524static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1525{
1526    struct rte_config *cfg = rte_eal_get_configuration();
1527
1528    assert(rte_lcore_id() < RTE_MAX_LCORE);
1529    pthread_mutex_lock(&libtrace->libtrace_lock);
1530    // Skip if master!!
1531    if (rte_lcore_id() == rte_get_master_lcore()) {
1532        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1533        pthread_mutex_unlock(&libtrace->libtrace_lock);
1534        return;
1535    }
1536
1537    // Disable this core in global DPDK structs
1538    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1539    cfg->lcore_count--;
1540    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1541    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1542    pthread_mutex_unlock(&libtrace->libtrace_lock);
1543    return;
1544}
1545
1546static int dpdk_start_output(libtrace_out_t *libtrace)
1547{
1548    char err[500];
1549    err[0] = 0;
1550
1551    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1552        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1553        free(libtrace->format_data);
1554        libtrace->format_data = NULL;
1555        return -1;
1556    }
1557    return 0;
1558}
1559
1560static int dpdk_pause_input(libtrace_t * libtrace) {
1561    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1562    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1563#if DEBUG
1564        fprintf(stderr, "Pausing DPDK port\n");
1565#endif
1566        rte_eth_dev_stop(FORMAT(libtrace)->port);
1567        FORMAT(libtrace)->paused = DPDK_PAUSED;
1568        /* Empty the queue of packets */
1569        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1570                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1571        }
1572        FORMAT(libtrace)->burst_offset = 0;
1573        FORMAT(libtrace)->burst_size = 0;
1574        /* If we pause it the driver will be reset and likely our counter */
1575
1576        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
1577#if HAS_HW_TIMESTAMPS_82580
1578        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
1579#endif
1580    }
1581    return 0;
1582}
1583
1584static int dpdk_write_packet(libtrace_out_t *trace,
1585                libtrace_packet_t *packet){
1586    struct rte_mbuf* m_buff[1];
1587
1588    int wirelen = trace_get_wire_length(packet);
1589    int caplen = trace_get_capture_length(packet);
1590
1591    /* Check for a checksum and remove it */
1592    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1593                                            wirelen == caplen)
1594        caplen -= ETHER_CRC_LEN;
1595
1596    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1597    if (m_buff[0] == NULL) {
1598        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1599        return -1;
1600    } else {
1601        int ret;
1602        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1603        do {
1604            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1605        } while (ret != 1);
1606    }
1607
1608    return 0;
1609}
1610
1611static int dpdk_fin_input(libtrace_t * libtrace) {
1612    /* Free our memory structures */
1613    if (libtrace->format_data != NULL) {
1614        /* Close the device completely, device cannot be restarted */
1615        if (FORMAT(libtrace)->port != 0xFF)
1616                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1617                                                RTE_ETH_EVENT_INTR_LSC,
1618                                                dpdk_lsc_callback,
1619                                                FORMAT(libtrace));
1620                rte_eth_dev_close(FORMAT(libtrace)->port);
1621                /* filter here if we used it */
1622                free(libtrace->format_data);
1623        }
1624
1625    /* Revert to the original PCI drivers */
1626    /* No longer in DPDK
1627    rte_eal_pci_exit(); */
1628    return 0;
1629}
1630
1631
1632static int dpdk_fin_output(libtrace_out_t * libtrace) {
1633    /* Free our memory structures */
1634    if (libtrace->format_data != NULL) {
1635        /* Close the device completely, device cannot be restarted */
1636        if (FORMAT(libtrace)->port != 0xFF)
1637            rte_eth_dev_close(FORMAT(libtrace)->port);
1638        /* filter here if we used it */
1639                free(libtrace->format_data);
1640        }
1641
1642    /* Revert to the original PCI drivers */
1643    /* No longer in DPDK
1644    rte_eal_pci_exit(); */
1645    return 0;
1646}
1647
1648/**
1649 * Get the start of the additional header that we added to a packet.
1650 */
1651static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1652    assert(packet);
1653    assert(packet->buffer);
1654    /* Our header sits straight after the mbuf header */
1655    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1656}
1657
1658static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1659    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1660    return hdr->cap_len;
1661}
1662
1663static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1664    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1665    if (size > hdr->cap_len) {
1666        /* Cannot make a packet bigger */
1667                return trace_get_capture_length(packet);
1668        }
1669
1670    /* Reset the cached capture length first*/
1671    packet->capture_length = -1;
1672    hdr->cap_len = (uint32_t) size;
1673        return trace_get_capture_length(packet);
1674}
1675
1676static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1677    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1678    int org_cap_size; /* The original capture size */
1679    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1680        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1681                            sizeof(struct hw_timestamp_82580);
1682    } else {
1683        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1684    }
1685    if (hdr->flags & INCLUDES_CHECKSUM) {
1686        return org_cap_size;
1687    } else {
1688        /* DPDK packets are always TRACE_TYPE_ETH packets */
1689        return org_cap_size + ETHER_CRC_LEN;
1690    }
1691}
1692static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1693    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1694    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1695        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1696                sizeof(struct hw_timestamp_82580);
1697    else
1698        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1699}
1700
1701static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1702                libtrace_packet_t *packet, void *buffer,
1703                libtrace_rt_types_t rt_type, uint32_t flags) {
1704    assert(packet);
1705    if (packet->buffer != buffer &&
1706        packet->buf_control == TRACE_CTRL_PACKET) {
1707        free(packet->buffer);
1708    }
1709
1710    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1711        packet->buf_control = TRACE_CTRL_PACKET;
1712    } else
1713        packet->buf_control = TRACE_CTRL_EXTERNAL;
1714
1715    packet->buffer = buffer;
1716    packet->header = buffer;
1717
1718    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1719    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1720    packet->type = rt_type;
1721    return 0;
1722}
1723
1724
1725/**
1726 * Given a packet size and a link speed, computes the
1727 * time to transmit in nanoseconds.
1728 *
1729 * @param format_data The dpdk format data from which we get the link speed
1730 *        and if unset updates it in a thread safe manner
1731 * @param pkt_size The size of the packet in bytes
1732 * @return The wire time in nanoseconds
1733 */
1734static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1735        uint32_t wire_time;
1736        /* 20 extra bytes of interframe gap and preamble */
1737# if GET_MAC_CRC_CHECKSUM
1738        wire_time = ((pkt_size + 20) * 8000);
1739# else
1740        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1741# endif
1742
1743        /* Division is really slow and introduces a pipeline stall
1744         * The compiler will optimise this into magical multiplication and shifting
1745         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1746         */
1747retry_calc_wiretime:
1748        switch (format_data->link_speed) {
1749        case ETH_LINK_SPEED_40G:
1750                wire_time /=  ETH_LINK_SPEED_40G;
1751                break;
1752        case ETH_LINK_SPEED_20G:
1753                wire_time /= ETH_LINK_SPEED_20G;
1754                break;
1755        case ETH_LINK_SPEED_10G:
1756                wire_time /= ETH_LINK_SPEED_10G;
1757                break;
1758        case ETH_LINK_SPEED_1000:
1759                wire_time /= ETH_LINK_SPEED_1000;
1760                break;
1761        case 0:
1762                {
1763                /* Maybe the link was down originally, but now it should be up */
1764                struct rte_eth_link link = {0};
1765                rte_eth_link_get_nowait(format_data->port, &link);
1766                if (link.link_status && link.link_speed) {
1767                        format_data->link_speed = link.link_speed;
1768#ifdef DEBUG
1769                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1770#endif
1771                        goto retry_calc_wiretime;
1772                }
1773                /* We don't know the link speed, make sure numbers are counting up */
1774                wire_time = 1;
1775                break;
1776                }
1777        default:
1778                wire_time /= format_data->link_speed;
1779        }
1780        return wire_time;
1781}
1782
1783
1784
1785/*
1786 * Does any extra preperation to all captured packets
1787 * This includes adding our extra header to it with the timestamp,
1788 * and any snapping
1789 *
1790 * @param format_data The DPDK format data
1791 * @param plc The DPDK per lcore format data
1792 * @param pkts An array of size nb_pkts of DPDK packets
1793 * @param nb_pkts The number of packets in pkts and optionally packets
1794 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
1795 */
1796static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
1797                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
1798        struct dpdk_addt_hdr *hdr;
1799        size_t i;
1800        uint64_t cur_sys_time_ns;
1801#if HAS_HW_TIMESTAMPS_82580
1802        struct hw_timestamp_82580 *hw_ts;
1803        uint64_t estimated_wraps;
1804#else
1805
1806#endif
1807
1808#if USE_CLOCK_GETTIME
1809        struct timespec cur_sys_time = {0};
1810        /* This looks terrible and I feel bad doing it. But it's OK
1811         * on new kernels, because this is a fast vsyscall */
1812        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1813        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1814#else
1815        struct timeval cur_sys_time = {0};
1816        /* Also a fast vsyscall */
1817        gettimeofday(&cur_sys_time, NULL);
1818        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1819#endif
1820
1821        /* The system clock is not perfect so when running
1822         * at linerate we could timestamp a packet in the past.
1823         * To avoid this we munge the timestamp to appear 1ns
1824         * after the previous packet. We should eventually catch up
1825         * to system time since a 64byte packet on a 10G link takes 67ns.
1826         *
1827         * Note with parallel readers timestamping packets
1828         * with duplicate stamps or out of order is unavoidable without
1829         * hardware timestamping from the NIC.
1830         */
1831#if !HAS_HW_TIMESTAMPS_82580
1832        if (plc->ts_last_sys >= cur_sys_time_ns) {
1833                cur_sys_time_ns = plc->ts_last_sys + 1;
1834        }
1835#endif
1836
1837        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
1838        for (i = 0 ; i < nb_pkts ; ++i) {
1839
1840                /* We put our header straight after the dpdk header */
1841                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1842                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1843
1844#if GET_MAC_CRC_CHECKSUM
1845                /* Add back in the CRC sum */
1846                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
1847                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
1848                hdr->flags |= INCLUDES_CHECKSUM;
1849#endif
1850
1851                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1852
1853#if HAS_HW_TIMESTAMPS_82580
1854                /* The timestamp is sitting before our packet and is included in pkt_len */
1855                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1856                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1857                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1858
1859                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1860                 *
1861                 *        +----------+---+   +--------------+
1862                 *  82580 |    24    | 8 |   |      32      |
1863                 *        +----------+---+   +--------------+
1864                 *          reserved  \______ 40 bits _____/
1865                 *
1866                 * The 40 bit 82580 SYSTIM overflows every
1867                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1868                 *
1869                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1870                 * Endian (for the full 64 bits) i.e. picture is mirrored
1871                 */
1872
1873                /* Despite what the documentation says this is in Little
1874                 * Endian byteorder. Mask the reserved section out.
1875                 */
1876                hdr->timestamp = le64toh(hw_ts->timestamp) &
1877                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1878
1879                if (unlikely(plc->ts_first_sys == 0)) {
1880                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1881                        plc->ts_last_sys = plc->ts_first_sys;
1882                }
1883
1884                /* This will have serious problems if packets aren't read quickly
1885                 * that is within a couple of seconds because our clock cycles every
1886                 * 18 seconds */
1887                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1888                                  / (1ull<<TS_NBITS_82580);
1889
1890                /* Estimated_wraps gives the number of times the counter should have
1891                 * wrapped (however depending on value last time it could have wrapped
1892                 * twice more (if hw clock is close to its max value) or once less (allowing
1893                 * for a bit of variance between hw and sys clock). But if the clock
1894                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1895                if (unlikely(estimated_wraps >= 2)) {
1896                        /* 2 or more wrap arounds add all but the very last wrap */
1897                        plc->wrap_count += estimated_wraps - 1;
1898                }
1899
1900                /* Set the timestamp to the lowest possible value we're considering */
1901                hdr->timestamp += plc->ts_first_sys +
1902                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1903
1904                /* In most runs only the first if() will need evaluating - i.e our
1905                 * estimate is correct. */
1906                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1907                                              hdr->timestamp, MAXSKEW_82580))) {
1908                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1909                        plc->wrap_count++;
1910                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1911                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1912                                             hdr->timestamp, MAXSKEW_82580)) {
1913                                /* Failed to match estimated_wraps */
1914                                plc->wrap_count++;
1915                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1916                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1917                                                     hdr->timestamp, MAXSKEW_82580)) {
1918                                        if (estimated_wraps == 0) {
1919                                                /* 0 case Failed to match estimated_wraps+2 */
1920                                                printf("WARNING - Hardware Timestamp failed to"
1921                                                       " match using systemtime!\n");
1922                                                hdr->timestamp = cur_sys_time_ns;
1923                                        } else {
1924                                                /* Failed to match estimated_wraps+1 */
1925                                                plc->wrap_count++;
1926                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1927                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1928                                                                     hdr->timestamp, MAXSKEW_82580)) {
1929                                                        /* Failed to match estimated_wraps+2 */
1930                                                        printf("WARNING - Hardware Timestamp failed to"
1931                                                               " match using systemtime!!\n");
1932                                                }
1933                                        }
1934                                }
1935                        }
1936                }
1937#else
1938
1939                hdr->timestamp = cur_sys_time_ns;
1940                /* Offset the next packet by the wire time of previous */
1941                calculate_wire_time(format_data, hdr->cap_len);
1942
1943#endif
1944                if(packets) {
1945                        packets[i]->buffer = pkts[i];
1946                        packets[i]->header = pkts[i];
1947#if HAS_HW_TIMESTAMPS_82580
1948                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1949                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
1950#else
1951                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1952                                              RTE_PKTMBUF_HEADROOM;
1953#endif
1954                        packets[i]->error = 1;
1955                }
1956        }
1957
1958        plc->ts_last_sys = cur_sys_time_ns;
1959
1960        return;
1961}
1962
1963
1964static void dpdk_fin_packet(libtrace_packet_t *packet)
1965{
1966        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1967                rte_pktmbuf_free(packet->buffer);
1968                packet->buffer = NULL;
1969        }
1970}
1971
1972
1973static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1974    int nb_rx; /* Number of rx packets we've received */
1975
1976    /* Free the last packet buffer */
1977    if (packet->buffer != NULL) {
1978        /* Buffer is owned by DPDK */
1979        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1980            rte_pktmbuf_free(packet->buffer);
1981            packet->buffer = NULL;
1982        } else
1983        /* Buffer is owned by packet i.e. has been malloc'd */
1984        if (packet->buf_control == TRACE_CTRL_PACKET) {
1985            free(packet->buffer);
1986            packet->buffer = NULL;
1987        }
1988    }
1989
1990    packet->buf_control = TRACE_CTRL_EXTERNAL;
1991    packet->type = TRACE_RT_DATA_DPDK;
1992
1993    /* Check if we already have some packets buffered */
1994    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
1995            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
1996            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1997            return 1; // TODO should be bytes read, which essentially useless anyway
1998    }
1999    /* Wait for a packet */
2000    while (1) {
2001        /* Poll for a single packet */
2002        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2003                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2004        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
2005                FORMAT(libtrace)->burst_size = nb_rx;
2006                FORMAT(libtrace)->burst_offset = 1;
2007                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
2008                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2009                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2010                return 1; // TODO should be bytes read, which essentially useless anyway
2011        }
2012        if (libtrace_halt) {
2013                return 0;
2014        }
2015        /* Wait a while, polling on memory degrades performance
2016         * This relieves the pressure on memory allowing the NIC to DMA */
2017        rte_delay_us(10);
2018    }
2019
2020    /* We'll never get here - but if we did it would be bad */
2021    return -1;
2022}
2023
2024static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
2025    size_t nb_rx; /* Number of rx packets we've recevied */
2026    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2027    size_t i;
2028
2029    for (i = 0 ; i < nb_packets ; ++i) {
2030            /* Free the last packet buffer */
2031            if (packets[i]->buffer != NULL) {
2032                /* Buffer is owned by DPDK */
2033                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
2034                    rte_pktmbuf_free(packets[i]->buffer);
2035                    packets[i]->buffer = NULL;
2036                } else
2037                /* Buffer is owned by packet i.e. has been malloc'd */
2038                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
2039                    free(packets[i]->buffer);
2040                    packets[i]->buffer = NULL;
2041                }
2042            }
2043            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2044            packets[i]->type = TRACE_RT_DATA_DPDK;
2045    }
2046
2047    /* Wait for a packet */
2048    while (1) {
2049        /* Poll for a single packet */
2050        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
2051                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
2052        if (nb_rx > 0) {
2053                /* Got some packets - otherwise we keep spining */
2054                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2055                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
2056                return nb_rx;
2057        }
2058        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
2059        if (libtrace_message_queue_count(&t->messages) > 0) {
2060                printf("Extra message yay");
2061                return -2;
2062        }
2063        if (libtrace_halt) {
2064                return 0;
2065        }
2066        /* Wait a while, polling on memory degrades performance
2067         * This relieves the pressure on memory allowing the NIC to DMA */
2068        rte_delay_us(10);
2069    }
2070
2071    /* We'll never get here - but if we did it would be bad */
2072    return -1;
2073}
2074
2075static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2076    struct timeval tv;
2077    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2078
2079    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2080    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2081    return tv;
2082}
2083
2084static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2085    struct timespec ts;
2086    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2087
2088    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2089    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2090    return ts;
2091}
2092
2093static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2094    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2095}
2096
2097static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2098    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2099    return (libtrace_direction_t) hdr->direction;
2100}
2101
2102static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2103    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2104    hdr->direction = (uint8_t) direction;
2105    return (libtrace_direction_t) hdr->direction;
2106}
2107
2108/*
2109 * NOTE: Drops could occur for other reasons than running out of buffer
2110 * space. Such as failed MAC checksums and oversized packets.
2111 */
2112static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
2113    struct rte_eth_stats stats = {0};
2114
2115    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2116        return UINT64_MAX;
2117    /* Grab the current stats */
2118    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2119
2120    /* Get the drop counter */
2121    return (uint64_t) stats.ierrors;
2122}
2123
2124static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
2125    struct rte_eth_stats stats = {0};
2126
2127    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2128        return UINT64_MAX;
2129    /* Grab the current stats */
2130    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2131
2132    /* Get the drop counter */
2133    return (uint64_t) stats.ipackets;
2134}
2135
2136/*
2137 * This is the number of packets filtered by the NIC
2138 * and maybe ahead of number read using libtrace.
2139 *
2140 * XXX we are yet to implement any filtering, but if it was this should
2141 * get the result. So this will just return 0 for now.
2142 */
2143static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
2144    struct rte_eth_stats stats = {0};
2145
2146    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2147        return UINT64_MAX;
2148    /* Grab the current stats */
2149    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2150
2151    /* Get the drop counter */
2152    return (uint64_t) stats.fdirmiss;
2153}
2154
2155/* Attempts to read a packet in a non-blocking fashion. If one is not
2156 * available a SLEEP event is returned. We do not have the ability to
2157 * create a select()able file descriptor in DPDK.
2158 */
2159static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2160                                        libtrace_packet_t *packet) {
2161    libtrace_eventobj_t event = {0,0,0.0,0};
2162    int nb_rx; /* Number of receive packets we've read */
2163    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2164
2165    do {
2166
2167        /* See if we already have a packet waiting */
2168        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2169                        FORMAT(trace)->queue_id, pkts_burst, 1);
2170
2171        if (nb_rx > 0) {
2172            /* Free the last packet buffer */
2173            if (packet->buffer != NULL) {
2174                /* Buffer is owned by DPDK */
2175                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2176                    rte_pktmbuf_free(packet->buffer);
2177                    packet->buffer = NULL;
2178                } else
2179                /* Buffer is owned by packet i.e. has been malloc'd */
2180                if (packet->buf_control == TRACE_CTRL_PACKET) {
2181                    free(packet->buffer);
2182                    packet->buffer = NULL;
2183                }
2184            }
2185
2186            packet->buf_control = TRACE_CTRL_EXTERNAL;
2187            packet->type = TRACE_RT_DATA_DPDK;
2188            event.type = TRACE_EVENT_PACKET;
2189            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
2190            event.size = 1; // TODO should be bytes read, which essentially useless anyway
2191
2192            /* XXX - Check this passes the filter trace_read_packet normally
2193             * does this for us but this wont */
2194            if (trace->filter) {
2195                if (!trace_apply_filter(trace->filter, packet)) {
2196                    /* Failed the filter so we loop for another packet */
2197                    trace->filtered_packets ++;
2198                    continue;
2199                }
2200            }
2201            trace->accepted_packets ++;
2202        } else {
2203            /* We only want to sleep for a very short time - we are non-blocking */
2204            event.type = TRACE_EVENT_SLEEP;
2205            event.seconds = 0.0001;
2206            event.size = 0;
2207        }
2208
2209        /* If we get here we have our event */
2210        break;
2211    } while (1);
2212
2213    return event;
2214}
2215
2216
/* Prints the usage information of the dpdk format module to stdout */
static void dpdk_help(void) {
    /* The help text, printed in order exactly as written */
    static const char *const help_text[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t line;

    for (line = 0; line < sizeof(help_text) / sizeof(help_text[0]); ++line)
        printf("%s", help_text[line]);
}
2234
/* The format description handed to register_format() for "dpdk" traces.
 * The initialiser is positional, so the order of the entries matters; the
 * comment beside each entry names the slot it fills. NULL marks a slot this
 * format does not provide. */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
        TRACE_FORMAT_DPDK,
        NULL,                   /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,            /* init_input */
        dpdk_config_input,          /* config_input */
        dpdk_start_input,           /* start_input */
        dpdk_pause_input,           /* pause_input */
        dpdk_init_output,           /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,          /* start_output */
        dpdk_fin_input,             /* fin_input */
        dpdk_fin_output,        /* fin_output */
        dpdk_read_packet,           /* read_packet */
        dpdk_prepare_packet,    /* prepare_packet */
        dpdk_fin_packet,                                    /* fin_packet */
        dpdk_write_packet,          /* write_packet */
        dpdk_get_link_type,         /* get_link_type */
        dpdk_get_direction,         /* get_direction */
        dpdk_set_direction,         /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,           /* get_timeval */
        dpdk_get_timespec,          /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,/* get_capture_length */
        dpdk_get_wire_length,   /* get_wire_length */
        dpdk_get_framing_length,/* get_framing_length */
        dpdk_set_capture_length,/* set_capture_length */
        NULL,                               /* get_received_packets */
        dpdk_get_filtered_packets,/* get_filtered_packets */
        dpdk_get_dropped_packets,/* get_dropped_packets */
    dpdk_get_captured_packets,/* get_captured_packets */
        NULL,                       /* get_fd */
        dpdk_trace_event,               /* trace_event */
    dpdk_help,              /* help */
    NULL,                   /* next pointer */
    {true, 8},              /* Live, NICs typically have 8 threads */
    /* The parallel entry points; pause and fin are shared with the
     * non-parallel slots above */
    dpdk_pstart_input, /* pstart_input */
        dpdk_pread_packets, /* pread_packets */
        dpdk_pause_input, /* ppause */
        dpdk_fin_input, /* p_fin */
        dpdk_pconfig_input, /* pconfig_input */
    dpdk_pregister_thread, /* pregister_thread */
    dpdk_punregister_thread /* unpregister_thread */
};
2285
2286void dpdk_constructor(void) {
2287        register_format(&dpdk);
2288}
Note: See TracBrowser for help on using the repository browser.