source: lib/format_dpdk.c @ 2badac9

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2badac9 was 2badac9, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Fixes whitelisting DPDK ports cards with DPDK 1.7, allowing more than one port to have the igb_uio driver loaded.

  • Property mode set to 100644
File size: 70.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#define _GNU_SOURCE
44
45#include "config.h"
46#include "libtrace.h"
47#include "libtrace_int.h"
48#include "format_helper.h"
49#include "libtrace_arphrd.h"
50#include "hash_toeplitz.h"
51
52#ifdef HAVE_INTTYPES_H
53#  include <inttypes.h>
54#else
55# error "Can't find inttypes.h"
56#endif
57
58#include <stdlib.h>
59#include <assert.h>
60#include <unistd.h>
61#include <endian.h>
62#include <string.h>
63
64/* We can deal with any minor differences by checking the RTE VERSION
65 * Typically DPDK backports some fixes (typically for building against
66 * newer kernels) to the older version of DPDK.
67 *
68 * These get released with the rX suffix. The following macros where added
69 * in these new releases.
70 *
71 * Below this is a log of version that required changes to the libtrace
72 * code (that we still attempt to support).
73 *
74 * Currently 1.5 to 1.7 is supported.
75 */
76#include <rte_eal.h>
77#include <rte_version.h>
78#ifndef RTE_VERSION_NUM
79#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
80#endif
81#ifndef RTE_VER_PATCH_RELEASE
82#       define RTE_VER_PATCH_RELEASE 0
83#endif
84#ifndef RTE_VERSION
85#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
86        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
87#endif
88
89/* 1.6.0r2 :
90 *      rte_eal_pci_set_blacklist() is removed
91 *      device_list is renamed to pci_device_list
92 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
93 *      as such we do apply the whitelist before rte_eal_init.
94 *      This also works correctly with DPDK 1.6.0r2.
95 *
96 * Replaced by:
97 *      rte_devargs (we can simply whitelist)
98 */
99#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
100#       define DPDK_USE_BLACKLIST 1
101#else
102#       define DPDK_USE_BLACKLIST 0
103#endif
104
105/*
106 * 1.7.0 :
107 *      rte_pmd_init_all is removed
108 *
109 * Replaced by:
110 *      Nothing, no longer needed
111 */
112#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
113#       define DPDK_USE_PMD_INIT 1
114#else
115#       define DPDK_USE_PMD_INIT 0
116#endif
117
118#include <rte_per_lcore.h>
119#include <rte_debug.h>
120#include <rte_errno.h>
121#include <rte_common.h>
122#include <rte_log.h>
123#include <rte_memcpy.h>
124#include <rte_prefetch.h>
125#include <rte_branch_prediction.h>
126#include <rte_pci.h>
127#include <rte_ether.h>
128#include <rte_ethdev.h>
129#include <rte_ring.h>
130#include <rte_mempool.h>
131#include <rte_mbuf.h>
132#include <rte_launch.h>
133#include <rte_lcore.h>
134#include <rte_per_lcore.h>
135#include <pthread.h>
136
137/* The default size of memory buffers to use - This is the max size of standard
138 * ethernet packet less the size of the MAC CHECKSUM */
139#define RX_MBUF_SIZE 1514
140
141/* The minimum number of memory buffers per queue tx or rx. Search for
142 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
143 */
144#define MIN_NB_BUF 64
145
146/* Number of receive memory buffers to use
147 * By default this is limited by driver to 4k and must be a multiple of 128.
148 * A modification can be made to the driver to remove this limit.
149 * This can be increased in the driver and here.
150 * Should be at least MIN_NB_BUF.
151 */
152#define NB_RX_MBUF 4096
153
/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
157#define NB_TX_MBUF 1024
158
159/* The size of the PCI blacklist needs to be big enough to contain
160 * every PCI device address (listed by lspci every bus:device.function tuple).
161 */
162#define BLACK_LIST_SIZE 50
163
164/* The maximum number of characters the mempool name can be */
165#define MEMPOOL_NAME_LEN 20
166
/* Accessor and conversion macros. Every argument is parenthesised so that
 * the macros expand correctly when passed an expression (e.g. *ptr or a
 * conditional) rather than a plain identifier. */

/* View an opaque buffer pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* format_data of a trace, viewed as the per-trace DPDK state */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* format_data, viewed as a per-lcore record */
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
177
178#if RTE_PKTMBUF_HEADROOM != 128
179#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
180         "any libtrace instance processing these packet must be have the" \
181         "same RTE_PKTMBUF_HEADROOM set"
182#endif
183
184/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
185 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
186 *
187 * Make sure you understand what these are doing before enabling them.
188 * They might make traces incompatable with other builds etc.
189 *
190 * These are also included to show how to do somethings which aren't
191 * obvious in the DPDK documentation.
192 */
193
194/* Print verbose messages to stdout */
195#define DEBUG 1
196
/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime() is a vsyscall on your system,
 * otherwise it could add a large overhead. gettimeofday() should also be a
 * vsyscall; if it is not, you should seriously consider updating your
 * kernel.
 */
203#ifdef HAVE_LIBRT
204/* You can turn this on (set to 1) to prefer clock_gettime */
205#define USE_CLOCK_GETTIME 0
206#else
207/* DONT CHANGE THIS !!! */
208#define USE_CLOCK_GETTIME 0
209#endif
210
211/* This is fairly safe to turn on - currently there appears to be a 'bug'
212 * in DPDK that will remove the checksum by making the packet appear 4bytes
213 * smaller than what it really is. Most formats don't include the checksum
214 * hence writing out a port such as int: ring: and dpdk: assumes there
215 * is no checksum and will attempt to write the checksum as part of the
216 * packet
217 */
218#define GET_MAC_CRC_CHECKSUM 0
219
220/* This requires a modification of the pmd drivers (inside Intel DPDK)
221 */
222#define HAS_HW_TIMESTAMPS_82580 0
223
224#if HAS_HW_TIMESTAMPS_82580
225# define TS_NBITS_82580     40
226/* The maximum on the +ve or -ve side that we can be, make it half way */
227# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
228#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
229#endif
230
/* Hardware timestamp record for the Intel 82580.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored big endian, however it is actually little endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;  /* Not interpreted by this file */
    uint64_t timestamp; /* Little endian; only the lower 40 bits are valid */
};
237
/* Capture lifecycle states; the values are spelled out explicitly. */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Initial state set when the format is created */
    DPDK_RUNNING       = 1,
    DPDK_PAUSED        = 2
};
243
/* Per-lcore record holding a (port, queue) pair. */
struct dpdk_per_lcore_t
{
        // TODO move time stamp stuff here
        uint16_t queue_id; /* Queue index for this lcore */
        uint8_t port;      /* Port index for this lcore */
};
250
/* Per-trace DPDK state. Used by both input and output, however some fields
 * are not used for output. */
struct dpdk_format_data_t {
    int8_t promisc; /* Promiscuous mode - RX only */
    uint8_t port; /* Always 0, we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on the system, should be 1 */
    uint8_t paused; /* See paused_state */
    uint16_t queue_id; /* Always 0, we use a single queue - Shared TX & RX */
    int snaplen; /* The snap length for the capture - RX only */
    /* We always have to set up both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of blacklist entries that are valid */
#endif
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    uint8_t rss_key[40]; // This is the RSS key
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping is only relevant to RX */
    uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
        // DPDK normally seems to have a limit of 8 queues for a given card
        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
};
279
/* Bit flags, one bit per flag so they can be OR-ed together. */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1  /* Used with 82580 driver */
};
284
285/**
286 * A structure placed in front of the packet where we can store
287 * additional information about the given packet.
288 * +--------------------------+
289 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
290 * +--------------------------+
291 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
292 * +--------------------------+
293 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
294 * +--------------------------+
295 * |   sizeof(dpdk_addt_hdr)  | 1 byte
296 * +--------------------------+
297 * *   hw_timestamp_82580     * 16 bytes Optional
298 * +--------------------------+
299 * |       Packet data        | Variable Size
300 * |                          |
301 */
/* Additional per-packet header stored in front of the packet data.
 * The field order and sizes define an in-memory layout; do not reorder. */
