Changeset d7fd648 for lib/format_dpdk.c


Ignore:
Timestamp:
12/19/14 15:47:34 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
1960910
Parents:
6e41e73
Message:

Improves speed of the DPDK format(and parallel libtrace) and fixes some DPDK bugs

Fixes bug with PCI address being parsed as a decimal instead of hex.
Fixes bug so DPDK Breaks out of loop when libtrace_halt is called

For performance

  • Rearranges the header format used to simplify code which might show a small speed up
  • Batching packets is now supported thoughout the parallel framework
  • DPDK now always reads packets in bursts if possible, including in single threaded mode
  • Calls to retrive system time only needs to occur once for a batch of packets
  • The CPU core used to run DPDK is/are now picked based upon the NUMA node the NIC is attached to
  • A delay has been added to reduce the memory load of polling after unsuccessful attempts this tends to improve performance
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r59e8400 rd7fd648  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 
     4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
    7  * Author: Richard Sanger 
    8  *         
     7 * Author: Richard Sanger
     8 *
    99 * All rights reserved.
    1010 *
    11  * This code has been developed by the University of Waikato WAND 
     11 * This code has been developed by the University of Waikato WAND
    1212 * research group. For further information please see http://www.wand.net.nz/
    1313 *
     
    3535 * Intel Data Plane Development Kit is a LIVE capture format.
    3636 *
    37  * This format also supports writing which will write packets out to the 
    38  * network as a form of packet replay. This should not be confused with the 
    39  * RT protocol which is intended to transfer captured packet records between 
     37 * This format also supports writing which will write packets out to the
     38 * network as a form of packet replay. This should not be confused with the
     39 * RT protocol which is intended to transfer captured packet records between
    4040 * RT-speaking programs.
    4141 */
     
    6161#include <endian.h>
    6262#include <string.h>
     63
     64#if HAVE_LIBNUMA
     65#include <numa.h>
     66#endif
    6367
    6468/* We can deal with any minor differences by checking the RTE VERSION
     
    133137#include <rte_lcore.h>
    134138#include <rte_per_lcore.h>
     139#include <rte_cycles.h>
    135140#include <pthread.h>
    136141
    137 /* The default size of memory buffers to use - This is the max size of standard 
     142/* The default size of memory buffers to use - This is the max size of standard
    138143 * ethernet packet less the size of the MAC CHECKSUM */
    139144#define RX_MBUF_SIZE 1514
    140145
    141 /* The minimum number of memory buffers per queue tx or rx. Search for 
     146/* The minimum number of memory buffers per queue tx or rx. Search for
    142147 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    143148 */
     
    157162#define NB_TX_MBUF 1024
    158163
    159 /* The size of the PCI blacklist needs to be big enough to contain 
     164/* The size of the PCI blacklist needs to be big enough to contain
    160165 * every PCI device address (listed by lspci every bus:device.function tuple).
    161166 */
     
    164169/* The maximum number of characters the mempool name can be */
    165170#define MEMPOOL_NAME_LEN 20
     171
     172/* For single threaded libtrace we read packets as a batch/burst
     173 * this is the maximum size of said burst */
     174#define BURST_SIZE 50
    166175
    167176#define MBUF(x) ((struct rte_mbuf *) x)
     
    172181
    173182#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    174                         (uint64_t) tv.tv_usec*1000ull)
     183                        (uint64_t) tv.tv_usec*1000ull)
    175184#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    176                         (uint64_t) ts.tv_nsec)
     185                        (uint64_t) ts.tv_nsec)
    177186
    178187#if RTE_PKTMBUF_HEADROOM != 128
    179188#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    180         "any libtrace instance processing these packet must be have the" \
    181         "same RTE_PKTMBUF_HEADROOM set"
     189        "any libtrace instance processing these packet must be have the" \
     190        "same RTE_PKTMBUF_HEADROOM set"
    182191#endif
    183192
    184193/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    185  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    186  * 
     194 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     195 *
    187196 * Make sure you understand what these are doing before enabling them.
    188197 * They might make traces incompatable with other builds etc.
    189  * 
     198 *
    190199 * These are also included to show how to do somethings which aren't
    191200 * obvious in the DPDK documentation.
     
    195204#define DEBUG 1
    196205
    197 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    198  * only turn on if you know clock_gettime is a vsyscall on your system 
     206/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     207 * only turn on if you know clock_gettime is a vsyscall on your system
    199208 * overwise could be a large overhead. Again gettimeofday() should be
    200209 * vsyscall also if it's not you should seriously consider updating your
     
    203212#ifdef HAVE_LIBRT
    204213/* You can turn this on (set to 1) to prefer clock_gettime */
    205 #define USE_CLOCK_GETTIME 0
     214#define USE_CLOCK_GETTIME 1
    206215#else
    207216/* DONT CHANGE THIS !!! */
    208 #define USE_CLOCK_GETTIME 0
     217#define USE_CLOCK_GETTIME 1
    209218#endif
    210219
     
    214223 * hence writing out a port such as int: ring: and dpdk: assumes there
    215224 * is no checksum and will attempt to write the checksum as part of the
    216  * packet 
     225 * packet
    217226 */
    218227#define GET_MAC_CRC_CHECKSUM 0
    219228
    220229/* This requires a modification of the pmd drivers (inside Intel DPDK)
     230 * TODO this requires updating (packet sizes are wrong TS most likely also)
    221231 */
    222232#define HAS_HW_TIMESTAMPS_82580 0
     
    244254struct dpdk_per_lcore_t
    245255{
    246         // TODO move time stamp stuff here
    247         uint16_t queue_id;
    248         uint8_t port;
     256        uint16_t queue_id;
     257        uint8_t port;
     258        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     259#if HAS_HW_TIMESTAMPS_82580
     260        /* Timestamping only relevent to RX */
     261        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     262        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     263#endif
    249264};
    250265
     
    255270    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    256271    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    257     uint8_t paused; /* See paused_state */ 
     272    uint8_t paused; /* See paused_state */
    258273    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
     274    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    259275    int snaplen; /* The snap length for the capture - RX only */
    260276    /* We always have to setup both rx and tx queues even if we don't want them */
    261277    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    262278    int nb_tx_buf; /* The number of packet buffers in the tx ring */
     279    int nic_numa_node; /* The NUMA node that the NIC is attached to */
    263280    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    264281#if DPDK_USE_BLACKLIST
     
    268285    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    269286    uint8_t rss_key[40]; // This is the RSS KEY
    270 #if HAS_HW_TIMESTAMPS_82580
    271     /* Timestamping only relevent to RX */
    272     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    273     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    274     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    275 #endif
     287    /* To improve performance we always batch reading packets, in a burst */
     288    struct rte_mbuf* burst_pkts[BURST_SIZE];
     289    int burst_size; /* The total number read in the burst */
     290    int burst_offset; /* The offset we are into the burst */
    276291        // DPDK normally seems to have a limit of 8 queues for a given card
    277292        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
     
    283298};
    284299
    285 /** 
     300/**
    286301 * A structure placed in front of the packet where we can store
    287302 * additional information about the given packet.
     
    289304 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    290305 * +--------------------------+
    291  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    292  * +--------------------------+
    293306 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    294307 * +--------------------------+
    295  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    296  * +--------------------------+ 
     308 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     309 * +--------------------------+
    297310 * *   hw_timestamp_82580     * 16 bytes Optional
    298311 * +--------------------------+
     
    312325 * We want to blacklist all devices except those on the whitelist
    313326 * (I say list, but yes it is only the one).
    314  * 
     327 *
    315328 * The default behaviour of rte_pci_probe() will map every possible device
    316329 * to its DPDK driver. The DPDK driver will take the ethernet device
    317330 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    318  * 
    319  * So blacklist all devices except the one that we wish to use so that 
     331 *
     332 * So blacklist all devices except the one that we wish to use so that
    320333 * the others can still be used as standard ethernet ports.
    321334 *
     
    331344
    332345        TAILQ_FOREACH(dev, &device_list, next) {
    333         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    334             && whitelist->bus == dev->addr.bus
    335             && whitelist->devid == dev->addr.devid
    336             && whitelist->function == dev->addr.function)
    337             continue;
     346        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     347            && whitelist->bus == dev->addr.bus
     348            && whitelist->devid == dev->addr.devid
     349            && whitelist->function == dev->addr.function)
     350            continue;
    338351                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    339                                 / sizeof (format_data->blacklist[0])) {
     352                                / sizeof (format_data->blacklist[0])) {
    340353                        printf("Warning: too many devices to blacklist consider"
    341                                         " increasing BLACK_LIST_SIZE");
     354                                        " increasing BLACK_LIST_SIZE");
    342355                        break;
    343356                }
     
    355368        char pci_str[20] = {0};
    356369        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    357                 whitelist->domain,
    358                 whitelist->bus,
    359                 whitelist->devid,
    360                 whitelist->function);
     370                whitelist->domain,
     371                whitelist->bus,
     372                whitelist->devid,
     373                whitelist->function);
    361374        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    362375                return -1;
     
    370383 * Fills in addr, note core is optional and is unchanged if
    371384 * a value for it is not provided.
    372  * 
     385 *
    373386 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    374  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     387 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    375388 */
    376389static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    377390    int matches;
    378391    assert(str);
    379     matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%d", &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
     392    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld", &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
    380393    if (matches >= 4) {
    381         return 0;
     394        return 0;
    382395    } else {
    383         return -1;
    384     }
     396        return -1;
     397    }
     398}
     399
     400/**
     401 * Convert a pci address to the numa node it is
     402 * connected to.
     403 *
     404 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     405 * so we can call it before DPDK
     406 *
     407 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     408 */
     409static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     410        char path[50] = {0};
     411        FILE *file;
     412
     413        /* Read from the system */
     414        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     415                 dev_addr->domain,
     416                 dev_addr->bus,
     417                 dev_addr->devid,
     418                 dev_addr->function);
     419
     420        if((file = fopen(path, "r")) != NULL) {
     421                int numa_node = -1;
     422                fscanf(file, "%d", &numa_node);
     423                fclose(file);
     424                return numa_node;
     425        }
     426        return -1;
    385427}
    386428
     
    391433    struct rte_config * global_config;
    392434    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    393    
     435
    394436    if (nb_cpu <= 0) {
    395         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    396         nb_cpu = 1; /* fallback to just 1 core */
     437        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     438        nb_cpu = 1; /* fallback to just 1 core */
    397439    }
    398440    if (nb_cpu > RTE_MAX_LCORE)
    399         nb_cpu = RTE_MAX_LCORE;
    400    
     441        nb_cpu = RTE_MAX_LCORE;
     442
    401443    global_config = rte_eal_get_configuration();
    402    
     444
    403445    if (global_config != NULL) {
    404         int i;
    405         fprintf(stderr, "Intel DPDK setup\n"
    406                "---Version      : %s\n"
    407                "---Master LCore : %"PRIu32"\n"
    408                "---LCore Count  : %"PRIu32"\n",
    409                rte_version(),
    410                global_config->master_lcore, global_config->lcore_count);
    411        
    412         for (i = 0 ; i < nb_cpu; i++) {
    413             fprintf(stderr, "   ---Core %d : %s\n", i,
    414                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    415         }
    416        
    417         const char * proc_type;
    418         switch (global_config->process_type) {
    419             case RTE_PROC_AUTO:
    420                 proc_type = "auto";
    421                 break;
    422             case RTE_PROC_PRIMARY:
    423                 proc_type = "primary";
    424                 break;
    425             case RTE_PROC_SECONDARY:
    426                 proc_type = "secondary";
    427                 break;
    428             case RTE_PROC_INVALID:
    429                 proc_type = "invalid";
    430                 break;
    431             default:
    432                 proc_type = "something worse than invalid!!";
    433         }
    434         fprintf(stderr, "---Process Type : %s\n", proc_type);
    435     }
    436    
     446        int i;
     447        fprintf(stderr, "Intel DPDK setup\n"
     448               "---Version      : %s\n"
     449               "---Master LCore : %"PRIu32"\n"
     450               "---LCore Count  : %"PRIu32"\n",
     451               rte_version(),
     452               global_config->master_lcore, global_config->lcore_count);
     453
     454        for (i = 0 ; i < nb_cpu; i++) {
     455            fprintf(stderr, "   ---Core %d : %s\n", i,
     456                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     457        }
     458
     459        const char * proc_type;
     460        switch (global_config->process_type) {
     461            case RTE_PROC_AUTO:
     462                proc_type = "auto";
     463                break;
     464            case RTE_PROC_PRIMARY:
     465                proc_type = "primary";
     466                break;
     467            case RTE_PROC_SECONDARY:
     468                proc_type = "secondary";
     469                break;
     470            case RTE_PROC_INVALID:
     471                proc_type = "invalid";
     472                break;
     473            default:
     474                proc_type = "something worse than invalid!!";
     475        }
     476        fprintf(stderr, "---Process Type : %s\n", proc_type);
     477    }
     478
    437479}
    438480#endif
     
    454496
    455497    if (core == rte_lcore_id())
    456         return 0;
     498        return 0;
    457499
    458500    // Make sure we are not overwriting someone else
     
    470512
    471513    if (lcore_config[core].detected) {
    472         CPU_SET(core, &cpuset);
     514        CPU_SET(core, &cpuset);
    473515    } else {
    474         for (i = 0; i < RTE_MAX_LCORE; ++i) {
    475             if (lcore_config[i].detected)
    476                 CPU_SET(i, &cpuset);
    477         }
     516        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     517            if (lcore_config[i].detected)
     518                CPU_SET(i, &cpuset);
     519        }
    478520    }
    479521
    480522    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    481523    if (i != 0) {
    482         // TODO proper libtrace style error here!!
    483         fprintf(stderr, "pthread_setaffinity_np failed\n");
    484         return -1;
     524        // TODO proper libtrace style error here!!
     525        fprintf(stderr, "pthread_setaffinity_np failed\n");
     526        return -1;
    485527    }
    486528    return 0;
     
    517559
    518560static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    519                                         char * err, int errlen) {
     561                                        char * err, int errlen) {
    520562    int ret; /* Returned error codes */
    521     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
     563    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
    522564    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    523565    char mem_map[20] = {0}; /* The memory name */
     
    527569    struct rte_config *cfg = rte_eal_get_configuration();
    528570        struct saved_getopts save_opts;
    529    
     571
    530572#if DEBUG
    531573    rte_set_log_level(RTE_LOG_DEBUG);
    532 #else 
     574#else
    533575    rte_set_log_level(RTE_LOG_WARNING);
    534576#endif
    535577    /*
    536578     * Using unique file prefixes mean separate memory is used, unlinking
    537      * the two processes. However be careful we still cannot access a 
     579     * the two processes. However be careful we still cannot access a
    538580     * port that already in use.
    539      * 
     581     *
    540582     * Using unique file prefixes mean separate memory is used, unlinking
    541      * the two processes. However be careful we still cannot access a 
     583     * the two processes. However be careful we still cannot access a
    542584     * port that already in use.
    543585     */
    544586    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
    545                 "--file-prefix", mem_map, "-m", "256", NULL};
     587                "--file-prefix", mem_map, "-m", "980", NULL};
    546588    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    547    
     589
    548590    /* This initialises the Environment Abstraction Layer (EAL)
    549591     * If we had slave workers these are put into WAITING state
    550      * 
     592     *
    551593     * Basically binds this thread to a fixed core, which we choose as
    552594     * the last core on the machine (assuming fewer interrupts mapped here).
     
    559601     */
    560602
    561     /* Get the number of cpu cores in the system and use the last core */
     603    /* Get the number of cpu cores in the system and use the last core
     604     * on the correct numa node */
    562605    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    563606    if (nb_cpu <= 0) {
    564         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    565         nb_cpu = 1; /* fallback to the first core */
     607        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     608        nb_cpu = 1; /* fallback to the first core */
    566609    }
    567610    if (nb_cpu > RTE_MAX_LCORE)
    568         nb_cpu = RTE_MAX_LCORE;
    569 
    570     my_cpu = nb_cpu;
    571     /* This allows the user to specify the core - we would try to do this 
     611        nb_cpu = RTE_MAX_LCORE;
     612
     613    my_cpu = -1;
     614    /* This allows the user to specify the core - we would try to do this
    572615     * automatically but it's hard to tell that this is secondary
    573      * before running rte_eal_init(...). Currently we are limited to 1 
     616     * before running rte_eal_init(...). Currently we are limited to 1
    574617     * instance per core due to the way memory is allocated. */
    575618    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    576         snprintf(err, errlen, "Failed to parse URI");
    577         return -1;
    578     }
     619        snprintf(err, errlen, "Failed to parse URI");
     620        return -1;
     621    }
     622
     623#if HAVE_LIBNUMA
     624        format_data->nic_numa_node = pci_to_numa(&use_addr);
     625        if (my_cpu < 0) {
     626                /* If we can assign to a core on the same numa node */
     627                printf("Using pci card on numa_node%d\n", format_data->nic_numa_node);
     628                if(format_data->nic_numa_node >= 0) {
     629                        int max_node_cpu = -1;
     630                        struct bitmask *mask = numa_allocate_cpumask();
     631                        assert(mask);
     632                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     633                        for (i = 0 ; i < nb_cpu; ++i) {
     634                                if (numa_bitmask_isbitset(mask,i))
     635                                        max_node_cpu = i+1;
     636                        }
     637                        my_cpu = max_node_cpu;
     638                }
     639        }
     640#endif
     641        if (my_cpu < 0) {
     642                my_cpu = nb_cpu;
     643        }
     644
    579645
    580646    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    581                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     647                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    582648
    583649    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    584         snprintf(err, errlen,
    585           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    586           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    587         return -1;
     650        snprintf(err, errlen,
     651          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     652          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     653        return -1;
    588654    }
    589655
     
    597663        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    598664                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    599                         " are you sure the address is correct?: %s", strerror(-ret));
     665                        " are you sure the address is correct?: %s", strerror(-ret));
    600666                return -1;
    601667        }
     
    604670        /* Give the memory map a unique name */
    605671        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    606     /* rte_eal_init it makes a call to getopt so we need to reset the 
     672    /* rte_eal_init it makes a call to getopt so we need to reset the
    607673     * global optind variable of getopt otherwise this fails */
    608674        save_getopts(&save_opts);
    609675    optind = 1;
    610676    if ((ret = rte_eal_init(argc, argv)) < 0) {
    611         snprintf(err, errlen,
    612           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    613         return -1;
     677        snprintf(err, errlen,
     678          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     679        return -1;
    614680    }
    615681        restore_getopts(&save_opts);
     
    617683    // should remove this XXX in the future
    618684    for(i = 0; i < RTE_MAX_LCORE; ++i) {
    619         if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
    620             cfg->lcore_role[i] = ROLE_OFF;
    621             cfg->lcore_count--;
    622         }
     685            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     686            cfg->lcore_role[i] = ROLE_OFF;
     687            cfg->lcore_count--;
     688        }
    623689    }
    624690    // Only the master should be running
     
    636702     */
    637703    if ((ret = rte_pmd_init_all()) < 0) {
    638         snprintf(err, errlen,
    639           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    640         return -1;
     704        snprintf(err, errlen,
     705          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     706        return -1;
    641707    }
    642708#endif
     
    646712        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    647713                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    648                         " are you sure the address is correct?: %s", strerror(-ret));
     714                        " are you sure the address is correct?: %s", strerror(-ret));
    649715                return -1;
    650716        }
     
    653719    /* This loads DPDK drivers against all ports that are not blacklisted */
    654720        if ((ret = rte_eal_pci_probe()) < 0) {
    655         snprintf(err, errlen,
    656             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    657         return -1;
     721        snprintf(err, errlen,
     722            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     723        return -1;
    658724    }
    659725
     
    661727
    662728    if (format_data->nb_ports != 1) {
    663         snprintf(err, errlen,
    664             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    665             format_data->nb_ports);
    666         return -1;
    667     }
    668    
     729        snprintf(err, errlen,
     730            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     731            format_data->nb_ports);
     732        return -1;
     733    }
     734
    669735    struct rte_eth_dev_info dev_info;
    670736    rte_eth_dev_info_get(0, &dev_info);
    671     printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 
     737    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
    672738                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    673739
     
    678744    char err[500];
    679745    err[0] = 0;
    680    
     746
    681747    libtrace->format_data = (struct dpdk_format_data_t *)
    682                             malloc(sizeof(struct dpdk_format_data_t));
     748                            malloc(sizeof(struct dpdk_format_data_t));
    683749    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    684750    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    687753    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    688754    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     755    FORMAT(libtrace)->nic_numa_node = -1;
    689756    FORMAT(libtrace)->promisc = -1;
    690757    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    694761    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    695762    FORMAT(libtrace)->mempool_name[0] = 0;
    696 #if HAS_HW_TIMESTAMPS_82580
    697     FORMAT(libtrace)->ts_first_sys = 0;
    698     FORMAT(libtrace)->ts_last_sys = 0;
    699     FORMAT(libtrace)->wrap_count = 0;
    700 #endif
    701        
     763    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     764    FORMAT(libtrace)->burst_size = 0;
     765    FORMAT(libtrace)->burst_offset = 0;
     766
    702767    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    703         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    704         free(libtrace->format_data);
    705         libtrace->format_data = NULL;
    706         return -1;
     768        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     769        free(libtrace->format_data);
     770        libtrace->format_data = NULL;
     771        return -1;
    707772    }
    708773    return 0;
     
    713778    char err[500];
    714779    err[0] = 0;
    715    
     780
    716781    libtrace->format_data = (struct dpdk_format_data_t *)
    717                             malloc(sizeof(struct dpdk_format_data_t));
     782                            malloc(sizeof(struct dpdk_format_data_t));
    718783    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    719784    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    722787    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    723788    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     789    FORMAT(libtrace)->nic_numa_node = -1;
    724790    FORMAT(libtrace)->promisc = -1;
    725791    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    729795    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    730796    FORMAT(libtrace)->mempool_name[0] = 0;
    731 #if HAS_HW_TIMESTAMPS_82580
    732     FORMAT(libtrace)->ts_first_sys = 0;
    733     FORMAT(libtrace)->ts_last_sys = 0;
    734     FORMAT(libtrace)->wrap_count = 0;
    735 #endif
     797    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     798    FORMAT(libtrace)->burst_size = 0;
     799    FORMAT(libtrace)->burst_offset = 0;
    736800
    737801    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    738         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    739         free(libtrace->format_data);
    740         libtrace->format_data = NULL;
    741         return -1;
     802        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     803        free(libtrace->format_data);
     804        libtrace->format_data = NULL;
     805        return -1;
    742806    }
    743807    return 0;
     
    745809
    746810static int dpdk_pconfig_input (libtrace_t *libtrace,
    747                                 trace_parallel_option_t option,
    748                                 void *data) {
     811                                trace_parallel_option_t option,
     812                                void *data) {
    749813        switch (option) {
    750814                case TRACE_OPTION_SET_HASHER:
     
    763827                                        return -1;
    764828                        }
    765         break;
     829        break;
    766830        }
    767831        return -1;
     
    770834 * Note here snaplen excludes the MAC checksum. Packets over
    771835 * the requested snaplen will be dropped. (Excluding MAC checksum)
    772  * 
     836 *
    773837 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    774838 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    775839 * is set the maximum size of the returned packet would be 1518 otherwise
    776840 * 1514 would be the largest size possibly returned.
    777  * 
     841 *
    778842 */
    779843static int dpdk_config_input (libtrace_t *libtrace,
    780                                         trace_option_t option,
    781                                         void *data) {
     844                                        trace_option_t option,
     845                                        void *data) {
    782846    switch (option) {
    783         case TRACE_OPTION_SNAPLEN:
    784             /* Only support changing snaplen before a call to start is
    785              * made */
    786             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    787                 FORMAT(libtrace)->snaplen=*(int*)data;
    788             else
    789                 return -1;
    790             return 0;
     847        case TRACE_OPTION_SNAPLEN:
     848            /* Only support changing snaplen before a call to start is
     849             * made */
     850            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     851                FORMAT(libtrace)->snaplen=*(int*)data;
     852            else
     853                return -1;
     854            return 0;
    791855                case TRACE_OPTION_PROMISC:
    792856                        FORMAT(libtrace)->promisc=*(int*)data;
    793             return 0;
    794         case TRACE_OPTION_FILTER:
    795             /* TODO filtering */
    796             break;
    797         case TRACE_OPTION_META_FREQ:
    798             break;
    799         case TRACE_OPTION_EVENT_REALTIME:
    800             break;
    801         /* Avoid default: so that future options will cause a warning
    802         * here to remind us to implement it, or flag it as
    803         * unimplementable
    804         */
     857            return 0;
     858        case TRACE_OPTION_FILTER:
     859            /* TODO filtering */
     860            break;
     861        case TRACE_OPTION_META_FREQ:
     862            break;
     863        case TRACE_OPTION_EVENT_REALTIME:
     864            break;
     865        /* Avoid default: so that future options will cause a warning
     866        * here to remind us to implement it, or flag it as
     867        * unimplementable
     868        */
    805869    }
    806870
     
    812876/* Can set jumbo frames/ or limit the size of a frame by setting both
    813877 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    814  * 
     878 *
    815879 */
    816880static struct rte_eth_conf port_conf = {
     
    822886                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    823887                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    824         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     888                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    825889#if GET_MAC_CRC_CHECKSUM
    826890/* So it appears that if hw_strip_crc is turned off the driver will still
     
    835899 * always cut off the checksum in the future
    836900 */
    837         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     901                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    838902#endif
    839903        },
     
    861925static const struct rte_eth_txconf tx_conf = {
    862926        .tx_thresh = {
    863         /**
    864         * TX_PTHRESH prefetch
    865         * Set on the NIC, if the number of unprocessed descriptors to queued on
    866         * the card fall below this try grab at least hthresh more unprocessed
    867         * descriptors.
    868         */
     927        /**
     928        * TX_PTHRESH prefetch
     929        * Set on the NIC, if the number of unprocessed descriptors to queued on
     930        * the card fall below this try grab at least hthresh more unprocessed
     931        * descriptors.
     932        */
    869933                .pthresh = 36,
    870934
    871         /* TX_HTHRESH host
    872         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    873         */
     935        /* TX_HTHRESH host
     936        * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     937        */
    874938                .hthresh = 0,
    875        
    876         /* TX_WTHRESH writeback
    877         * Set on the NIC, the number of sent descriptors before writing back
    878         * status to confirm the transmission. This is done more efficiently as
    879         * a bulk DMA-transfer rather than writing one at a time.
    880         * Similar to tx_free_thresh however this is applied to the NIC, where
    881         * as tx_free_thresh is when DPDK will check these. This is extended
    882         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    883         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    884         */
     939
     940        /* TX_WTHRESH writeback
     941        * Set on the NIC, the number of sent descriptors before writing back
     942        * status to confirm the transmission. This is done more efficiently as
     943        * a bulk DMA-transfer rather than writing one at a time.
     944        * Similar to tx_free_thresh however this is applied to the NIC, where
     945        * as tx_free_thresh is when DPDK will check these. This is extended
     946        * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     947        * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     948        */
    885949                .wthresh = 4,
    886950        },
     
    893957
    894958    /* This is the Report Status threshold, used by 10Gbit cards,
    895      * This signals the card to only write back status (such as 
     959     * This signals the card to only write back status (such as
    896960     * transmission successful) after this minimum number of transmit
    897961     * descriptors are seen. The default is 32 (if set to 0) however if set
     
    907971    int ret; /* Check return values for errors */
    908972    struct rte_eth_link link_info; /* Wait for link */
    909    
     973    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
     974
    910975    /* Already started */
    911976    if (format_data->paused == DPDK_RUNNING)
    912         return 0;
    913 
    914     /* First time started we need to alloc our memory, doing this here 
     977        return 0;
     978
     979    /* First time started we need to alloc our memory, doing this here
    915980     * rather than in environment setup because we don't have snaplen then */
    916981    if (format_data->paused == DPDK_NEVER_STARTED) {
    917         if (format_data->snaplen == 0) {
    918             format_data->snaplen = RX_MBUF_SIZE;
    919             port_conf.rxmode.jumbo_frame = 0;
    920             port_conf.rxmode.max_rx_pkt_len = 0;
    921         } else {
    922             /* Use jumbo frames */
    923             port_conf.rxmode.jumbo_frame = 1;
    924             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    925         }
    926 
    927         /* This is additional overhead so make sure we allow space for this */
     982        if (format_data->snaplen == 0) {
     983            format_data->snaplen = RX_MBUF_SIZE;
     984            port_conf.rxmode.jumbo_frame = 0;
     985            port_conf.rxmode.max_rx_pkt_len = 0;
     986        } else {
     987            /* Use jumbo frames */
     988            port_conf.rxmode.jumbo_frame = 1;
     989            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     990        }
     991
     992        /* This is additional overhead so make sure we allow space for this */
    928993#if GET_MAC_CRC_CHECKSUM
    929         format_data->snaplen += ETHER_CRC_LEN;
     994        format_data->snaplen += ETHER_CRC_LEN;
    930995#endif
    931996#if HAS_HW_TIMESTAMPS_82580
    932         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    933 #endif
    934 
    935         /* Create the mbuf pool, which is the place our packets are allocated
    936          * from - TODO figure out if there is is a free function (I cannot see one)
    937          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    938          * allocate however that extra 1 packet is not used.
    939         * (I assume <= vs < error some where in DPDK code)
    940          * TX requires nb_tx_buffers + 1 in the case the queue is full
    941         * so that will fill the new buffer and wait until slots in the
    942         * ring become available.
    943         */
     997        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     998#endif
     999
     1000        /* Create the mbuf pool, which is the place our packets are allocated
     1001         * from - TODO figure out if there is is a free function (I cannot see one)
     1002         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1003         * allocate however that extra 1 packet is not used.
     1004        * (I assume <= vs < error some where in DPDK code)
     1005         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1006        * so that will fill the new buffer and wait until slots in the
     1007        * ring become available.
     1008        */
    9441009#if DEBUG
    9451010    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    9461011#endif
    947         format_data->pktmbuf_pool =
    948             rte_mempool_create(format_data->mempool_name,
    949                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    950                        format_data->snaplen + sizeof(struct rte_mbuf)
    951                                         + RTE_PKTMBUF_HEADROOM,
    952                        8, sizeof(struct rte_pktmbuf_pool_private),
    953                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    954                        0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    955 
    956         if (format_data->pktmbuf_pool == NULL) {
    957             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    958                         "pool failed: %s", strerror(rte_errno));
    959             return -1;
    960         }
    961     }
    962    
     1012    format_data->pktmbuf_pool =
     1013            rte_mempool_create(format_data->mempool_name,
     1014                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
     1015                       format_data->snaplen + sizeof(struct rte_mbuf)
     1016                                        + RTE_PKTMBUF_HEADROOM,
     1017                       128, sizeof(struct rte_pktmbuf_pool_private),
     1018                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1019                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
     1020
     1021    if (format_data->pktmbuf_pool == NULL) {
     1022            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
     1023                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
     1024            return -1;
     1025        }
     1026    }
     1027
    9631028    /* ----------- Now do the setup for the port mapping ------------ */
    964     /* Order of calls must be 
     1029    /* Order of calls must be
    9651030     * rte_eth_dev_configure()
    9661031     * rte_eth_tx_queue_setup()
     
    9691034     * other rte_eth calls
    9701035     */
    971    
    972    
     1036
     1037
    9731038    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
    974    
     1039
    9751040    /* This must be called first before another *eth* function
    9761041     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    9771042    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    9781043    if (ret < 0) {
    979         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    980                             " %"PRIu8" : %s", format_data->port,
    981                             strerror(-ret));
    982         return -1;
     1044        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1045                            " %"PRIu8" : %s", format_data->port,
     1046                            strerror(-ret));
     1047        return -1;
    9831048    }
    9841049    /* Initialise the TX queue a minimum value if using this port for
     
    9861051     */
    9871052    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    988                         format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1053                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    9891054    if (ret < 0) {
    990         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    991                             " %"PRIu8" : %s", format_data->port,
    992                             strerror(-ret));
    993         return -1;
     1055        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1056                            " %"PRIu8" : %s", format_data->port,
     1057                            strerror(-ret));
     1058        return -1;
    9941059    }
    9951060    /* Initialise the RX queue with some packets from memory */
    9961061    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    997                             format_data->nb_rx_buf, SOCKET_ID_ANY,
    998                             &rx_conf, format_data->pktmbuf_pool);
     1062                                 format_data->nb_rx_buf, cpu_numa_node,
     1063                                 &rx_conf, format_data->pktmbuf_pool);
    9991064    if (ret < 0) {
    1000         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    1001                     " %"PRIu8" : %s", format_data->port,
    1002                     strerror(-ret));
    1003         return -1;
    1004     }
    1005    
     1065        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1066                    " %"PRIu8" : %s", format_data->port,
     1067                    strerror(-ret));
     1068        return -1;
     1069    }
     1070
    10061071    /* Start device */
    10071072    ret = rte_eth_dev_start(format_data->port);
    10081073    if (ret < 0) {
    1009         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    1010                     strerror(-ret));
    1011         return -1;
     1074        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1075                    strerror(-ret));
     1076        return -1;
    10121077    }
    10131078
    10141079    /* Default promiscuous to on */
    10151080    if (format_data->promisc == -1)
    1016         format_data->promisc = 1;
    1017    
     1081        format_data->promisc = 1;
     1082
    10181083    if (format_data->promisc == 1)
    1019         rte_eth_promiscuous_enable(format_data->port);
     1084        rte_eth_promiscuous_enable(format_data->port);
    10201085    else
    1021         rte_eth_promiscuous_disable(format_data->port);
    1022    
     1086        rte_eth_promiscuous_disable(format_data->port);
     1087
    10231088    /* Wait for the link to come up */
    10241089    rte_eth_link_get(format_data->port, &link_info);
     1090    format_data->link_speed = link_info.link_speed;
    10251091#if DEBUG
    10261092    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    1027             (int) link_info.link_duplex, (int) link_info.link_speed);
    1028 #endif
    1029 
     1093            (int) link_info.link_duplex, (int) link_info.link_speed);
     1094#endif
    10301095    /* We have now successfully started/unpaused */
    10311096    format_data->paused = DPDK_RUNNING;
    1032    
     1097
    10331098    return 0;
    10341099}
     
    10361101/* Attach memory to the port and start (or restart) the port/s.
    10371102 */
    1038 static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
     1103static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
    10391104    int ret, i; /* Check return values for errors */
    10401105    struct rte_eth_link link_info; /* Wait for link */
    1041    
     1106
    10421107    /* Already started */
    10431108    if (format_data->paused == DPDK_RUNNING)
    1044         return 0;
    1045 
    1046     /* First time started we need to alloc our memory, doing this here 
     1109        return 0;
     1110
     1111    /* First time started we need to alloc our memory, doing this here
    10471112     * rather than in environment setup because we don't have snaplen then */
    10481113    if (format_data->paused == DPDK_NEVER_STARTED) {
    1049         if (format_data->snaplen == 0) {
    1050             format_data->snaplen = RX_MBUF_SIZE;
    1051             port_conf.rxmode.jumbo_frame = 0;
    1052             port_conf.rxmode.max_rx_pkt_len = 0;
    1053         } else {
    1054             /* Use jumbo frames */
    1055             port_conf.rxmode.jumbo_frame = 1;
    1056             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    1057         }
    1058 
    1059         /* This is additional overhead so make sure we allow space for this */
     1114        if (format_data->snaplen == 0) {
     1115            format_data->snaplen = RX_MBUF_SIZE;
     1116            port_conf.rxmode.jumbo_frame = 0;
     1117            port_conf.rxmode.max_rx_pkt_len = 0;
     1118        } else {
     1119            /* Use jumbo frames */
     1120            port_conf.rxmode.jumbo_frame = 1;
     1121            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1122        }
     1123
     1124        /* This is additional overhead so make sure we allow space for this */
    10601125#if GET_MAC_CRC_CHECKSUM
    1061         format_data->snaplen += ETHER_CRC_LEN;
     1126        format_data->snaplen += ETHER_CRC_LEN;
    10621127#endif
    10631128#if HAS_HW_TIMESTAMPS_82580
    1064         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    1065 #endif
    1066 
    1067         /* Create the mbuf pool, which is the place our packets are allocated
    1068          * from - TODO figure out if there is is a free function (I cannot see one)
    1069          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    1070          * allocate however that extra 1 packet is not used.
    1071         * (I assume <= vs < error some where in DPDK code)
    1072          * TX requires nb_tx_buffers + 1 in the case the queue is full
    1073         * so that will fill the new buffer and wait until slots in the
    1074         * ring become available.
    1075         */
     1129        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1130#endif
     1131
     1132        /* Create the mbuf pool, which is the place our packets are allocated
     1133         * from - TODO figure out if there is a free function (I cannot see one)
     1134         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1135         * allocate however that extra 1 packet is not used.
     1136        * (I assume <= vs < error some where in DPDK code)
     1137         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1138        * so that will fill the new buffer and wait until slots in the
     1139        * ring become available.
     1140        */
    10761141#if DEBUG
    10771142    printf("Creating mempool named %s\n", format_data->mempool_name);
    10781143#endif
    1079         format_data->pktmbuf_pool =
    1080             rte_mempool_create(format_data->mempool_name,
    1081                        format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
    1082                        format_data->snaplen + sizeof(struct rte_mbuf)
    1083                                         + RTE_PKTMBUF_HEADROOM,
    1084                        8, sizeof(struct rte_pktmbuf_pool_private),
    1085                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    1086                        0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    1087 
    1088         if (format_data->pktmbuf_pool == NULL) {
    1089             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    1090                         "pool failed: %s", strerror(rte_errno));
    1091             return -1;
    1092         }
    1093     }
    1094    
     1144    format_data->pktmbuf_pool =
     1145            rte_mempool_create(format_data->mempool_name,
     1146                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
     1147                       format_data->snaplen + sizeof(struct rte_mbuf)
     1148                                        + RTE_PKTMBUF_HEADROOM,
     1149                       128, sizeof(struct rte_pktmbuf_pool_private),
     1150                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1151                       format_data->nic_numa_node, 0);
     1152
     1153        if (format_data->pktmbuf_pool == NULL) {
     1154            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1155                        "pool failed: %s", strerror(rte_errno));
     1156            return -1;
     1157        }
     1158    }
     1159
    10951160    /* ----------- Now do the setup for the port mapping ------------ */
    1096     /* Order of calls must be 
     1161    /* Order of calls must be
    10971162     * rte_eth_dev_configure()
    10981163     * rte_eth_tx_queue_setup()
     
    11011166     * other rte_eth calls
    11021167     */
    1103    
     1168
    11041169    /* This must be called first before another *eth* function
    11051170     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    11061171    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
    11071172    if (ret < 0) {
    1108         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    1109                             " %"PRIu8" : %s", format_data->port,
    1110                             strerror(-ret));
    1111         return -1;
     1173        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1174                            " %"PRIu8" : %s", format_data->port,
     1175                            strerror(-ret));
     1176        return -1;
    11121177    }
    11131178#if DEBUG
     
    11181183     */
    11191184    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    1120                         format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1185                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    11211186    if (ret < 0) {
    1122         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    1123                             " %"PRIu8" : %s", format_data->port,
    1124                             strerror(-ret));
    1125         return -1;
    1126     }
    1127    
     1187        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1188                            " %"PRIu8" : %s", format_data->port,
     1189                            strerror(-ret));
     1190        return -1;
     1191    }
     1192
    11281193    for (i=0; i < rx_queues; i++) {
    11291194#if DEBUG
    11301195    printf("Doing queue configure\n");
    1131 #endif 
     1196#endif
     1197
    11321198                /* Initialise the RX queue with some packets from memory */
    11331199                ret = rte_eth_rx_queue_setup(format_data->port, i,
    1134                                                                 format_data->nb_rx_buf, SOCKET_ID_ANY,
    1135                                                                 &rx_conf, format_data->pktmbuf_pool);
    1136         /* Init per_thread data structures */
    1137         format_data->per_lcore[i].port = format_data->port;
    1138         format_data->per_lcore[i].queue_id = i;
     1200                                             format_data->nb_rx_buf, format_data->nic_numa_node,
     1201                                             &rx_conf, format_data->pktmbuf_pool);
     1202        /* Init per_thread data structures */
     1203        format_data->per_lcore[i].port = format_data->port;
     1204        format_data->per_lcore[i].queue_id = i;
    11391205
    11401206                if (ret < 0) {
     
    11451211                }
    11461212        }
    1147    
     1213
    11481214#if DEBUG
    11491215    fprintf(stderr, "Doing start device\n");
    1150 #endif 
     1216#endif
    11511217    /* Start device */
    11521218    ret = rte_eth_dev_start(format_data->port);
    11531219#if DEBUG
    11541220    fprintf(stderr, "Done start device\n");
    1155 #endif 
     1221#endif
    11561222    if (ret < 0) {
    1157         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    1158                     strerror(-ret));
    1159         return -1;
     1223        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1224                    strerror(-ret));
     1225        return -1;
    11601226    }
    11611227
     
    11631229    /* Default promiscuous to on */
    11641230    if (format_data->promisc == -1)
    1165         format_data->promisc = 1;
    1166    
     1231        format_data->promisc = 1;
     1232
    11671233    if (format_data->promisc == 1)
    1168         rte_eth_promiscuous_enable(format_data->port);
     1234        rte_eth_promiscuous_enable(format_data->port);
    11691235    else
    1170         rte_eth_promiscuous_disable(format_data->port);
    1171    
    1172    
     1236        rte_eth_promiscuous_disable(format_data->port);
     1237
     1238
    11731239    /* We have now successfully started/unpased */
    11741240    format_data->paused = DPDK_RUNNING;
    1175    
     1241
    11761242    // Can use remote launch for all
    11771243    /*RTE_LCORE_FOREACH_SLAVE(i) {
    11781244                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
    11791245        }*/
    1180    
     1246
    11811247    /* Wait for the link to come up */
    11821248    rte_eth_link_get(format_data->port, &link_info);
     1249    format_data->link_speed = link_info.link_speed;
    11831250#if DEBUG
    11841251    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    1185             (int) link_info.link_duplex, (int) link_info.link_speed);
     1252            (int) link_info.link_duplex, (int) link_info.link_speed);
     1253        struct rte_eth_rss_reta reta_conf = {0};
     1254        reta_conf.mask_lo = ~reta_conf.mask_lo;
     1255        reta_conf.mask_hi = ~reta_conf.mask_hi;
     1256        int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf);
     1257        fprintf(stderr, "err=%d", qew);
     1258        for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) {
     1259                fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]);
     1260        }
     1261
    11861262#endif
    11871263
     
    11941270
    11951271    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    1196         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1197         free(libtrace->format_data);
    1198         libtrace->format_data = NULL;
    1199         return -1;
     1272        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1273        free(libtrace->format_data);
     1274        libtrace->format_data = NULL;
     1275        return -1;
    12001276    }
    12011277    return 0;
     
    12111287    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    12121288    if (nb_cpu <= 0)
    1213         return 1;
     1289        return 1;
    12141290    else
    1215         return (size_t) nb_cpu;
     1291        return (size_t) nb_cpu;
    12161292}
    12171293
     
    12231299
    12241300    if (rte_lcore_id() != rte_get_master_lcore())
    1225         fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1301        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
    12261302
    12271303    // If the master is not on the last thread we move it there
    12281304    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
    1229         // Consider error handling here
    1230         dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
     1305        // Consider error handling here
     1306        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
    12311307    }
    12321308
     
    12341310    // We don't have to force this but performance wont be good if we don't
    12351311    for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1236         if (lcore_config[i].detected) {
    1237             if (rte_lcore_is_enabled(i))
    1238                 fprintf(stderr, "Found core %d already in use!\n", i);
    1239             else
    1240                 phys_cores++;
    1241         }
     1312        if (lcore_config[i].detected) {
     1313            if (rte_lcore_is_enabled(i))
     1314                fprintf(stderr, "Found core %d already in use!\n", i);
     1315            else
     1316                phys_cores++;
     1317        }
    12421318    }
    12431319
    12441320        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
    1245     tot = MIN(tot, phys_cores);
     1321        tot = MIN(tot, phys_cores);
    12461322
    12471323        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
    1248        
    1249     if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
    1250         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1251         free(libtrace->format_data);
    1252         libtrace->format_data = NULL;
    1253         return -1;
     1324
     1325    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1326        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1327        free(libtrace->format_data);
     1328        libtrace->format_data = NULL;
     1329        return -1;
    12541330    }
    12551331
     
    12681344 * We then allow a mapper thread to be started on every real core as DPDK would
    12691345 * we also bind these to the corresponding CPU cores.
    1270  * 
     1346 *
    12711347 * @param libtrace A pointer to the trace
    12721348 * @param reading True if the thread will be used to read packets, i.e. will
     
    12851361    // these to any particular physical core
    12861362    if (reading) {
    1287         for (i = 0; i < RTE_MAX_LCORE; ++i) {
    1288             if (!rte_lcore_is_enabled(i)) {
    1289                 new_id = i;
    1290                 if (!lcore_config[i].detected)
    1291                     fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
    1292                 break;
    1293             }
    1294         }
     1363#if HAVE_LIBNUMA
     1364        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1365                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
     1366                                new_id = i;
     1367                        if (!lcore_config[i].detected)
     1368                                new_id = -1;
     1369                        break;
     1370                }
     1371        }
     1372#endif
     1373        /* Retry without the the numa restriction */
     1374        if (new_id == -1) {
     1375                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1376                                if (!rte_lcore_is_enabled(i)) {
     1377                                        new_id = i;
     1378                                if (!lcore_config[i].detected)
     1379                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1380                                break;
     1381                        }
     1382                }
     1383        }
    12951384    } else {
    1296         for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
    1297             if (!rte_lcore_is_enabled(i)) {
    1298                 new_id = i;
    1299                 break;
    1300             }
    1301         }
     1385        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1386            if (!rte_lcore_is_enabled(i)) {
     1387                new_id = i;
     1388                break;
     1389            }
     1390        }
    13021391    }
    13031392
    13041393    if (new_id == -1) {
    1305         assert(cfg->lcore_count == RTE_MAX_LCORE);
    1306         // TODO proper libtrace style error here!!
    1307         fprintf(stderr, "Too many threads for DPDK!!\n");
    1308         return -1;
     1394        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1395        // TODO proper libtrace style error here!!
     1396        fprintf(stderr, "Too many threads for DPDK!!\n");
     1397        return -1;
    13091398    }
    13101399
     
    13161405    fprintf(stderr, "original id%d", rte_lcore_id());
    13171406    RTE_PER_LCORE(_lcore_id) = new_id;
    1318     fprintf(stderr, " new id%d\n", rte_lcore_id());
     1407        char name[99];
     1408        pthread_getname_np(pthread_self(),
     1409                              name, sizeof(name));
     1410
     1411    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
    13191412
    13201413    if (reading) {
    1321         // Set affinity bind to corresponding core
    1322         cpu_set_t cpuset;
    1323         CPU_ZERO(&cpuset);
    1324         CPU_SET(rte_lcore_id(), &cpuset);
    1325         i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    1326         if (i != 0) {
    1327             fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
    1328             return -1;
    1329         }
     1414        // Set affinity bind to corresponding core
     1415        cpu_set_t cpuset;
     1416        CPU_ZERO(&cpuset);
     1417        CPU_SET(rte_lcore_id(), &cpuset);
     1418        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1419        if (i != 0) {
     1420            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1421            return -1;
     1422        }
    13301423    }
    13311424
    13321425    // Map our TLS to the thread data
    13331426    if (reading) {
    1334         if(t->type == THREAD_PERPKT) {
    1335             t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
    1336         } else {
    1337             t->format_data = &FORMAT(libtrace)->per_lcore[0];
    1338         }
    1339     }
     1427        if(t->type == THREAD_PERPKT) {
     1428            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1429        } else {
     1430            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1431        }
     1432    }
     1433    return 0;
    13401434}
    13411435
     
    13431437/**
    13441438 * Unregister a thread with the DPDK system.
    1345  * 
     1439 *
    13461440 * Only previously registered threads should be calling this just before
    13471441 * they are destroyed.
    13481442 */
    1349 static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
     1443static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
    13501444{
    13511445    struct rte_config *cfg = rte_eal_get_configuration();
    13521446
    1353     assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
     1447    assert(rte_lcore_id() < RTE_MAX_LCORE);
    13541448
    13551449    // Skip if master!!
    13561450    if (rte_lcore_id() == rte_get_master_lcore()) {
    1357         fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
    1358         return 0;
     1451        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1452        return;
    13591453    }
    13601454
     
    13641458    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
    13651459    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
    1366     return 0;
     1460    return;
    13671461}
    13681462
     
    13711465    char err[500];
    13721466    err[0] = 0;
    1373    
     1467
    13741468    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    1375         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    1376         free(libtrace->format_data);
    1377         libtrace->format_data = NULL;
    1378         return -1;
     1469        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1470        free(libtrace->format_data);
     1471        libtrace->format_data = NULL;
     1472        return -1;
    13791473    }
    13801474    return 0;
    13811475}
    13821476
    1383 static int dpdk_pause_input(libtrace_t * libtrace){
     1477static int dpdk_pause_input(libtrace_t * libtrace) {
    13841478    /* This stops the device, but can be restarted using rte_eth_dev_start() */
    13851479    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    1386 #if DEBUG     
    1387         fprintf(stderr, "Pausing port\n");
    1388 #endif
    1389         rte_eth_dev_stop(FORMAT(libtrace)->port);
    1390         FORMAT(libtrace)->paused = DPDK_PAUSED;
    1391         /* If we pause it the driver will be reset and likely our counter */
     1480#if DEBUG
     1481        fprintf(stderr, "Pausing DPDK port\n");
     1482#endif
     1483        rte_eth_dev_stop(FORMAT(libtrace)->port);
     1484        FORMAT(libtrace)->paused = DPDK_PAUSED;
     1485        /* Empty the queue of packets */
     1486        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1487                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1488        }
     1489        FORMAT(libtrace)->burst_offset = 0;
     1490        FORMAT(libtrace)->burst_size = 0;
     1491        /* If we pause it the driver will be reset and likely our counter */
     1492
     1493        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
    13921494#if HAS_HW_TIMESTAMPS_82580
    1393         FORMAT(libtrace)->ts_first_sys = 0;
    1394         FORMAT(libtrace)->ts_last_sys = 0;
     1495        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
    13951496#endif
    13961497    }
     
    13981499}
    13991500
    1400 static int dpdk_write_packet(libtrace_out_t *trace, 
     1501static int dpdk_write_packet(libtrace_out_t *trace,
    14011502                libtrace_packet_t *packet){
    14021503    struct rte_mbuf* m_buff[1];
    1403    
     1504
    14041505    int wirelen = trace_get_wire_length(packet);
    14051506    int caplen = trace_get_capture_length(packet);
    1406    
     1507
    14071508    /* Check for a checksum and remove it */
    14081509    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1409                                             wirelen == caplen)
    1410         caplen -= ETHER_CRC_LEN;
     1510                                            wirelen == caplen)
     1511        caplen -= ETHER_CRC_LEN;
    14111512
    14121513    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    14131514    if (m_buff[0] == NULL) {
    1414         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1415         return -1;
     1515        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1516        return -1;
    14161517    } else {
    1417         int ret;
    1418         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1419         do {
    1420             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1421         } while (ret != 1);
     1518        int ret;
     1519        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1520        do {
     1521            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
     1522        } while (ret != 1);
    14221523    }
    14231524
     
    14281529    /* Free our memory structures */
    14291530    if (libtrace->format_data != NULL) {
    1430         /* Close the device completely, device cannot be restarted */
    1431         if (FORMAT(libtrace)->port != 0xFF)
    1432             rte_eth_dev_close(FORMAT(libtrace)->port);
    1433         /* filter here if we used it */
     1531        /* Close the device completely, device cannot be restarted */
     1532        if (FORMAT(libtrace)->port != 0xFF)
     1533            rte_eth_dev_close(FORMAT(libtrace)->port);
     1534        /* filter here if we used it */
    14341535                free(libtrace->format_data);
    14351536        }
     
    14451546    /* Free our memory structures */
    14461547    if (libtrace->format_data != NULL) {
    1447         /* Close the device completely, device cannot be restarted */
    1448         if (FORMAT(libtrace)->port != 0xFF)
    1449             rte_eth_dev_close(FORMAT(libtrace)->port);
    1450         /* filter here if we used it */
     1548        /* Close the device completely, device cannot be restarted */
     1549        if (FORMAT(libtrace)->port != 0xFF)
     1550            rte_eth_dev_close(FORMAT(libtrace)->port);
     1551        /* filter here if we used it */
    14511552                free(libtrace->format_data);
    14521553        }
     
    14581559}
    14591560
    1460 /** 
    1461  * Get the start of additional header that we added to a packet.
     1561/**
     1562 * Get the start of the additional header that we added to a packet.
    14621563 */
    14631564static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1464     uint8_t *hdrsize;
    14651565    assert(packet);
    14661566    assert(packet->buffer);
    1467     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1468     /* The byte before the original packet data denotes the size in bytes
    1469      * of our additional header that we added sits before the 'size byte' */
    1470     hdrsize--;
    1471     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1567    /* Our header sits straight after the mbuf header */
     1568    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    14721569}
    14731570
     
    14801577    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    14811578    if (size > hdr->cap_len) {
    1482         /* Cannot make a packet bigger */
     1579        /* Cannot make a packet bigger */
    14831580                return trace_get_capture_length(packet);
    14841581        }
     
    14941591    int org_cap_size; /* The original capture size */
    14951592    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1496         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1497                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1498                             sizeof(struct hw_timestamp_82580);
     1593        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1594                            sizeof(struct hw_timestamp_82580);
    14991595    } else {
    1500         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1501                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
     1596        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
    15021597    }
    15031598    if (hdr->flags & INCLUDES_CHECKSUM) {
    1504         return org_cap_size;
     1599        return org_cap_size;
    15051600    } else {
    1506         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1507         return org_cap_size + ETHER_CRC_LEN;
     1601        /* DPDK packets are always TRACE_TYPE_ETH packets */
     1602        return org_cap_size + ETHER_CRC_LEN;
    15081603    }
    15091604}
     
    15111606    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    15121607    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1513         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1514                 sizeof(struct hw_timestamp_82580);
     1608        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1609                sizeof(struct hw_timestamp_82580);
    15151610    else
    1516         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1611        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    15171612}
    15181613
     
    15221617    assert(packet);
    15231618    if (packet->buffer != buffer &&
    1524         packet->buf_control == TRACE_CTRL_PACKET) {
    1525         free(packet->buffer);
     1619        packet->buf_control == TRACE_CTRL_PACKET) {
     1620        free(packet->buffer);
    15261621    }
    15271622
    15281623    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1529         packet->buf_control = TRACE_CTRL_PACKET;
     1624        packet->buf_control = TRACE_CTRL_PACKET;
    15301625    } else
    1531         packet->buf_control = TRACE_CTRL_EXTERNAL;
     1626        packet->buf_control = TRACE_CTRL_EXTERNAL;
    15321627
    15331628    packet->buffer = buffer;
     
    15401635}
    15411636
     1637
     1638/**
     1639 * Given a packet size and a link speed, computes the
     1640 * time to transmit in nanoseconds.
     1641 *
     1642 * @param format_data The dpdk format data from which we get the link speed
     1643 *        and if unset updates it in a thread safe manner
     1644 * @param pkt_size The size of the packet in bytes
     1645 * @return The wire time in nanoseconds
     1646 */
     1647static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1648        uint32_t wire_time;
     1649        /* 20 extra bytes of interframe gap and preamble */
     1650# if GET_MAC_CRC_CHECKSUM
     1651        wire_time = ((pkt_size + 20) * 8000);
     1652# else
     1653        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1654# endif
     1655
     1656        /* Division is really slow and introduces a pipeline stall
     1657         * The compiler will optimise this into magical multiplication and shifting
     1658         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1659         */
     1660retry_calc_wiretime:
     1661        switch (format_data->link_speed) {
     1662        case ETH_LINK_SPEED_40G:
     1663                wire_time /=  ETH_LINK_SPEED_40G;
     1664                break;
     1665        case ETH_LINK_SPEED_20G:
     1666                wire_time /= ETH_LINK_SPEED_20G;
     1667                break;
     1668        case ETH_LINK_SPEED_10G:
     1669                wire_time /= ETH_LINK_SPEED_10G;
     1670                break;
     1671        case ETH_LINK_SPEED_1000:
     1672                wire_time /= ETH_LINK_SPEED_1000;
     1673                break;
     1674        case 0:
     1675                {
     1676                /* Maybe the link was down originally, but now it should be up */
     1677                struct rte_eth_link link = {0};
     1678                rte_eth_link_get_nowait(format_data->port, &link);
     1679                if (link.link_status && link.link_speed) {
     1680                        format_data->link_speed = link.link_speed;
     1681#ifdef DEBUG
     1682                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1683#endif
     1684                        goto retry_calc_wiretime;
     1685                }
     1686                /* We don't know the link speed, make sure numbers are counting up */
     1687                wire_time = 1;
     1688                break;
     1689                }
     1690        default:
     1691                wire_time /= format_data->link_speed;
     1692        }
     1693        return wire_time;
     1694}
     1695
     1696
     1697
    15421698/*
    1543  * Does any extra preperation to a captured packet.
    1544  * This includes adding our extra header to it with the timestamp
    1545  */
    1546 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1547                                                         struct rte_mbuf* pkt){
    1548     uint8_t * hdr_size;
    1549     struct dpdk_addt_hdr *hdr;
     1699 * Does any extra preperation to all captured packets
     1700 * This includes adding our extra header to it with the timestamp,
     1701 * and any snapping
     1702 *
     1703 * @param format_data The DPDK format data
     1704 * @param plc The DPDK per lcore format data
     1705 * @param pkts An array of size nb_pkts of DPDK packets
     1706 * @param nb_pkts The number of packets in pkts and optionally packets
     1707 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
     1708 */
     1709static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
     1710                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
     1711        struct dpdk_addt_hdr *hdr;
     1712        size_t i;
     1713        uint64_t cur_sys_time_ns;
    15501714#if HAS_HW_TIMESTAMPS_82580
    1551     struct hw_timestamp_82580 *hw_ts;
    1552     struct timeval cur_sys_time;
    1553     uint64_t cur_sys_time_ns;
    1554     uint64_t estimated_wraps;
    1555    
    1556     /* Using gettimeofday because it's most likely to be a vsyscall
    1557      * We don't want to slow down anything with systemcalls we dont need
    1558      * accauracy */
    1559     gettimeofday(&cur_sys_time, NULL);
     1715        struct hw_timestamp_82580 *hw_ts;
     1716        uint64_t estimated_wraps;
    15601717#else
    1561 # if USE_CLOCK_GETTIME
    1562     struct timespec cur_sys_time;
    1563    
    1564     /* This looks terrible and I feel bad doing it. But it's OK
    1565      * on new kernels, because this is a vsyscall */
    1566     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1567 # else
    1568     struct timeval cur_sys_time;
    1569     /* Should be a vsyscall */
    1570     gettimeofday(&cur_sys_time, NULL);
    1571 # endif
    1572 #endif
    1573 
    1574     /* Record the size of our header */
    1575     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1576     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1577     /* Now put our header in front of that size */
    1578     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1579     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1580    
     1718
     1719#endif
     1720
     1721#if USE_CLOCK_GETTIME
     1722        struct timespec cur_sys_time = {0};
     1723        /* This looks terrible and I feel bad doing it. But it's OK
     1724         * on new kernels, because this is a fast vsyscall */
     1725        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1726        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1727#else
     1728        struct timeval cur_sys_time = {0};
     1729        /* Also a fast vsyscall */
     1730        gettimeofday(&cur_sys_time, NULL);
     1731        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1732#endif
     1733
     1734        /* The system clock is not perfect so when running
     1735         * at linerate we could timestamp a packet in the past.
     1736         * To avoid this we munge the timestamp to appear 1ns
     1737         * after the previous packet. We should eventually catch up
     1738         * to system time since a 64byte packet on a 10G link takes 67ns.
     1739         *
     1740         * Note with parallel readers timestamping packets
     1741         * with duplicate stamps or out of order is unavoidable without
     1742         * hardware timestamping from the NIC.
     1743         */
     1744#if !HAS_HW_TIMESTAMPS_82580
     1745        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1746                cur_sys_time_ns = plc->ts_last_sys + 1;
     1747        }
     1748#endif
     1749
     1750        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
     1751        for (i = 0 ; i < nb_pkts ; ++i) {
     1752
     1753                /* We put our header straight after the dpdk header */
     1754                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1755                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1756
    15811757#if GET_MAC_CRC_CHECKSUM
    1582     /* Add back in the CRC sum */
    1583     pkt->pkt.pkt_len += ETHER_CRC_LEN;
    1584     pkt->pkt.data_len += ETHER_CRC_LEN;
    1585     hdr->flags |= INCLUDES_CHECKSUM;
    1586 #endif
     1758                /* Add back in the CRC sum */
     1759                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
     1760                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
     1761                hdr->flags |= INCLUDES_CHECKSUM;
     1762#endif
     1763
     1764                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    15871765
    15881766#if HAS_HW_TIMESTAMPS_82580
    1589     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1590      *
    1591      *        +----------+---+   +--------------+
    1592      *  82580 |    24    | 8 |   |      32      |
    1593      *        +----------+---+   +--------------+
    1594      *          reserved  \______ 40 bits _____/
    1595      *
    1596      * The 40 bit 82580 SYSTIM overflows every
    1597      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1598      *
    1599      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1600      * Endian (for the full 64 bits) i.e. picture is mirrored
    1601      */
    1602    
    1603     /* The timestamp is sitting before our packet and is included in pkt_len */
    1604     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1605     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1606    
    1607     /* Despite what the documentation says this is in Little
    1608      * Endian byteorder. Mask the reserved section out.
    1609      */
    1610     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1611                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1612                
    1613     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1614     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1615         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1616         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1617     }
    1618    
    1619     /* This will have serious problems if packets aren't read quickly
    1620      * that is within a couple of seconds because our clock cycles every
    1621      * 18 seconds */
    1622     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1623                             / (1ull<<TS_NBITS_82580);
    1624    
    1625     /* Estimated_wraps gives the number of times the counter should have
    1626      * wrapped (however depending on value last time it could have wrapped
    1627      * twice more (if hw clock is close to its max value) or once less (allowing
    1628      * for a bit of variance between hw and sys clock). But if the clock
    1629      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1630     if (unlikely(estimated_wraps >= 2)) {
    1631         /* 2 or more wrap arounds add all but the very last wrap */
    1632         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1633     }
    1634    
    1635     /* Set the timestamp to the lowest possible value we're considering */
    1636     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1637                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1638    
    1639     /* In most runs only the first if() will need evaluating - i.e our
    1640      * estimate is correct. */
    1641     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1642                                 hdr->timestamp, MAXSKEW_82580))) {
    1643         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1644         FORMAT(libtrace)->wrap_count++;
    1645         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1646         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1647                                 hdr->timestamp, MAXSKEW_82580)) {
    1648             /* Failed to match estimated_wraps */
    1649             FORMAT(libtrace)->wrap_count++;
    1650             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1651             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1652                                 hdr->timestamp, MAXSKEW_82580)) {
    1653                 if (estimated_wraps == 0) {
    1654                     /* 0 case Failed to match estimated_wraps+2 */
    1655                     printf("WARNING - Hardware Timestamp failed to"
    1656                                             " match using systemtime!\n");
    1657                     hdr->timestamp = cur_sys_time_ns;
    1658                 } else {
    1659                     /* Failed to match estimated_wraps+1 */
    1660                     FORMAT(libtrace)->wrap_count++;
    1661                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1662                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1663                                 hdr->timestamp, MAXSKEW_82580)) {
    1664                         /* Failed to match estimated_wraps+2 */
    1665                         printf("WARNING - Hardware Timestamp failed to"
    1666                                             " match using systemtime!!\n");
    1667                     }
    1668                 }
    1669             }
    1670         }
    1671     }
    1672 
    1673     /* Log our previous for the next loop */
    1674     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1675 
     1767                /* The timestamp is sitting before our packet and is included in pkt_len */
     1768                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1769                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1770                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1771
     1772                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1773                 *
     1774                 *        +----------+---+   +--------------+
     1775                 *  82580 |    24    | 8 |   |      32      |
     1776                 *        +----------+---+   +--------------+
     1777                 *          reserved  \______ 40 bits _____/
     1778                 *
     1779                 * The 40 bit 82580 SYSTIM overflows every
     1780                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1781                 *
     1782                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1783                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1784                 */
     1785
     1786                /* Despite what the documentation says this is in Little
     1787                 * Endian byteorder. Mask the reserved section out.
     1788                 */
     1789                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1790                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1791
     1792                if (unlikely(plc->ts_first_sys == 0)) {
     1793                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1794                        plc->ts_last_sys = plc->ts_first_sys;
     1795                }
     1796
     1797                /* This will have serious problems if packets aren't read quickly
     1798                 * that is within a couple of seconds because our clock cycles every
     1799                 * 18 seconds */
     1800                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1801                                  / (1ull<<TS_NBITS_82580);
     1802
     1803                /* Estimated_wraps gives the number of times the counter should have
     1804                 * wrapped (however depending on value last time it could have wrapped
     1805                 * twice more (if hw clock is close to its max value) or once less (allowing
     1806                 * for a bit of variance between hw and sys clock). But if the clock
     1807                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1808                if (unlikely(estimated_wraps >= 2)) {
     1809                        /* 2 or more wrap arounds add all but the very last wrap */
     1810                        plc->wrap_count += estimated_wraps - 1;
     1811                }
     1812
     1813                /* Set the timestamp to the lowest possible value we're considering */
     1814                hdr->timestamp += plc->ts_first_sys +
     1815                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     1816
     1817                /* In most runs only the first if() will need evaluating - i.e our
     1818                 * estimate is correct. */
     1819                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     1820                                              hdr->timestamp, MAXSKEW_82580))) {
     1821                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     1822                        plc->wrap_count++;
     1823                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     1824                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1825                                             hdr->timestamp, MAXSKEW_82580)) {
     1826                                /* Failed to match estimated_wraps */
     1827                                plc->wrap_count++;
     1828                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1829                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1830                                                     hdr->timestamp, MAXSKEW_82580)) {
     1831                                        if (estimated_wraps == 0) {
     1832                                                /* 0 case Failed to match estimated_wraps+2 */
     1833                                                printf("WARNING - Hardware Timestamp failed to"
     1834                                                       " match using systemtime!\n");
     1835                                                hdr->timestamp = cur_sys_time_ns;
     1836                                        } else {
     1837                                                /* Failed to match estimated_wraps+1 */
     1838                                                plc->wrap_count++;
     1839                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1840                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1841                                                                     hdr->timestamp, MAXSKEW_82580)) {
     1842                                                        /* Failed to match estimated_wraps+2 */
     1843                                                        printf("WARNING - Hardware Timestamp failed to"
     1844                                                               " match using systemtime!!\n");
     1845                                                }
     1846                                        }
     1847                                }
     1848                        }
     1849                }
    16761850#else
    1677 # if USE_CLOCK_GETTIME
    1678     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1679 # else
    1680     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1681 # endif
    1682 #endif
    1683 
    1684     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1685      * idea and do the same */
    1686     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1687     packet->buffer = pkt;
    1688     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1689 
    1690     /* Set our capture length for the first time */
    1691     hdr->cap_len = dpdk_get_wire_length(packet);
    1692     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1693         hdr->cap_len -= ETHER_CRC_LEN;
    1694     }
    1695    
    1696 
    1697     return dpdk_get_framing_length(packet) +
    1698                         dpdk_get_capture_length(packet);
     1851
     1852                hdr->timestamp = cur_sys_time_ns;
     1853                /* Offset the next packet by the wire time of previous */
     1854                calculate_wire_time(format_data, hdr->cap_len);
     1855
     1856#endif
     1857                if(packets) {
     1858                        packets[i]->buffer = pkts[i];
     1859                        packets[i]->header = pkts[i];
     1860#if HAS_HW_TIMESTAMPS_82580
     1861                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1862                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
     1863#else
     1864                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1865                                              RTE_PKTMBUF_HEADROOM;
     1866#endif
     1867                        packets[i]->error = 1;
     1868                }
     1869        }
     1870
     1871        plc->ts_last_sys = cur_sys_time_ns;
     1872
     1873        return;
    16991874}
    17001875
     
    17081883}
    17091884
     1885
    17101886static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1711     int nb_rx; /* Number of rx packets we've recevied */
    1712     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     1887    int nb_rx; /* Number of rx packets we've received */
    17131888
    17141889    /* Free the last packet buffer */
    17151890    if (packet->buffer != NULL) {
    1716         /* Buffer is owned by DPDK */
    1717         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1718             rte_pktmbuf_free(packet->buffer);
    1719             packet->buffer = NULL;
    1720         } else
    1721         /* Buffer is owned by packet i.e. has been malloc'd */
    1722         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1723             free(packet->buffer);
    1724             packet->buffer = NULL;
    1725         }
    1726     }
    1727    
     1891        /* Buffer is owned by DPDK */
     1892        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1893            rte_pktmbuf_free(packet->buffer);
     1894            packet->buffer = NULL;
     1895        } else
     1896        /* Buffer is owned by packet i.e. has been malloc'd */
     1897        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1898            free(packet->buffer);
     1899            packet->buffer = NULL;
     1900        }
     1901    }
     1902
    17281903    packet->buf_control = TRACE_CTRL_EXTERNAL;
    17291904    packet->type = TRACE_RT_DATA_DPDK;
    1730    
     1905
     1906    /* Check if we already have some packets buffered */
     1907    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     1908            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     1909            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     1910            return 1; // TODO should be bytes read, which essentially useless anyway
     1911    }
    17311912    /* Wait for a packet */
    17321913    while (1) {
    1733         /* Poll for a single packet */
    1734         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1735                             FORMAT(libtrace)->queue_id, pkts_burst, 1);       
    1736         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1737             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1738         }
    1739     }
    1740    
     1914        /* Poll for a single packet */
     1915        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     1916                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     1917        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     1918                FORMAT(libtrace)->burst_size = nb_rx;
     1919                FORMAT(libtrace)->burst_offset = 1;
     1920                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
     1921                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     1922                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     1923                return 1; // TODO should be bytes read, which essentially useless anyway
     1924        }
     1925        if (libtrace_halt) {
     1926                return 0;
     1927        }
     1928        /* Wait a while, polling on memory degrades performance
     1929         * This relieves the pressure on memory allowing the NIC to DMA */
     1930        rte_delay_us(10);
     1931    }
     1932
    17411933    /* We'll never get here - but if we did it would be bad */
    17421934    return -1;
    17431935}
    17441936
    1745 static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    1746     int nb_rx; /* Number of rx packets we've recevied */
    1747     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
    1748 
    1749     /* Free the last packet buffer */
    1750     if (packet->buffer != NULL) {
    1751         /* Buffer is owned by DPDK */
    1752         if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
    1753             rte_pktmbuf_free(packet->buffer);
    1754             packet->buffer = NULL;
    1755         } else
    1756         /* Buffer is owned by packet i.e. has been malloc'd */
    1757         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1758             free(packet->buffer);
    1759             packet->buffer = NULL;
    1760         }
    1761     }
    1762    
    1763     packet->buf_control = TRACE_CTRL_EXTERNAL;
    1764     packet->type = TRACE_RT_DATA_DPDK;
    1765    
     1937static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     1938    size_t nb_rx; /* Number of rx packets we've recevied */
     1939    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     1940    size_t i;
     1941
     1942    for (i = 0 ; i < nb_packets ; ++i) {
     1943            /* Free the last packet buffer */
     1944            if (packets[i]->buffer != NULL) {
     1945                /* Buffer is owned by DPDK */
     1946                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
     1947                    rte_pktmbuf_free(packets[i]->buffer);
     1948                    packets[i]->buffer = NULL;
     1949                } else
     1950                /* Buffer is owned by packet i.e. has been malloc'd */
     1951                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
     1952                    free(packets[i]->buffer);
     1953                    packets[i]->buffer = NULL;
     1954                }
     1955            }
     1956            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     1957            packets[i]->type = TRACE_RT_DATA_DPDK;
     1958    }
     1959
    17661960    /* Wait for a packet */
    17671961    while (1) {
    1768         /* Poll for a single packet */
    1769         nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
    1770                             PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
    1771         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1772                         //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
    1773             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1774         }
    1775         // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
    1776         if (libtrace_message_queue_count(&t->messages) > 0) {
    1777                         printf("Extra message yay");
    1778                         return -2;
    1779                 }
    1780     }
    1781    
     1962        /* Poll for a single packet */
     1963        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     1964                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
     1965        if (nb_rx > 0) {
     1966                /* Got some packets - otherwise we keep spining */
     1967                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     1968                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
     1969                return nb_rx;
     1970        }
     1971        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
     1972        if (libtrace_message_queue_count(&t->messages) > 0) {
     1973                printf("Extra message yay");
     1974                return -2;
     1975        }
     1976        if (libtrace_halt) {
     1977                return 0;
     1978        }
     1979        /* Wait a while, polling on memory degrades performance
     1980         * This relieves the pressure on memory allowing the NIC to DMA */
     1981        rte_delay_us(10);
     1982    }
     1983
    17821984    /* We'll never get here - but if we did it would be bad */
    17831985    return -1;
     
    17871989    struct timeval tv;
    17881990    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1789    
     1991
    17901992    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    17911993    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     
    17961998    struct timespec ts;
    17971999    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1798    
     2000
    17992001    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    18002002    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     
    18232025static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    18242026    struct rte_eth_stats stats = {0};
    1825    
     2027
    18262028    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1827         return UINT64_MAX;
     2029        return UINT64_MAX;
    18282030    /* Grab the current stats */
    18292031    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1830    
     2032
    18312033    /* Get the drop counter */
    18322034    return (uint64_t) stats.ierrors;
     
    18352037static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
    18362038    struct rte_eth_stats stats = {0};
    1837    
     2039
    18382040    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1839         return UINT64_MAX;
     2041        return UINT64_MAX;
    18402042    /* Grab the current stats */
    18412043    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1842    
     2044
    18432045    /* Get the drop counter */
    18442046    return (uint64_t) stats.ipackets;
     
    18482050 * This is the number of packets filtered by the NIC
    18492051 * and maybe ahead of number read using libtrace.
    1850  * 
     2052 *
    18512053 * XXX we are yet to implement any filtering, but if it was this should
    18522054 * get the result. So this will just return 0 for now.
     
    18542056static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    18552057    struct rte_eth_stats stats = {0};
    1856    
     2058
    18572059    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1858         return UINT64_MAX;
     2060        return UINT64_MAX;
    18592061    /* Grab the current stats */
    18602062    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1861    
     2063
    18622064    /* Get the drop counter */
    18632065    return (uint64_t) stats.fdirmiss;
     
    18692071 */
    18702072static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    1871                                         libtrace_packet_t *packet) {
     2073                                        libtrace_packet_t *packet) {
    18722074    libtrace_eventobj_t event = {0,0,0.0,0};
    18732075    int nb_rx; /* Number of receive packets we've read */
    18742076    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    1875    
     2077
    18762078    do {
    1877    
    1878         /* See if we already have a packet waiting */
    1879         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    1880                         FORMAT(trace)->queue_id, pkts_burst, 1);
    1881        
    1882         if (nb_rx > 0) {
    1883             /* Free the last packet buffer */
    1884             if (packet->buffer != NULL) {
    1885                 /* Buffer is owned by DPDK */
    1886                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1887                     rte_pktmbuf_free(packet->buffer);
    1888                     packet->buffer = NULL;
    1889                 } else
    1890                 /* Buffer is owned by packet i.e. has been malloc'd */
    1891                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    1892                     free(packet->buffer);
    1893                     packet->buffer = NULL;
    1894                 }
    1895             }
    1896            
    1897             packet->buf_control = TRACE_CTRL_EXTERNAL;
    1898             packet->type = TRACE_RT_DATA_DPDK;
    1899             event.type = TRACE_EVENT_PACKET;
    1900             event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
    1901            
    1902             /* XXX - Check this passes the filter trace_read_packet normally
    1903              * does this for us but this wont */
    1904             if (trace->filter) {
    1905                 if (!trace_apply_filter(trace->filter, packet)) {
    1906                     /* Failed the filter so we loop for another packet */
    1907                     trace->filtered_packets ++;
    1908                     continue;
    1909                 }
    1910             }
    1911             trace->accepted_packets ++;
    1912         } else {
    1913             /* We only want to sleep for a very short time - we are non-blocking */
    1914             event.type = TRACE_EVENT_SLEEP;
    1915             event.seconds = 0.0001;
    1916             event.size = 0;
    1917         }
    1918        
    1919         /* If we get here we have our event */
    1920         break;
     2079
     2080        /* See if we already have a packet waiting */
     2081        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2082                        FORMAT(trace)->queue_id, pkts_burst, 1);
     2083
     2084        if (nb_rx > 0) {
     2085            /* Free the last packet buffer */
     2086            if (packet->buffer != NULL) {
     2087                /* Buffer is owned by DPDK */
     2088                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2089                    rte_pktmbuf_free(packet->buffer);
     2090                    packet->buffer = NULL;
     2091                } else
     2092                /* Buffer is owned by packet i.e. has been malloc'd */
     2093                if (packet->buf_control == TRACE_CTRL_PACKET) {
     2094                    free(packet->buffer);
     2095                    packet->buffer = NULL;
     2096                }
     2097            }
     2098
     2099            packet->buf_control = TRACE_CTRL_EXTERNAL;
     2100            packet->type = TRACE_RT_DATA_DPDK;
     2101            event.type = TRACE_EVENT_PACKET;
     2102            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
     2103            event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2104
     2105            /* XXX - Check this passes the filter trace_read_packet normally
     2106             * does this for us but this wont */
     2107            if (trace->filter) {
     2108                if (!trace_apply_filter(trace->filter, packet)) {
     2109                    /* Failed the filter so we loop for another packet */
     2110                    trace->filtered_packets ++;
     2111                    continue;
     2112                }
     2113            }
     2114            trace->accepted_packets ++;
     2115        } else {
     2116            /* We only want to sleep for a very short time - we are non-blocking */
     2117            event.type = TRACE_EVENT_SLEEP;
     2118            event.seconds = 0.0001;
     2119            event.size = 0;
     2120        }
     2121
     2122        /* If we get here we have our event */
     2123        break;
    19212124    } while (1);
    19222125
     
    19862189    {true, 8},              /* Live, NICs typically have 8 threads */
    19872190    dpdk_pstart_input, /* pstart_input */
    1988         dpdk_pread_packet, /* pread_packet */
     2191        dpdk_pread_packets, /* pread_packets */
    19892192        dpdk_pause_input, /* ppause */
    19902193        dpdk_fin_input, /* p_fin */
Note: See TracChangeset for help on using the changeset viewer.