struct dpdk_addt_hdr {
    uint64_t timestamp; /* NOTE(review): units not visible here, presumably nanoseconds - confirm */
    uint8_t flags;      /* Presumably a combination of dpdk_addt_hdr_flags bits - confirm */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
310
311/**
312 * We want to blacklist all devices except those on the whitelist
313 * (I say list, but yes it is only the one).
314 *
315 * The default behaviour of rte_pci_probe() will map every possible device
316 * to its DPDK driver. The DPDK driver will take the ethernet device
317 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
318 *
319 * So blacklist all devices except the one that we wish to use so that
320 * the others can still be used as standard ethernet ports.
321 *
322 * @return 0 if successful, otherwise -1 on error.
323 */
324#if DPDK_USE_BLACKLIST
325static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
326{
327        struct rte_pci_device *dev = NULL;
328        format_data->nb_blacklist = 0;
329
330        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
331
332        TAILQ_FOREACH(dev, &device_list, next) {
333        if (whitelist != NULL && whitelist->domain == dev->addr.domain
334            && whitelist->bus == dev->addr.bus
335            && whitelist->devid == dev->addr.devid
336            && whitelist->function == dev->addr.function)
337            continue;
338                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
339                                / sizeof (format_data->blacklist[0])) {
340                        printf("Warning: too many devices to blacklist consider"
341                                        " increasing BLACK_LIST_SIZE");
342                        break;
343                }
344                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
345                ++format_data->nb_blacklist;
346        }
347
348        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
349        return 0;
350}
351#else /* DPDK_USE_BLACKLIST */
352#include <rte_devargs.h>
353static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
354{
355        char pci_str[20] = {0};
356        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
357                 whitelist->domain,
358                 whitelist->bus,
359                 whitelist->devid,
360                 whitelist->function);
361        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
362                return -1;
363        }
364        return 0;
365}
366#endif
367
368/**
369 * Parse the URI format as a pci address
370 * Fills in addr, note core is optional and is unchanged if
371 * a value for it is not provided.
372 *
373 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
374 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
375 */
376static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
377    char * wrkstr;
378    char * pch;
379    assert(str);
380    wrkstr = strdup(str);
381   
382    pch = strtok(wrkstr,":");
383    if (pch == NULL || pch[0] == 0) {
384        free(wrkstr); return -1;
385    }
386    addr->domain = (uint16_t) atoi(pch);
387
388    pch = strtok(NULL,":");
389    if (pch == NULL || pch[0] == 0) {
390        free(wrkstr); return -1;
391    }
392    addr->bus = (uint8_t) atoi(pch);
393
394    pch = strtok(NULL,".");
395    if (pch == NULL || pch[0] == 0) {
396        free(wrkstr); return -1;
397    }
398    addr->devid = (uint8_t) atoi(pch);
399
400    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
401    if (pch == NULL || pch[0] == 0) {
402        free(wrkstr); return -1;
403    }
404    addr->function = (uint8_t) atoi(pch);
405
406    pch = strtok(NULL, ""); /* Find end of string */
407   
408    if (pch != NULL && pch[0] != 0) {
409        *core = (long) atoi(pch);
410    }
411
412    free(wrkstr);
413    return 0;
414}
415
416#if DEBUG
417/* For debugging */
418static inline void dump_configuration()
419{
420    struct rte_config * global_config;
421    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
422   
423    if (nb_cpu <= 0) {
424        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
425        nb_cpu = 1; /* fallback to just 1 core */
426    }
427    if (nb_cpu > RTE_MAX_LCORE)
428        nb_cpu = RTE_MAX_LCORE;
429   
430    global_config = rte_eal_get_configuration();
431   
432    if (global_config != NULL) {
433        int i;
434        fprintf(stderr, "Intel DPDK setup\n"
435               "---Version      : %s\n"
436               "---Master LCore : %"PRIu32"\n"
437               "---LCore Count  : %"PRIu32"\n",
438               rte_version(),
439               global_config->master_lcore, global_config->lcore_count);
440       
441        for (i = 0 ; i < nb_cpu; i++) {
442            fprintf(stderr, "   ---Core %d : %s\n", i, 
443                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
444        }
445       
446        const char * proc_type;
447        switch (global_config->process_type) {
448            case RTE_PROC_AUTO:
449                proc_type = "auto";
450                break;
451            case RTE_PROC_PRIMARY:
452                proc_type = "primary";
453                break;
454            case RTE_PROC_SECONDARY:
455                proc_type = "secondary";
456                break;
457            case RTE_PROC_INVALID:
458                proc_type = "invalid";
459                break;
460            default:
461                proc_type = "something worse than invalid!!";
462        }
463        fprintf(stderr, "---Process Type : %s\n", proc_type);
464    }
465   
466}
467#endif
468
469/**
470 * Expects to be called from the master lcore and moves it to the given dpdk id
471 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
472 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
473 *               and not already in use.
474 * @return 0 is successful otherwise -1 on error.
475 */
476static inline int dpdk_move_master_lcore(size_t core) {
477    struct rte_config *cfg = rte_eal_get_configuration();
478    cpu_set_t cpuset;
479    int i;
480
481    assert (core < RTE_MAX_LCORE);
482    assert (rte_get_master_lcore() == rte_lcore_id());
483
484    if (core == rte_lcore_id())
485        return 0;
486
487    // Make sure we are not overwriting someone else
488    assert(!rte_lcore_is_enabled(core));
489
490    // Move the core
491    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
492    cfg->lcore_role[core] = ROLE_RTE;
493    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
494    rte_eal_get_configuration()->master_lcore = core;
495    RTE_PER_LCORE(_lcore_id) = core;
496
497    // Now change the affinity
498    CPU_ZERO(&cpuset);
499
500    if (lcore_config[core].detected) {
501        CPU_SET(core, &cpuset);
502    } else {
503        for (i = 0; i < RTE_MAX_LCORE; ++i) {
504            if (lcore_config[i].detected)
505                CPU_SET(i, &cpuset);
506        }
507    }
508
509    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
510    if (i != 0) {
511        // TODO proper libtrace style error here!!
512        fprintf(stderr, "pthread_setaffinity_np failed\n");
513        return -1;
514    }
515    return 0;
516}
517
518
/**
 * XXX This is very bad XXX
 * getopt() keeps its state in globals, so a nested use of getopt() clobbers
 * the state of any outer caller. These helpers snapshot and later put back
 * those globals around the nested call.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Snapshot the four getopt() globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write a snapshot taken by save_getopts() back into the getopt() globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optopt = snapshot.optopt;
        opterr = snapshot.opterr;
        optind = snapshot.optind;
        optarg = snapshot.optarg;
}
546
547static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
548                                        char * err, int errlen) {
549    int ret; /* Returned error codes */
550    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
551    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
552    char mem_map[20] = {0}; /* The memory name */
553    long nb_cpu; /* The number of CPUs in the system */
554    long my_cpu; /* The CPU number we want to bind to */
555    int i;
556    struct rte_config *cfg = rte_eal_get_configuration();
557        struct saved_getopts save_opts;
558   
559#if DEBUG
560    rte_set_log_level(RTE_LOG_DEBUG);
561#else
562    rte_set_log_level(RTE_LOG_WARNING);
563#endif
564    /*
565     * Using unique file prefixes mean separate memory is used, unlinking
566     * the two processes. However be careful we still cannot access a
567     * port that already in use.
568     *
569     * Using unique file prefixes mean separate memory is used, unlinking
570     * the two processes. However be careful we still cannot access a
571     * port that already in use.
572     */
573    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
574                "--file-prefix", mem_map, "-m", "256", NULL};
575    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
576   
577    /* This initialises the Environment Abstraction Layer (EAL)
578     * If we had slave workers these are put into WAITING state
579     *
580     * Basically binds this thread to a fixed core, which we choose as
581     * the last core on the machine (assuming fewer interrupts mapped here).
582     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
583     * "-n" the number of memory channels into the CPU (hardware specific)
584     *      - Most likely to be half the number of ram slots in your machine.
585     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
586     * Controls where in memory packets are stored and should spread across
587     * the channels. We just use 1 to be safe.
588     */
589
590    /* Get the number of cpu cores in the system and use the last core */
591    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
592    if (nb_cpu <= 0) {
593        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
594        nb_cpu = 1; /* fallback to the first core */
595    }
596    if (nb_cpu > RTE_MAX_LCORE)
597        nb_cpu = RTE_MAX_LCORE;
598
599    my_cpu = nb_cpu;
600    /* This allows the user to specify the core - we would try to do this
601     * automatically but it's hard to tell that this is secondary
602     * before running rte_eal_init(...). Currently we are limited to 1
603     * instance per core due to the way memory is allocated. */
604    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
605        snprintf(err, errlen, "Failed to parse URI");
606        return -1;
607    }
608
609    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
610                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
611
612    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
613        snprintf(err, errlen, 
614          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
615          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
616        return -1;
617    }
618
619    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
620       info older versions */
621    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
622    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
623
624#if !DPDK_USE_BLACKLIST
625    /* Black list all ports besides the one that we want to use */
626        if ((ret = whitelist_devices(format_data, &use_addr)) < 0) {
627                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
628                         " are you sure the address is correct?: %s", strerror(-ret));
629                return -1;
630        }
631#endif
632
633        /* Give the memory map a unique name */
634        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
635    /* rte_eal_init it makes a call to getopt so we need to reset the
636     * global optind variable of getopt otherwise this fails */
637        save_getopts(&save_opts);
638    optind = 1;
639    if ((ret = rte_eal_init(argc, argv)) < 0) {
640        snprintf(err, errlen, 
641          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
642        return -1;
643    }
644        restore_getopts(&save_opts);
645    // These are still running but will never do anything with DPDK v1.7 we
646    // should remove this XXX in the future
647    for(i = 0; i < RTE_MAX_LCORE; ++i) {
648        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
649            cfg->lcore_role[i] = ROLE_OFF;
650            cfg->lcore_count--;
651        }
652    }
653    // Only the master should be running
654    assert(cfg->lcore_count == 1);
655
656    dpdk_move_master_lcore(my_cpu-1);
657
658#if DEBUG
659    dump_configuration();
660#endif
661
662#if DPDK_USE_PMD_INIT
663    /* This registers all available NICs with Intel DPDK
664     * These are not loaded until rte_eal_pci_probe() is called.
665     */
666    if ((ret = rte_pmd_init_all()) < 0) {
667        snprintf(err, errlen, 
668          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
669        return -1;
670    }
671#endif
672
673#if DPDK_USE_BLACKLIST
674    /* Black list all ports besides the one that we want to use */
675        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
676                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
677                         " are you sure the address is correct?: %s", strerror(-ret));
678                return -1;
679        }
680#endif
681
682    /* This loads DPDK drivers against all ports that are not blacklisted */
683        if ((ret = rte_eal_pci_probe()) < 0) {
684        snprintf(err, errlen, 
685            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
686        return -1;
687    }
688
689    format_data->nb_ports = rte_eth_dev_count();
690
691    if (format_data->nb_ports != 1) {
692        snprintf(err, errlen, 
693            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
694            format_data->nb_ports);
695        return -1;
696    }
697   
698    struct rte_eth_dev_info dev_info;
699    rte_eth_dev_info_get(0, &dev_info);
700    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 
701                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
702
703    return 0;
704}
705
706static int dpdk_init_input (libtrace_t *libtrace) {
707    char err[500];
708    err[0] = 0;
709   
710    libtrace->format_data = (struct dpdk_format_data_t *)
711                            malloc(sizeof(struct dpdk_format_data_t));
712    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
713    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
714    FORMAT(libtrace)->nb_ports = 0;
715    FORMAT(libtrace)->snaplen = 0; /* Use default */
716    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
717    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
718    FORMAT(libtrace)->promisc = -1;
719    FORMAT(libtrace)->pktmbuf_pool = NULL;
720#if DPDK_USE_BLACKLIST
721    FORMAT(libtrace)->nb_blacklist = 0;
722#endif
723    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
724    FORMAT(libtrace)->mempool_name[0] = 0;
725#if HAS_HW_TIMESTAMPS_82580
726    FORMAT(libtrace)->ts_first_sys = 0;
727    FORMAT(libtrace)->ts_last_sys = 0;
728    FORMAT(libtrace)->wrap_count = 0;
729#endif
730       
731    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
732        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
733        free(libtrace->format_data);
734        libtrace->format_data = NULL;
735        return -1;
736    }
737    return 0;
738};
739
740static int dpdk_init_output(libtrace_out_t *libtrace)
741{
742    char err[500];
743    err[0] = 0;
744   
745    libtrace->format_data = (struct dpdk_format_data_t *)
746                            malloc(sizeof(struct dpdk_format_data_t));
747    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
748    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
749    FORMAT(libtrace)->nb_ports = 0;
750    FORMAT(libtrace)->snaplen = 0; /* Use default */
751    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
752    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
753    FORMAT(libtrace)->promisc = -1;
754    FORMAT(libtrace)->pktmbuf_pool = NULL;
755#if DPDK_USE_BLACKLIST
756    FORMAT(libtrace)->nb_blacklist = 0;
757#endif
758    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
759    FORMAT(libtrace)->mempool_name[0] = 0;
760#if HAS_HW_TIMESTAMPS_82580
761    FORMAT(libtrace)->ts_first_sys = 0;
762    FORMAT(libtrace)->ts_last_sys = 0;
763    FORMAT(libtrace)->wrap_count = 0;
764#endif
765
766    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
767        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
768        free(libtrace->format_data);
769        libtrace->format_data = NULL;
770        return -1;
771    }
772    return 0;
773};
774
775static int dpdk_pconfig_input (libtrace_t *libtrace,
776                                trace_parallel_option_t option,
777                                void *data) {
778        switch (option) {
779                case TRACE_OPTION_SET_HASHER:
780                        switch (*((enum hasher_types *) data))
781                        {
782                                case HASHER_BALANCE:
783                                case HASHER_UNIDIRECTIONAL:
784                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
785                                        return 0;
786                                case HASHER_BIDIRECTIONAL:
787                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
788                                        return 0;
789                                case HASHER_HARDWARE:
790                                case HASHER_CUSTOM:
791                                        // We don't support these
792                                        return -1;
793                        }
794        break;
795        }
796        return -1;
797}
798/**
799 * Note here snaplen excludes the MAC checksum. Packets over
800 * the requested snaplen will be dropped. (Excluding MAC checksum)
801 *
802 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
803 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
804 * is set the maximum size of the returned packet would be 1518 otherwise
805 * 1514 would be the largest size possibly returned.
806 *
807 */
808static int dpdk_config_input (libtrace_t *libtrace,
809                                        trace_option_t option,
810                                        void *data) {
811    switch (option) {
812        case TRACE_OPTION_SNAPLEN:
813            /* Only support changing snaplen before a call to start is
814             * made */
815            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
816                FORMAT(libtrace)->snaplen=*(int*)data;
817            else
818                return -1;
819            return 0;
820                case TRACE_OPTION_PROMISC:
821                        FORMAT(libtrace)->promisc=*(int*)data;
822            return 0;
823        case TRACE_OPTION_FILTER:
824            /* TODO filtering */
825            break;
826        case TRACE_OPTION_META_FREQ:
827            break;
828        case TRACE_OPTION_EVENT_REALTIME:
829            break;
830        /* Avoid default: so that future options will cause a warning
831         * here to remind us to implement it, or flag it as
832         * unimplementable
833         */
834    }
835
836        /* Don't set an error - trace_config will try to deal with the
837         * option and will set an error if it fails */
838    return -1;
839}
840
841/* Can set jumbo frames/ or limit the size of a frame by setting both
842 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
843 *
844 */
845static struct rte_eth_conf port_conf = {
846        .rxmode = {
847                .mq_mode = ETH_RSS,
848                .split_hdr_size = 0,
849                .header_split   = 0, /**< Header Split disabled */
850                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
851                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
852                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
853        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
854#if GET_MAC_CRC_CHECKSUM
855/* So it appears that if hw_strip_crc is turned off the driver will still
856 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
857 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
858 * So lets just add it back on when we receive the packet.
859 */
860                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
861#else
862/* By default strip the MAC checksum because it's a bit of a hack to
863 * actually read these. And don't want to rely on disabling this to actualy
864 * always cut off the checksum in the future
865 */
866        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
867#endif
868        },
869        .txmode = {
870                .mq_mode = ETH_DCB_NONE,
871        },
872        .rx_adv_conf = {
873                .rss_conf = {
874                        // .rss_key = &rss_key, // We set this per format
875                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
876                },
877        },
878};
879
880static const struct rte_eth_rxconf rx_conf = {
881        .rx_thresh = {
882                .pthresh = 8,/* RX_PTHRESH prefetch */
883                .hthresh = 8,/* RX_HTHRESH host */
884                .wthresh = 4,/* RX_WTHRESH writeback */
885        },
886    .rx_free_thresh = 0,
887    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
888};
889
890static const struct rte_eth_txconf tx_conf = {
891        .tx_thresh = {
892        /**
893         * TX_PTHRESH prefetch
894         * Set on the NIC, if the number of unprocessed descriptors to queued on
895         * the card fall below this try grab at least hthresh more unprocessed
896         * descriptors.
897         */
898                .pthresh = 36,
899
900        /* TX_HTHRESH host
901         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
902         */
903                .hthresh = 0,
904       
905        /* TX_WTHRESH writeback
906         * Set on the NIC, the number of sent descriptors before writing back
907         * status to confirm the transmission. This is done more efficiently as
908         * a bulk DMA-transfer rather than writing one at a time.
909         * Similar to tx_free_thresh however this is applied to the NIC, where
910         * as tx_free_thresh is when DPDK will check these. This is extended
911         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
912         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
913         */
914                .wthresh = 4,
915        },
916
917    /* Used internally by DPDK rather than passed to the NIC. The number of
918     * packet descriptors to send before checking for any responses written
919     * back (to confirm the transmission). Default = 32 if set to 0)
920     */
921        .tx_free_thresh = 0,
922
923    /* This is the Report Status threshold, used by 10Gbit cards,
924     * This signals the card to only write back status (such as
925     * transmission successful) after this minimum number of transmit
926     * descriptors are seen. The default is 32 (if set to 0) however if set
927     * to greater than 1 TX wthresh must be set to zero, because this is kindof
928     * a replacement. See the dpdk programmers guide for more restrictions.
929     */
930        .tx_rs_thresh = 1,
931};
932
933/* Attach memory to the port and start the port or restart the port.
934 */
935static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
936    int ret; /* Check return values for errors */
937    struct rte_eth_link link_info; /* Wait for link */
938   
939    /* Already started */
940    if (format_data->paused == DPDK_RUNNING)
941        return 0;
942
943    /* First time started we need to alloc our memory, doing this here
944     * rather than in environment setup because we don't have snaplen then */
945    if (format_data->paused == DPDK_NEVER_STARTED) {
946        if (format_data->snaplen == 0) {
947            format_data->snaplen = RX_MBUF_SIZE;
948            port_conf.rxmode.jumbo_frame = 0;
949            port_conf.rxmode.max_rx_pkt_len = 0;
950        } else {
951            /* Use jumbo frames */
952            port_conf.rxmode.jumbo_frame = 1;
953            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
954        }
955
956        /* This is additional overhead so make sure we allow space for this */
957#if GET_MAC_CRC_CHECKSUM
958        format_data->snaplen += ETHER_CRC_LEN;
959#endif
960#if HAS_HW_TIMESTAMPS_82580
961        format_data->snaplen += sizeof(struct hw_timestamp_82580);
962#endif
963
964        /* Create the mbuf pool, which is the place our packets are allocated
965         * from - TODO figure out if there is is a free function (I cannot see one)
966         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
967         * allocate however that extra 1 packet is not used.
968         * (I assume <= vs < error some where in DPDK code)
969         * TX requires nb_tx_buffers + 1 in the case the queue is full
970         * so that will fill the new buffer and wait until slots in the
971         * ring become available.
972         */
973#if DEBUG
974    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
975#endif
976        format_data->pktmbuf_pool =
977            rte_mempool_create(format_data->mempool_name,
978                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
979                       format_data->snaplen + sizeof(struct rte_mbuf) 
980                                        + RTE_PKTMBUF_HEADROOM,
981                       8, sizeof(struct rte_pktmbuf_pool_private),
982                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
983                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
984
985        if (format_data->pktmbuf_pool == NULL) {
986            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
987                        "pool failed: %s", strerror(rte_errno));
988            return -1;
989        }
990    }
991   
992    /* ----------- Now do the setup for the port mapping ------------ */
993    /* Order of calls must be
994     * rte_eth_dev_configure()
995     * rte_eth_tx_queue_setup()
996     * rte_eth_rx_queue_setup()
997     * rte_eth_dev_start()
998     * other rte_eth calls
999     */
1000   
1001   
1002    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1003   
1004    /* This must be called first before another *eth* function
1005     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1006    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
1007    if (ret < 0) {
1008        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1009                            " %"PRIu8" : %s", format_data->port,
1010                            strerror(-ret));
1011        return -1;
1012    }
1013    /* Initialise the TX queue a minimum value if using this port for
1014     * receiving. Otherwise a larger size if writing packets.
1015     */
1016    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1017                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1018    if (ret < 0) {
1019        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1020                            " %"PRIu8" : %s", format_data->port,
1021                            strerror(-ret));
1022        return -1;
1023    }
1024    /* Initialise the RX queue with some packets from memory */
1025    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1026                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
1027                            &rx_conf, format_data->pktmbuf_pool);
1028    if (ret < 0) {
1029        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1030                    " %"PRIu8" : %s", format_data->port,
1031                    strerror(-ret));
1032        return -1;
1033    }
1034   
1035    /* Start device */
1036    ret = rte_eth_dev_start(format_data->port);
1037    if (ret < 0) {
1038        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1039                    strerror(-ret));
1040        return -1;
1041    }
1042
1043    /* Default promiscuous to on */
1044    if (format_data->promisc == -1)
1045        format_data->promisc = 1;
1046   
1047    if (format_data->promisc == 1)
1048        rte_eth_promiscuous_enable(format_data->port);
1049    else
1050        rte_eth_promiscuous_disable(format_data->port);
1051   
1052    /* Wait for the link to come up */
1053    rte_eth_link_get(format_data->port, &link_info);
1054#if DEBUG
1055    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1056            (int) link_info.link_duplex, (int) link_info.link_speed);
1057#endif
1058
1059    /* We have now successfully started/unpaused */
1060    format_data->paused = DPDK_RUNNING;
1061   
1062    return 0;
1063}
1064
1065/* Attach memory to the port and start (or restart) the port/s.
1066 */
1067static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
1068    int ret, i; /* Check return values for errors */
1069    struct rte_eth_link link_info; /* Wait for link */
1070   
1071    /* Already started */
1072    if (format_data->paused == DPDK_RUNNING)
1073        return 0;
1074
1075    /* First time started we need to alloc our memory, doing this here
1076     * rather than in environment setup because we don't have snaplen then */
1077    if (format_data->paused == DPDK_NEVER_STARTED) {
1078        if (format_data->snaplen == 0) {
1079            format_data->snaplen = RX_MBUF_SIZE;
1080            port_conf.rxmode.jumbo_frame = 0;
1081            port_conf.rxmode.max_rx_pkt_len = 0;
1082        } else {
1083            /* Use jumbo frames */
1084            port_conf.rxmode.jumbo_frame = 1;
1085            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1086        }
1087
1088        /* This is additional overhead so make sure we allow space for this */
1089#if GET_MAC_CRC_CHECKSUM
1090        format_data->snaplen += ETHER_CRC_LEN;
1091#endif
1092#if HAS_HW_TIMESTAMPS_82580
1093        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1094#endif
1095
1096        /* Create the mbuf pool, which is the place our packets are allocated
1097         * from - TODO figure out if there is is a free function (I cannot see one)
1098         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1099         * allocate however that extra 1 packet is not used.
1100         * (I assume <= vs < error some where in DPDK code)
1101         * TX requires nb_tx_buffers + 1 in the case the queue is full
1102         * so that will fill the new buffer and wait until slots in the
1103         * ring become available.
1104         */
1105#if DEBUG
1106    printf("Creating mempool named %s\n", format_data->mempool_name);
1107#endif
1108        format_data->pktmbuf_pool =
1109            rte_mempool_create(format_data->mempool_name,
1110                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
1111                       format_data->snaplen + sizeof(struct rte_mbuf) 
1112                                        + RTE_PKTMBUF_HEADROOM,
1113                       8, sizeof(struct rte_pktmbuf_pool_private),
1114                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1115                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
1116
1117        if (format_data->pktmbuf_pool == NULL) {
1118            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1119                        "pool failed: %s", strerror(rte_errno));
1120            return -1;
1121        }
1122    }
1123   
1124    /* ----------- Now do the setup for the port mapping ------------ */
1125    /* Order of calls must be
1126     * rte_eth_dev_configure()
1127     * rte_eth_tx_queue_setup()
1128     * rte_eth_rx_queue_setup()
1129     * rte_eth_dev_start()
1130     * other rte_eth calls
1131     */
1132   
1133    /* This must be called first before another *eth* function
1134     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1135    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1136    if (ret < 0) {
1137        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1138                            " %"PRIu8" : %s", format_data->port,
1139                            strerror(-ret));
1140        return -1;
1141    }
1142#if DEBUG
1143    printf("Doing dev configure\n");
1144#endif
1145    /* Initialise the TX queue a minimum value if using this port for
1146     * receiving. Otherwise a larger size if writing packets.
1147     */
1148    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1149                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1150    if (ret < 0) {
1151        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1152                            " %"PRIu8" : %s", format_data->port,
1153                            strerror(-ret));
1154        return -1;
1155    }
1156   
1157    for (i=0; i < rx_queues; i++) {
1158#if DEBUG
1159    printf("Doing queue configure\n");
1160#endif 
1161                /* Initialise the RX queue with some packets from memory */
1162                ret = rte_eth_rx_queue_setup(format_data->port, i,
1163                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
1164                                                                &rx_conf, format_data->pktmbuf_pool);
1165        /* Init per_thread data structures */
1166        format_data->per_lcore[i].port = format_data->port;
1167        format_data->per_lcore[i].queue_id = i;
1168
1169                if (ret < 0) {
1170                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1171                                                " %"PRIu8" : %s", format_data->port,
1172                                                strerror(-ret));
1173                        return -1;
1174                }
1175        }
1176   
1177#if DEBUG
1178    fprintf(stderr, "Doing start device\n");
1179#endif 
1180    /* Start device */
1181    ret = rte_eth_dev_start(format_data->port);
1182#if DEBUG
1183    fprintf(stderr, "Done start device\n");
1184#endif 
1185    if (ret < 0) {
1186        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1187                    strerror(-ret));
1188        return -1;
1189    }
1190
1191
1192    /* Default promiscuous to on */
1193    if (format_data->promisc == -1)
1194        format_data->promisc = 1;
1195   
1196    if (format_data->promisc == 1)
1197        rte_eth_promiscuous_enable(format_data->port);
1198    else
1199        rte_eth_promiscuous_disable(format_data->port);
1200   
1201   
1202    /* We have now successfully started/unpased */
1203    format_data->paused = DPDK_RUNNING;
1204   
1205    // Can use remote launch for all
1206    /*RTE_LCORE_FOREACH_SLAVE(i) {
1207                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1208        }*/
1209   
1210    /* Wait for the link to come up */
1211    rte_eth_link_get(format_data->port, &link_info);
1212#if DEBUG
1213    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1214            (int) link_info.link_duplex, (int) link_info.link_speed);
1215#endif
1216
1217    return 0;
1218}
1219
1220static int dpdk_start_input (libtrace_t *libtrace) {
1221    char err[500];
1222    err[0] = 0;
1223
1224    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1225        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1226        free(libtrace->format_data);
1227        libtrace->format_data = NULL;
1228        return -1;
1229    }
1230    return 0;
1231}
1232
1233static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1234    struct rte_eth_dev_info dev_info;
1235    rte_eth_dev_info_get(port_id, &dev_info);
1236    return dev_info.max_rx_queues;
1237}
1238
/**
 * Get the number of processors currently online on this machine.
 *
 * @return The value reported by sysconf(_SC_NPROCESSORS_ONLN), or 1 if
 *         that query fails or reports a non-positive count, so the result
 *         is always at least 1.
 */
static inline size_t dpdk_processor_count (void) {
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    if (nb_cpu <= 0)
        return 1;

    return (size_t) nb_cpu;
}
1246
1247static int dpdk_pstart_input (libtrace_t *libtrace) {
1248    char err[500];
1249    int i=0, phys_cores=0;
1250    int tot = libtrace->perpkt_thread_count;
1251    err[0] = 0;
1252
1253    if (rte_lcore_id() != rte_get_master_lcore())
1254        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1255
1256    // If the master is not on the last thread we move it there
1257    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1258        // Consider error handling here
1259        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
1260    }
1261
1262    // Don't exceed the number of cores in the system/detected by dpdk
1263    // We don't have to force this but performance wont be good if we don't
1264    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1265        if (lcore_config[i].detected) {
1266            if (rte_lcore_is_enabled(i))
1267                fprintf(stderr, "Found core %d already in use!\n", i);
1268            else
1269                phys_cores++;
1270        }
1271    }
1272
1273        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1274    tot = MIN(tot, phys_cores);
1275
1276        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1277       
1278    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1279        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1280        free(libtrace->format_data);
1281        libtrace->format_data = NULL;
1282        return -1;
1283    }
1284
1285    // Make sure we only start the number that we should
1286    libtrace->perpkt_thread_count = tot;
1287    return 0;
1288}
1289
1290
1291/**
1292 * Register a thread with the DPDK system,
1293 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1294 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1295 * gives it.
1296 *
1297 * We then allow a mapper thread to be started on every real core as DPDK would
1298 * we also bind these to the corresponding CPU cores.
1299 *
1300 * @param libtrace A pointer to the trace
1301 * @param reading True if the thread will be used to read packets, i.e. will
1302 *                call pread_packet(), false if thread used to process packet
1303 *                in any other manner including statistics functions.
1304 */
1305static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1306{
1307    struct rte_config *cfg = rte_eal_get_configuration();
1308    int i;
1309    int new_id = -1;
1310
1311    // If 'reading packets' fill in cores from 0 up and bind affinity
1312    // otherwise start from the MAX core (which is also the master) and work backwards
1313    // in this case physical cores on the system will not exist so we don't bind
1314    // these to any particular physical core
1315    if (reading) {
1316        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1317            if (!rte_lcore_is_enabled(i)) {
1318                new_id = i;
1319                if (!lcore_config[i].detected)
1320                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1321                break;
1322            }
1323        }
1324    } else {
1325        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1326            if (!rte_lcore_is_enabled(i)) {
1327                new_id = i;
1328                break;
1329            }
1330        }
1331    }
1332
1333    if (new_id == -1) {
1334        assert(cfg->lcore_count == RTE_MAX_LCORE);
1335        // TODO proper libtrace style error here!!
1336        fprintf(stderr, "Too many threads for DPDK!!\n");
1337        return -1;
1338    }
1339
1340    // Enable the core in global DPDK structs
1341    cfg->lcore_role[new_id] = ROLE_RTE;
1342    cfg->lcore_count++;
1343    // Set TLS to reflect our new number
1344    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1345    fprintf(stderr, "original id%d", rte_lcore_id());
1346    RTE_PER_LCORE(_lcore_id) = new_id;
1347    fprintf(stderr, " new id%d\n", rte_lcore_id());
1348
1349    if (reading) {
1350        // Set affinity bind to corresponding core
1351        cpu_set_t cpuset;
1352        CPU_ZERO(&cpuset);
1353        CPU_SET(rte_lcore_id(), &cpuset);
1354        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1355        if (i != 0) {
1356            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1357            return -1;
1358        }
1359    }
1360
1361    // Map our TLS to the thread data
1362    if (reading) {
1363        if(t->type == THREAD_PERPKT) {
1364            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1365        } else {
1366            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1367        }
1368    }
1369}
1370
1371
1372/**
1373 * Unregister a thread with the DPDK system.
1374 *
1375 * Only previously registered threads should be calling this just before
1376 * they are destroyed.
1377 */
1378static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
1379{
1380    struct rte_config *cfg = rte_eal_get_configuration();
1381
1382    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
1383
1384    // Skip if master!!
1385    if (rte_lcore_id() == rte_get_master_lcore()) {
1386        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1387        return 0;
1388    }
1389
1390    // Disable this core in global DPDK structs
1391    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1392    cfg->lcore_count--;
1393    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1394    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1395    return 0;
1396}
1397
1398static int dpdk_start_output(libtrace_out_t *libtrace)
1399{
1400    char err[500];
1401    err[0] = 0;
1402   
1403    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1404        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1405        free(libtrace->format_data);
1406        libtrace->format_data = NULL;
1407        return -1;
1408    }
1409    return 0;
1410}
1411
1412static int dpdk_pause_input(libtrace_t * libtrace){
1413    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1414    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1415#if DEBUG     
1416        fprintf(stderr, "Pausing port\n");
1417#endif
1418        rte_eth_dev_stop(FORMAT(libtrace)->port);
1419        FORMAT(libtrace)->paused = DPDK_PAUSED;
1420        /* If we pause it the driver will be reset and likely our counter */
1421#if HAS_HW_TIMESTAMPS_82580
1422        FORMAT(libtrace)->ts_first_sys = 0;
1423        FORMAT(libtrace)->ts_last_sys = 0;
1424#endif
1425    }
1426    return 0;
1427}
1428
1429static int dpdk_write_packet(libtrace_out_t *trace, 
1430                libtrace_packet_t *packet){
1431    struct rte_mbuf* m_buff[1];
1432   
1433    int wirelen = trace_get_wire_length(packet);
1434    int caplen = trace_get_capture_length(packet);
1435   
1436    /* Check for a checksum and remove it */
1437    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1438                                            wirelen == caplen)
1439        caplen -= ETHER_CRC_LEN;
1440
1441    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1442    if (m_buff[0] == NULL) {
1443        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1444        return -1;
1445    } else {
1446        int ret;
1447        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1448        do {
1449            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1450        } while (ret != 1);
1451    }
1452
1453    return 0;
1454}
1455
1456static int dpdk_fin_input(libtrace_t * libtrace) {
1457    /* Free our memory structures */
1458    if (libtrace->format_data != NULL) {
1459        /* Close the device completely, device cannot be restarted */
1460        if (FORMAT(libtrace)->port != 0xFF)
1461            rte_eth_dev_close(FORMAT(libtrace)->port);
1462        /* filter here if we used it */
1463                free(libtrace->format_data);
1464        }
1465
1466    /* Revert to the original PCI drivers */
1467    /* No longer in DPDK
1468    rte_eal_pci_exit(); */
1469    return 0;
1470}
1471
1472
1473static int dpdk_fin_output(libtrace_out_t * libtrace) {
1474    /* Free our memory structures */
1475    if (libtrace->format_data != NULL) {
1476        /* Close the device completely, device cannot be restarted */
1477        if (FORMAT(libtrace)->port != 0xFF)
1478            rte_eth_dev_close(FORMAT(libtrace)->port);
1479        /* filter here if we used it */
1480                free(libtrace->format_data);
1481        }
1482
1483    /* Revert to the original PCI drivers */
1484    /* No longer in DPDK
1485    rte_eal_pci_exit(); */
1486    return 0;
1487}
1488
1489/**
1490 * Get the start of additional header that we added to a packet.
1491 */
1492static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1493    uint8_t *hdrsize;
1494    assert(packet);
1495    assert(packet->buffer);
1496    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1497    /* The byte before the original packet data denotes the size in bytes
1498     * of our additional header that we added sits before the 'size byte' */
1499    hdrsize--;
1500    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1501}
1502
1503static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1504    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1505    return hdr->cap_len;
1506}
1507
1508static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1509    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1510    if (size > hdr->cap_len) {
1511        /* Cannot make a packet bigger */
1512                return trace_get_capture_length(packet);
1513        }
1514
1515    /* Reset the cached capture length first*/
1516    packet->capture_length = -1;
1517    hdr->cap_len = (uint32_t) size;
1518        return trace_get_capture_length(packet);
1519}
1520
1521static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1522    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1523    int org_cap_size; /* The original capture size */
1524    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1525        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1526                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1527                            sizeof(struct hw_timestamp_82580);
1528    } else {
1529        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1530                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1531    }
1532    if (hdr->flags & INCLUDES_CHECKSUM) {
1533        return org_cap_size;
1534    } else {
1535        /* DPDK packets are always TRACE_TYPE_ETH packets */
1536        return org_cap_size + ETHER_CRC_LEN;
1537    }
1538}
1539static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1540    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1541    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1542        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1543                sizeof(struct hw_timestamp_82580);
1544    else
1545        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1546}
1547
1548static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1549                libtrace_packet_t *packet, void *buffer,
1550                libtrace_rt_types_t rt_type, uint32_t flags) {
1551    assert(packet);
1552    if (packet->buffer != buffer &&
1553        packet->buf_control == TRACE_CTRL_PACKET) {
1554        free(packet->buffer);
1555    }
1556
1557    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1558        packet->buf_control = TRACE_CTRL_PACKET;
1559    } else
1560        packet->buf_control = TRACE_CTRL_EXTERNAL;
1561
1562    packet->buffer = buffer;
1563    packet->header = buffer;
1564
1565    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1566    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1567    packet->type = rt_type;
1568    return 0;
1569}
1570
1571/*
1572 * Does any extra preperation to a captured packet.
1573 * This includes adding our extra header to it with the timestamp
1574 */
1575static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1576                                                        struct rte_mbuf* pkt){
1577    uint8_t * hdr_size;
1578    struct dpdk_addt_hdr *hdr;
1579#if HAS_HW_TIMESTAMPS_82580
1580    struct hw_timestamp_82580 *hw_ts;
1581    struct timeval cur_sys_time;
1582    uint64_t cur_sys_time_ns;
1583    uint64_t estimated_wraps;
1584   
1585    /* Using gettimeofday because it's most likely to be a vsyscall
1586     * We don't want to slow down anything with systemcalls we dont need
1587     * accauracy */
1588    gettimeofday(&cur_sys_time, NULL);
1589#else
1590# if USE_CLOCK_GETTIME
1591    struct timespec cur_sys_time;
1592   
1593    /* This looks terrible and I feel bad doing it. But it's OK
1594     * on new kernels, because this is a vsyscall */
1595    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1596# else
1597    struct timeval cur_sys_time;
1598    /* Should be a vsyscall */
1599    gettimeofday(&cur_sys_time, NULL);
1600# endif
1601#endif
1602
1603    /* Record the size of our header */
1604    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1605    *hdr_size = sizeof(struct dpdk_addt_hdr);
1606    /* Now put our header in front of that size */
1607    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1608    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1609   
1610#if GET_MAC_CRC_CHECKSUM
1611    /* Add back in the CRC sum */
1612    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1613    pkt->pkt.data_len += ETHER_CRC_LEN;
1614    hdr->flags |= INCLUDES_CHECKSUM;
1615#endif
1616
1617#if HAS_HW_TIMESTAMPS_82580
1618    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1619     *
1620     *        +----------+---+   +--------------+
1621     *  82580 |    24    | 8 |   |      32      |
1622     *        +----------+---+   +--------------+
1623     *          reserved  \______ 40 bits _____/
1624     *
1625     * The 40 bit 82580 SYSTIM overflows every
1626     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1627     *
1628     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1629     * Endian (for the full 64 bits) i.e. picture is mirrored
1630     */
1631   
1632    /* The timestamp is sitting before our packet and is included in pkt_len */
1633    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1634    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1635   
1636    /* Despite what the documentation says this is in Little
1637     * Endian byteorder. Mask the reserved section out.
1638     */
1639    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1640                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1641               
1642    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1643    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1644        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1645        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1646    }
1647   
1648    /* This will have serious problems if packets aren't read quickly
1649     * that is within a couple of seconds because our clock cycles every
1650     * 18 seconds */
1651    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1652                            / (1ull<<TS_NBITS_82580);
1653   
1654    /* Estimated_wraps gives the number of times the counter should have
1655     * wrapped (however depending on value last time it could have wrapped
1656     * twice more (if hw clock is close to its max value) or once less (allowing
1657     * for a bit of variance between hw and sys clock). But if the clock
1658     * shouldn't have wrapped once then don't allow it to go backwards in time */
1659    if (unlikely(estimated_wraps >= 2)) {
1660        /* 2 or more wrap arounds add all but the very last wrap */
1661        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1662    }
1663   
1664    /* Set the timestamp to the lowest possible value we're considering */
1665    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1666                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1667   
1668    /* In most runs only the first if() will need evaluating - i.e our
1669     * estimate is correct. */
1670    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1671                                hdr->timestamp, MAXSKEW_82580))) {
1672        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1673        FORMAT(libtrace)->wrap_count++;
1674        hdr->timestamp += (1ull<<TS_NBITS_82580);
1675        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1676                                hdr->timestamp, MAXSKEW_82580)) {
1677            /* Failed to match estimated_wraps */
1678            FORMAT(libtrace)->wrap_count++;
1679            hdr->timestamp += (1ull<<TS_NBITS_82580);
1680            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1681                                hdr->timestamp, MAXSKEW_82580)) {
1682                if (estimated_wraps == 0) {
1683                    /* 0 case Failed to match estimated_wraps+2 */
1684                    printf("WARNING - Hardware Timestamp failed to"
1685                                            " match using systemtime!\n");
1686                    hdr->timestamp = cur_sys_time_ns;
1687                } else {
1688                    /* Failed to match estimated_wraps+1 */
1689                    FORMAT(libtrace)->wrap_count++;
1690                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1691                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1692                                hdr->timestamp, MAXSKEW_82580)) {
1693                        /* Failed to match estimated_wraps+2 */
1694                        printf("WARNING - Hardware Timestamp failed to"
1695                                            " match using systemtime!!\n");
1696                    }
1697                }
1698            }
1699        }
1700    }
1701
1702    /* Log our previous for the next loop */
1703    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1704
1705#else
1706# if USE_CLOCK_GETTIME
1707    hdr->timestamp = TS_TO_NS(cur_sys_time);
1708# else
1709    hdr->timestamp = TV_TO_NS(cur_sys_time);
1710# endif
1711#endif
1712
1713    /* Intels samples prefetch into level 0 cache lets assume it is a good
1714     * idea and do the same */
1715    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1716    packet->buffer = pkt;
1717    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1718
1719    /* Set our capture length for the first time */
1720    hdr->cap_len = dpdk_get_wire_length(packet);
1721    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1722        hdr->cap_len -= ETHER_CRC_LEN;
1723    }
1724   
1725
1726    return dpdk_get_framing_length(packet) +
1727                        dpdk_get_capture_length(packet);
1728}
1729
1730
1731static void dpdk_fin_packet(libtrace_packet_t *packet)
1732{
1733        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1734                rte_pktmbuf_free(packet->buffer);
1735                packet->buffer = NULL;
1736        }
1737}
1738
1739static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1740    int nb_rx; /* Number of rx packets we've recevied */
1741    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1742
1743    /* Free the last packet buffer */
1744    if (packet->buffer != NULL) {
1745        /* Buffer is owned by DPDK */
1746        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1747            rte_pktmbuf_free(packet->buffer);
1748            packet->buffer = NULL;
1749        } else
1750        /* Buffer is owned by packet i.e. has been malloc'd */
1751        if (packet->buf_control == TRACE_CTRL_PACKET) {
1752            free(packet->buffer);
1753            packet->buffer = NULL;
1754        }
1755    }
1756   
1757    packet->buf_control = TRACE_CTRL_EXTERNAL;
1758    packet->type = TRACE_RT_DATA_DPDK;
1759   
1760    /* Wait for a packet */
1761    while (1) {
1762        /* Poll for a single packet */
1763        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1764                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1765        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1766            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1767        }
1768    }
1769   
1770    /* We'll never get here - but if we did it would be bad */
1771    return -1;
1772}
1773
1774static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1775    int nb_rx; /* Number of rx packets we've recevied */
1776    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1777
1778    /* Free the last packet buffer */
1779    if (packet->buffer != NULL) {
1780        /* Buffer is owned by DPDK */
1781        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
1782            rte_pktmbuf_free(packet->buffer);
1783            packet->buffer = NULL;
1784        } else
1785        /* Buffer is owned by packet i.e. has been malloc'd */
1786        if (packet->buf_control == TRACE_CTRL_PACKET) {
1787            free(packet->buffer);
1788            packet->buffer = NULL;
1789        }
1790    }
1791   
1792    packet->buf_control = TRACE_CTRL_EXTERNAL;
1793    packet->type = TRACE_RT_DATA_DPDK;
1794   
1795    /* Wait for a packet */
1796    while (1) {
1797        /* Poll for a single packet */
1798        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
1799                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
1800        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1801                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1802            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1803        }
1804        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
1805        if (libtrace_message_queue_count(&t->messages) > 0) {
1806                        printf("Extra message yay");
1807                        return -2;
1808                }
1809    }
1810   
1811    /* We'll never get here - but if we did it would be bad */
1812    return -1;
1813}
1814
1815static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1816    struct timeval tv;
1817    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1818   
1819    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1820    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1821    return tv;
1822}
1823
1824static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1825    struct timespec ts;
1826    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1827   
1828    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1829    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1830    return ts;
1831}
1832
1833static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1834    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1835}
1836
1837static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1838    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1839    return (libtrace_direction_t) hdr->direction;
1840}
1841
1842static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1843    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1844    hdr->direction = (uint8_t) direction;
1845    return (libtrace_direction_t) hdr->direction;
1846}
1847
1848/*
1849 * NOTE: Drops could occur for other reasons than running out of buffer
1850 * space. Such as failed MAC checksums and oversized packets.
1851 */
1852static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1853    struct rte_eth_stats stats = {0};
1854   
1855    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1856        return UINT64_MAX;
1857    /* Grab the current stats */
1858    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1859   
1860    /* Get the drop counter */
1861    return (uint64_t) stats.ierrors;
1862}
1863
1864static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1865    struct rte_eth_stats stats = {0};
1866   
1867    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1868        return UINT64_MAX;
1869    /* Grab the current stats */
1870    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1871   
1872    /* Get the drop counter */
1873    return (uint64_t) stats.ipackets;
1874}
1875
1876/*
1877 * This is the number of packets filtered by the NIC
1878 * and maybe ahead of number read using libtrace.
1879 *
1880 * XXX we are yet to implement any filtering, but if it was this should
1881 * get the result. So this will just return 0 for now.
1882 */
1883static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1884    struct rte_eth_stats stats = {0};
1885   
1886    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1887        return UINT64_MAX;
1888    /* Grab the current stats */
1889    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1890   
1891    /* Get the drop counter */
1892    return (uint64_t) stats.fdirmiss;
1893}
1894
1895/* Attempts to read a packet in a non-blocking fashion. If one is not
1896 * available a SLEEP event is returned. We do not have the ability to
1897 * create a select()able file descriptor in DPDK.
1898 */
1899static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1900                                        libtrace_packet_t *packet) {
1901    libtrace_eventobj_t event = {0,0,0.0,0};
1902    int nb_rx; /* Number of receive packets we've read */
1903    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1904   
1905    do {
1906   
1907        /* See if we already have a packet waiting */
1908        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1909                        FORMAT(trace)->queue_id, pkts_burst, 1);
1910       
1911        if (nb_rx > 0) {
1912            /* Free the last packet buffer */
1913            if (packet->buffer != NULL) {
1914                /* Buffer is owned by DPDK */
1915                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1916                    rte_pktmbuf_free(packet->buffer);
1917                    packet->buffer = NULL;
1918                } else
1919                /* Buffer is owned by packet i.e. has been malloc'd */
1920                if (packet->buf_control == TRACE_CTRL_PACKET) {
1921                    free(packet->buffer);
1922                    packet->buffer = NULL;
1923                }
1924            }
1925           
1926            packet->buf_control = TRACE_CTRL_EXTERNAL;
1927            packet->type = TRACE_RT_DATA_DPDK;
1928            event.type = TRACE_EVENT_PACKET;
1929            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1930           
1931            /* XXX - Check this passes the filter trace_read_packet normally
1932             * does this for us but this wont */
1933            if (trace->filter) {
1934                if (!trace_apply_filter(trace->filter, packet)) {
1935                    /* Failed the filter so we loop for another packet */
1936                    trace->filtered_packets ++;
1937                    continue;
1938                }
1939            }
1940            trace->accepted_packets ++;
1941        } else {
1942            /* We only want to sleep for a very short time - we are non-blocking */
1943            event.type = TRACE_EVENT_SLEEP;
1944            event.seconds = 0.0001;
1945            event.size = 0;
1946        }
1947       
1948        /* If we get here we have our event */
1949        break;
1950    } while (1);
1951
1952    return event;
1953}
1954
1955
/* Prints the usage text for the dpdk format module (revision line, supported
 * input URIs and supported output URIs) to standard output.
 */
static void dpdk_help(void) {
    static const char *const help_lines[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t line;

    /* The lines contain no conversion specifiers, so "%s" reproduces them
     * exactly as written */
    for (line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); line++) {
        printf("%s", help_lines[line]);
    }
}
1973
1974static struct libtrace_format_t dpdk = {
1975        "dpdk",
1976        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1977        TRACE_FORMAT_DPDK,
1978        NULL,                   /* probe filename */
1979        NULL,                               /* probe magic */
1980        dpdk_init_input,            /* init_input */
1981        dpdk_config_input,          /* config_input */
1982        dpdk_start_input,           /* start_input */
1983        dpdk_pause_input,           /* pause_input */
1984        dpdk_init_output,           /* init_output */
1985        NULL,                               /* config_output */
1986        dpdk_start_output,          /* start_ouput */
1987        dpdk_fin_input,             /* fin_input */
1988        dpdk_fin_output,        /* fin_output */
1989        dpdk_read_packet,           /* read_packet */
1990        dpdk_prepare_packet,    /* prepare_packet */
1991        dpdk_fin_packet,                                    /* fin_packet */
1992        dpdk_write_packet,          /* write_packet */
1993        dpdk_get_link_type,         /* get_link_type */
1994        dpdk_get_direction,         /* get_direction */
1995        dpdk_set_direction,         /* set_direction */
1996        NULL,                               /* get_erf_timestamp */
1997        dpdk_get_timeval,           /* get_timeval */
1998        dpdk_get_timespec,          /* get_timespec */
1999        NULL,                               /* get_seconds */
2000        NULL,                               /* seek_erf */
2001        NULL,                               /* seek_timeval */
2002        NULL,                               /* seek_seconds */
2003        dpdk_get_capture_length,/* get_capture_length */
2004        dpdk_get_wire_length,   /* get_wire_length */
2005        dpdk_get_framing_length,/* get_framing_length */
2006        dpdk_set_capture_length,/* set_capture_length */
2007        NULL,                               /* get_received_packets */
2008        dpdk_get_filtered_packets,/* get_filtered_packets */
2009        dpdk_get_dropped_packets,/* get_dropped_packets */
2010    dpdk_get_captured_packets,/* get_captured_packets */
2011        NULL,                       /* get_fd */
2012        dpdk_trace_event,               /* trace_event */
2013    dpdk_help,              /* help */
2014    NULL,                   /* next pointer */
2015    {true, 8},              /* Live, NICs typically have 8 threads */
2016    dpdk_pstart_input, /* pstart_input */
2017        dpdk_pread_packet, /* pread_packet */
2018        dpdk_pause_input, /* ppause */
2019        dpdk_fin_input, /* p_fin */
2020        dpdk_pconfig_input, /* pconfig_input */
2021    dpdk_pregister_thread, /* pregister_thread */
2022    dpdk_punregister_thread /* unpregister_thread */
2023};
2024
2025void dpdk_constructor(void) {
2026        register_format(&dpdk);
2027}
Note: See TracBrowser for help on using the repository browser.