source: lib/format_dpdk.c @ 9e429e8

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 9e429e8 was 9e429e8, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Merge remote-tracking branch 'upsteam/master' into develop

Conflicts:

README
lib/format_dpdk.c
lib/format_linux.c
lib/trace.c

  • Property mode set to 100644
File size: 69.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#define _GNU_SOURCE
44
45#include "config.h"
46#include "libtrace.h"
47#include "libtrace_int.h"
48#include "format_helper.h"
49#include "libtrace_arphrd.h"
50#include "hash_toeplitz.h"
51
52#ifdef HAVE_INTTYPES_H
53#  include <inttypes.h>
54#else
55# error "Can't find inttypes.h"
56#endif
57
58#include <stdlib.h>
59#include <assert.h>
60#include <unistd.h>
61#include <endian.h>
62#include <string.h>
63
/* We can deal with any minor differences by checking the RTE version.
 * Typically DPDK backports some fixes (usually for building against
 * newer kernels) to the older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * Currently 1.5 to 1.7 is supported.
 */
76#include <rte_eal.h>
77#include <rte_version.h>
/* Fallbacks for when rte_version.h does not provide these helpers, so that
 * RTE_VERSION can always be compared numerically below. */
#ifndef RTE_VERSION_NUM
/* Pack a four part version number, one byte per component */
#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
#endif
#ifndef RTE_VER_PATCH_RELEASE
/* No patch release number available: treat it as 0 */
#       define RTE_VER_PATCH_RELEASE 0
#endif
#ifndef RTE_VERSION
#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
#endif

/* 1.6.0r2 :
 *      rte_eal_pci_set_blacklist() is removed
 *      device_list is renamed to pci_device_list
 *
 * Replaced by:
 *      rte_devargs (we can simply whitelist)
 *
 * DPDK_USE_BLACKLIST selects which blacklist_devices() implementation
 * is compiled.
 */
#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
#       define DPDK_USE_BLACKLIST 1
#else
#       define DPDK_USE_BLACKLIST 0
#endif

/*
 * 1.7.0 :
 *      rte_pmd_init_all is removed
 *
 * Replaced by:
 *      Nothing, no longer needed
 *
 * DPDK_USE_PMD_INIT controls whether rte_pmd_init_all() is called during
 * environment initialisation.
 */
#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
#       define DPDK_USE_PMD_INIT 1
#else
#       define DPDK_USE_PMD_INIT 0
#endif
114
115#include <rte_per_lcore.h>
116#include <rte_debug.h>
117#include <rte_errno.h>
118#include <rte_common.h>
119#include <rte_log.h>
120#include <rte_memcpy.h>
121#include <rte_prefetch.h>
122#include <rte_branch_prediction.h>
123#include <rte_pci.h>
124#include <rte_ether.h>
125#include <rte_ethdev.h>
126#include <rte_ring.h>
127#include <rte_mempool.h>
128#include <rte_mbuf.h>
129#include <rte_launch.h>
130#include <rte_lcore.h>
131#include <rte_per_lcore.h>
132#include <pthread.h>
133
/* The default size of memory buffers to use - This is the max size of a
 * standard ethernet packet less the size of the MAC checksum */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple
 * of 128. The limit can be raised by modifying the driver, in which case
 * it can be increased here as well.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The maximum number of characters the mempool name can be */
#define MEMPOOL_NAME_LEN 20
163
/* Helper macros. Every argument is parenthesised so that expressions such
 * as `*ptr`, `a ? b : c` or `base + 1` can be passed safely. */

/* View an untyped packet buffer pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Access the format_data of a trace as the DPDK format structure */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Access format_data as the per lcore structure */
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
174
175#if RTE_PKTMBUF_HEADROOM != 128
176#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
177         "any libtrace instance processing these packet must be have the" \
178         "same RTE_PKTMBUF_HEADROOM set"
179#endif
180
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
190
/* Print verbose debugging messages; also selects the DPDK debug log level
 * during environment initialisation */
#define DEBUG 1

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. gettimeofday() should be a
 * vsyscall as well; if it is not you should seriously consider updating
 * your kernel.
 */
#ifdef HAVE_LIBRT
/* You can turn this on (set to 1) to prefer clock_gettime */
#define USE_CLOCK_GETTIME 0
#else
/* DONT CHANGE THIS !!! */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out to a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
/* Number of valid bits in the 82580 hardware timestamp */
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
/* True when v2 lies strictly inside the open interval (v1 - var, v1 + var) */
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
227
/* Hardware timestamp record as per the Intel 82580 specification.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored in Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
};
234
/* Capture state of a trace, stored in dpdk_format_data_t::paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Set by the init functions; snaplen may only
                             * be configured while in this state */
    DPDK_RUNNING       = 1,
    DPDK_PAUSED        = 2,
};
240
/* Per lcore state; dpdk_format_data_t holds one entry per possible lcore */
struct dpdk_per_lcore_t
{
        // TODO move time stamp stuff here
        uint16_t queue_id; /* presumably the queue this lcore services - TODO confirm */
        uint8_t port;      /* presumably the port this lcore services - TODO confirm */
};
247
/* Format specific state attached to a trace (libtrace->format_data).
 * Used by both input and output, however some fields are not used
 * for output. */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode - RX only */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    uint8_t paused; /* See paused_state */ 
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    int snaplen; /* The snap length for the capture - RX only */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of blacklist items that are valid */
#endif
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    uint8_t rss_key[40]; // The RSS key, filled in when a hasher is configured
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping only relevant to RX */
    uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
        // DPDK normally seems to have a limit of 8 queues for a given card
        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
};
276
/* Bit flags stored in dpdk_addt_hdr::flags */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
281
/**
 * A structure placed in front of the packet where we can store
 * additional information about the given packet.
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 *
 * The field order and sizes are part of this in-buffer layout, so they
 * must not be changed.
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags;      /* See dpdk_addt_hdr_flags */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
307
308/**
309 * We want to blacklist all devices except those on the whitelist
310 * (I say list, but yes it is only the one).
311 *
312 * The default behaviour of rte_pci_probe() will map every possible device
313 * to its DPDK driver. The DPDK driver will take the ethernet device
314 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
315 *
316 * So blacklist all devices except the one that we wish to use so that
317 * the others can still be used as standard ethernet ports.
318 *
319 * @return 0 if successful, otherwise -1 on error.
320 */
321#if DPDK_USE_BLACKLIST
322static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
323{
324        struct rte_pci_device *dev = NULL;
325        format_data->nb_blacklist = 0;
326
327        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
328
329        TAILQ_FOREACH(dev, &device_list, next) {
330        if (whitelist != NULL && whitelist->domain == dev->addr.domain
331            && whitelist->bus == dev->addr.bus
332            && whitelist->devid == dev->addr.devid
333            && whitelist->function == dev->addr.function)
334            continue;
335                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
336                                / sizeof (format_data->blacklist[0])) {
337                        printf("Warning: too many devices to blacklist consider"
338                                        " increasing BLACK_LIST_SIZE");
339                        break;
340                }
341                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
342                ++format_data->nb_blacklist;
343        }
344
345        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
346        return 0;
347}
348#else /* DPDK_USE_BLACKLIST */
349#include <rte_devargs.h>
350static int blacklist_devices(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
351{
352        char pci_str[20] = {0};
353        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
354                 whitelist->domain,
355                 whitelist->bus,
356                 whitelist->devid,
357                 whitelist->function);
358        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
359                return -1;
360        }
361        return 0;
362}
363#endif
364
365/**
366 * Parse the URI format as a pci address
367 * Fills in addr, note core is optional and is unchanged if
368 * a value for it is not provided.
369 *
370 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
371 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
372 */
373static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
374    char * wrkstr;
375    char * pch;
376    assert(str);
377    wrkstr = strdup(str);
378   
379    pch = strtok(wrkstr,":");
380    if (pch == NULL || pch[0] == 0) {
381        free(wrkstr); return -1;
382    }
383    addr->domain = (uint16_t) atoi(pch);
384
385    pch = strtok(NULL,":");
386    if (pch == NULL || pch[0] == 0) {
387        free(wrkstr); return -1;
388    }
389    addr->bus = (uint8_t) atoi(pch);
390
391    pch = strtok(NULL,".");
392    if (pch == NULL || pch[0] == 0) {
393        free(wrkstr); return -1;
394    }
395    addr->devid = (uint8_t) atoi(pch);
396
397    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
398    if (pch == NULL || pch[0] == 0) {
399        free(wrkstr); return -1;
400    }
401    addr->function = (uint8_t) atoi(pch);
402
403    pch = strtok(NULL, ""); /* Find end of string */
404   
405    if (pch != NULL && pch[0] != 0) {
406        *core = (long) atoi(pch);
407    }
408
409    free(wrkstr);
410    return 0;
411}
412
#if DEBUG
/* For debugging: print the DPDK EAL configuration (version, magic, master
 * lcore, lcore count, the role of each online core and the process type)
 * to stderr. Prints nothing if no configuration is available. */
static inline void dump_configuration()
{
    struct rte_config *config;
    const char *type_name;
    long core_limit = sysconf(_SC_NPROCESSORS_ONLN);
    int core;

    if (core_limit <= 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
        core_limit = 1; /* fallback to just 1 core */
    }
    /* Never index lcore_role beyond what DPDK was built to support */
    if (core_limit > RTE_MAX_LCORE)
        core_limit = RTE_MAX_LCORE;

    config = rte_eal_get_configuration();
    if (config == NULL)
        return;

    fprintf(stderr, "Intel DPDK setup\n"
           "---Version      : %"PRIu32"\n"
           "---Magic        : %"PRIu32"\n"
           "---Master LCore : %"PRIu32"\n"
           "---LCore Count  : %"PRIu32"\n",
           config->version, config->magic,
           config->master_lcore, config->lcore_count);

    for (core = 0; core < core_limit; core++) {
        const char *state = "off";

        if (config->lcore_role[core] == ROLE_RTE)
            state = "on";
        fprintf(stderr, "   ---Core %d : %s\n", core, state);
    }

    switch (config->process_type) {
        case RTE_PROC_AUTO:      type_name = "auto";      break;
        case RTE_PROC_PRIMARY:   type_name = "primary";   break;
        case RTE_PROC_SECONDARY: type_name = "secondary"; break;
        case RTE_PROC_INVALID:   type_name = "invalid";   break;
        default:
            type_name = "something worse than invalid!!";
    }
    fprintf(stderr, "---Process Type : %s\n", type_name);
}
#endif
466
467/**
468 * Expects to be called from the master lcore and moves it to the given dpdk id
469 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
470 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
471 *               and not already in use.
472 * @return 0 is successful otherwise -1 on error.
473 */
474static inline int dpdk_move_master_lcore(size_t core) {
475    struct rte_config *cfg = rte_eal_get_configuration();
476    cpu_set_t cpuset;
477    int i;
478
479    assert (core < RTE_MAX_LCORE);
480    assert (rte_get_master_lcore() == rte_lcore_id());
481
482    if (core == rte_lcore_id())
483        return 0;
484
485    // Make sure we are not overwriting someone else
486    assert(!rte_lcore_is_enabled(core));
487
488    // Move the core
489    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
490    cfg->lcore_role[core] = ROLE_RTE;
491    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
492    rte_eal_get_configuration()->master_lcore = core;
493    RTE_PER_LCORE(_lcore_id) = core;
494
495    // Now change the affinity
496    CPU_ZERO(&cpuset);
497
498    if (lcore_config[core].detected) {
499        CPU_SET(core, &cpuset);
500    } else {
501        for (i = 0; i < RTE_MAX_LCORE; ++i) {
502            if (lcore_config[i].detected)
503                CPU_SET(i, &cpuset);
504        }
505    }
506
507    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
508    if (i != 0) {
509        // TODO proper libtrace style error here!!
510        fprintf(stderr, "pthread_setaffinity_np failed\n");
511        return -1;
512    }
513    return 0;
514}
515
516
/**
 * XXX This is very bad XXX
 * But we have to do something to allow getopt nesting.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 *
 * Snapshot of the global getopt state.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current global getopt state into opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write a previously saved state back to the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optarg = snapshot.optarg;
        optind = snapshot.optind;
        opterr = snapshot.opterr;
        optopt = snapshot.optopt;
}
544
545static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
546                                        char * err, int errlen) {
547    int ret; /* Returned error codes */
548    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
549    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
550    char mem_map[20] = {0}; /* The memory name */
551    long nb_cpu; /* The number of CPUs in the system */
552    long my_cpu; /* The CPU number we want to bind to */
553    int i;
554    struct rte_config *cfg = rte_eal_get_configuration();
555        struct saved_getopts save_opts;
556   
557#if DEBUG
558    rte_set_log_level(RTE_LOG_DEBUG);
559#else
560    rte_set_log_level(RTE_LOG_WARNING);
561#endif
562    /*
563     * Using unique file prefixes mean separate memory is used, unlinking
564     * the two processes. However be careful we still cannot access a
565     * port that already in use.
566     *
567     * Using unique file prefixes mean separate memory is used, unlinking
568     * the two processes. However be careful we still cannot access a
569     * port that already in use.
570     */
571    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
572                "--file-prefix", mem_map, "-m", "256", NULL};
573    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
574   
575    /* This initialises the Environment Abstraction Layer (EAL)
576     * If we had slave workers these are put into WAITING state
577     *
578     * Basically binds this thread to a fixed core, which we choose as
579     * the last core on the machine (assuming fewer interrupts mapped here).
580     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
581     * "-n" the number of memory channels into the CPU (hardware specific)
582     *      - Most likely to be half the number of ram slots in your machine.
583     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
584     * Controls where in memory packets are stored and should spread across
585     * the channels. We just use 1 to be safe.
586     */
587
588    /* Get the number of cpu cores in the system and use the last core */
589    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
590    if (nb_cpu <= 0) {
591        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
592        nb_cpu = 1; /* fallback to the first core */
593    }
594    if (nb_cpu > RTE_MAX_LCORE)
595        nb_cpu = RTE_MAX_LCORE;
596
597    my_cpu = nb_cpu;
598    /* This allows the user to specify the core - we would try to do this
599     * automatically but it's hard to tell that this is secondary
600     * before running rte_eal_init(...). Currently we are limited to 1
601     * instance per core due to the way memory is allocated. */
602    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
603        snprintf(err, errlen, "Failed to parse URI");
604        return -1;
605    }
606
607    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
608                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
609
610    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
611        snprintf(err, errlen, 
612          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
613          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
614        return -1;
615    }
616
617    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
618       info older versions */
619    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
620    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
621
622
623        /* Give the memory map a unique name */
624        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
625    /* rte_eal_init it makes a call to getopt so we need to reset the
626     * global optind variable of getopt otherwise this fails */
627        save_getopts(&save_opts);
628    optind = 1;
629    if ((ret = rte_eal_init(argc, argv)) < 0) {
630        snprintf(err, errlen, 
631          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
632        return -1;
633    }
634        restore_getopts(&save_opts);
635    // These are still running but will never do anything with DPDK v1.7 we
636    // should remove this XXX in the future
637    for(i = 0; i < RTE_MAX_LCORE; ++i) {
638        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
639            cfg->lcore_role[i] = ROLE_OFF;
640            cfg->lcore_count--;
641        }
642    }
643    // Only the master should be running
644    assert(cfg->lcore_count == 1);
645
646    dpdk_move_master_lcore(my_cpu-1);
647
648#if DEBUG
649    dump_configuration();
650#endif
651
652#if DPDK_USE_PMD_INIT
653    /* This registers all available NICs with Intel DPDK
654     * These are not loaded until rte_eal_pci_probe() is called.
655     */
656    if ((ret = rte_pmd_init_all()) < 0) {
657        snprintf(err, errlen, 
658          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
659        return -1;
660    }
661#endif
662
663    /* Black list all ports besides the one that we want to use */
664        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
665                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
666                         " are you sure the address is correct?: %s", strerror(-ret));
667                return -1;
668        }
669
670    /* This loads DPDK drivers against all ports that are not blacklisted */
671        if ((ret = rte_eal_pci_probe()) < 0) {
672        snprintf(err, errlen, 
673            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
674        return -1;
675    }
676
677    format_data->nb_ports = rte_eth_dev_count();
678
679    if (format_data->nb_ports != 1) {
680        snprintf(err, errlen, 
681            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
682            format_data->nb_ports);
683        return -1;
684    }
685   
686    struct rte_eth_dev_info dev_info;
687    rte_eth_dev_info_get(0, &dev_info);
688    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 
689                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
690
691    return 0;
692}
693
694static int dpdk_init_input (libtrace_t *libtrace) {
695    char err[500];
696    err[0] = 0;
697   
698    libtrace->format_data = (struct dpdk_format_data_t *)
699                            malloc(sizeof(struct dpdk_format_data_t));
700    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
701    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
702    FORMAT(libtrace)->nb_ports = 0;
703    FORMAT(libtrace)->snaplen = 0; /* Use default */
704    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
705    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
706    FORMAT(libtrace)->promisc = -1;
707    FORMAT(libtrace)->pktmbuf_pool = NULL;
708#if DPDK_USE_BLACKLIST
709    FORMAT(libtrace)->nb_blacklist = 0;
710#endif
711    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
712    FORMAT(libtrace)->mempool_name[0] = 0;
713#if HAS_HW_TIMESTAMPS_82580
714    FORMAT(libtrace)->ts_first_sys = 0;
715    FORMAT(libtrace)->ts_last_sys = 0;
716    FORMAT(libtrace)->wrap_count = 0;
717#endif
718       
719    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
720        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
721        free(libtrace->format_data);
722        libtrace->format_data = NULL;
723        return -1;
724    }
725    return 0;
726};
727
728static int dpdk_init_output(libtrace_out_t *libtrace)
729{
730    char err[500];
731    err[0] = 0;
732   
733    libtrace->format_data = (struct dpdk_format_data_t *)
734                            malloc(sizeof(struct dpdk_format_data_t));
735    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
736    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
737    FORMAT(libtrace)->nb_ports = 0;
738    FORMAT(libtrace)->snaplen = 0; /* Use default */
739    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
740    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
741    FORMAT(libtrace)->promisc = -1;
742    FORMAT(libtrace)->pktmbuf_pool = NULL;
743#if DPDK_USE_BLACKLIST
744    FORMAT(libtrace)->nb_blacklist = 0;
745#endif
746    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
747    FORMAT(libtrace)->mempool_name[0] = 0;
748#if HAS_HW_TIMESTAMPS_82580
749    FORMAT(libtrace)->ts_first_sys = 0;
750    FORMAT(libtrace)->ts_last_sys = 0;
751    FORMAT(libtrace)->wrap_count = 0;
752#endif
753
754    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
755        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
756        free(libtrace->format_data);
757        libtrace->format_data = NULL;
758        return -1;
759    }
760    return 0;
761};
762
763static int dpdk_pconfig_input (libtrace_t *libtrace,
764                                trace_parallel_option_t option,
765                                void *data) {
766        switch (option) {
767                case TRACE_OPTION_SET_HASHER:
768                        switch (*((enum hasher_types *) data))
769                        {
770                                case HASHER_BALANCE:
771                                case HASHER_UNIDIRECTIONAL:
772                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
773                                        return 0;
774                                case HASHER_BIDIRECTIONAL:
775                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
776                                        return 0;
777                                case HASHER_HARDWARE:
778                                case HASHER_CUSTOM:
779                                        // We don't support these
780                                        return -1;
781                        }
782        break;
783        }
784        return -1;
785}
786/**
787 * Note here snaplen excludes the MAC checksum. Packets over
788 * the requested snaplen will be dropped. (Excluding MAC checksum)
789 *
790 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
791 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
792 * is set the maximum size of the returned packet would be 1518 otherwise
793 * 1514 would be the largest size possibly returned.
794 *
795 */
796static int dpdk_config_input (libtrace_t *libtrace,
797                                        trace_option_t option,
798                                        void *data) {
799    switch (option) {
800        case TRACE_OPTION_SNAPLEN:
801            /* Only support changing snaplen before a call to start is
802             * made */
803            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
804                FORMAT(libtrace)->snaplen=*(int*)data;
805            else
806                return -1;
807            return 0;
808                case TRACE_OPTION_PROMISC:
809                        FORMAT(libtrace)->promisc=*(int*)data;
810            return 0;
811        case TRACE_OPTION_FILTER:
812            /* TODO filtering */
813            break;
814        case TRACE_OPTION_META_FREQ:
815            break;
816        case TRACE_OPTION_EVENT_REALTIME:
817            break;
818        /* Avoid default: so that future options will cause a warning
819         * here to remind us to implement it, or flag it as
820         * unimplementable
821         */
822    }
823
824        /* Don't set an error - trace_config will try to deal with the
825         * option and will set an error if it fails */
826    return -1;
827}
828
829/* Can set jumbo frames/ or limit the size of a frame by setting both
830 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
831 *
832 */
833static struct rte_eth_conf port_conf = {
834        .rxmode = {
835                .mq_mode = ETH_RSS,
836                .split_hdr_size = 0,
837                .header_split   = 0, /**< Header Split disabled */
838                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
839                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
840                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
841        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
842#if GET_MAC_CRC_CHECKSUM
843/* So it appears that if hw_strip_crc is turned off the driver will still
844 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
845 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
846 * So lets just add it back on when we receive the packet.
847 */
848                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
849#else
850/* By default strip the MAC checksum because it's a bit of a hack to
851 * actually read these. And don't want to rely on disabling this to actualy
852 * always cut off the checksum in the future
853 */
854        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
855#endif
856        },
857        .txmode = {
858                .mq_mode = ETH_DCB_NONE,
859        },
860        .rx_adv_conf = {
861                .rss_conf = {
862                        // .rss_key = &rss_key, // We set this per format
863                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
864                },
865        },
866};
867
868static const struct rte_eth_rxconf rx_conf = {
869        .rx_thresh = {
870                .pthresh = 8,/* RX_PTHRESH prefetch */
871                .hthresh = 8,/* RX_HTHRESH host */
872                .wthresh = 4,/* RX_WTHRESH writeback */
873        },
874    .rx_free_thresh = 0,
875    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
876};
877
878static const struct rte_eth_txconf tx_conf = {
879        .tx_thresh = {
880        /**
881         * TX_PTHRESH prefetch
882         * Set on the NIC, if the number of unprocessed descriptors to queued on
883         * the card fall below this try grab at least hthresh more unprocessed
884         * descriptors.
885         */
886                .pthresh = 36,
887
888        /* TX_HTHRESH host
889         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
890         */
891                .hthresh = 0,
892       
893        /* TX_WTHRESH writeback
894         * Set on the NIC, the number of sent descriptors before writing back
895         * status to confirm the transmission. This is done more efficiently as
896         * a bulk DMA-transfer rather than writing one at a time.
897         * Similar to tx_free_thresh however this is applied to the NIC, where
898         * as tx_free_thresh is when DPDK will check these. This is extended
899         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
900         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
901         */
902                .wthresh = 4,
903        },
904
905    /* Used internally by DPDK rather than passed to the NIC. The number of
906     * packet descriptors to send before checking for any responses written
907     * back (to confirm the transmission). Default = 32 if set to 0)
908     */
909        .tx_free_thresh = 0,
910
911    /* This is the Report Status threshold, used by 10Gbit cards,
912     * This signals the card to only write back status (such as
913     * transmission successful) after this minimum number of transmit
914     * descriptors are seen. The default is 32 (if set to 0) however if set
915     * to greater than 1 TX wthresh must be set to zero, because this is kindof
916     * a replacement. See the dpdk programmers guide for more restrictions.
917     */
918        .tx_rs_thresh = 1,
919};
920
921/* Attach memory to the port and start the port or restart the port.
922 */
923static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
924    int ret; /* Check return values for errors */
925    struct rte_eth_link link_info; /* Wait for link */
926   
927    /* Already started */
928    if (format_data->paused == DPDK_RUNNING)
929        return 0;
930
931    /* First time started we need to alloc our memory, doing this here
932     * rather than in environment setup because we don't have snaplen then */
933    if (format_data->paused == DPDK_NEVER_STARTED) {
934        if (format_data->snaplen == 0) {
935            format_data->snaplen = RX_MBUF_SIZE;
936            port_conf.rxmode.jumbo_frame = 0;
937            port_conf.rxmode.max_rx_pkt_len = 0;
938        } else {
939            /* Use jumbo frames */
940            port_conf.rxmode.jumbo_frame = 1;
941            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
942        }
943
944        /* This is additional overhead so make sure we allow space for this */
945#if GET_MAC_CRC_CHECKSUM
946        format_data->snaplen += ETHER_CRC_LEN;
947#endif
948#if HAS_HW_TIMESTAMPS_82580
949        format_data->snaplen += sizeof(struct hw_timestamp_82580);
950#endif
951
952        /* Create the mbuf pool, which is the place our packets are allocated
953         * from - TODO figure out if there is is a free function (I cannot see one)
954         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
955         * allocate however that extra 1 packet is not used.
956         * (I assume <= vs < error some where in DPDK code)
957         * TX requires nb_tx_buffers + 1 in the case the queue is full
958         * so that will fill the new buffer and wait until slots in the
959         * ring become available.
960         */
961#if DEBUG
962    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
963#endif
964        format_data->pktmbuf_pool =
965            rte_mempool_create(format_data->mempool_name,
966                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
967                       format_data->snaplen + sizeof(struct rte_mbuf) 
968                                        + RTE_PKTMBUF_HEADROOM,
969                       8, sizeof(struct rte_pktmbuf_pool_private),
970                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
971                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
972
973        if (format_data->pktmbuf_pool == NULL) {
974            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
975                        "pool failed: %s", strerror(rte_errno));
976            return -1;
977        }
978    }
979   
980    /* ----------- Now do the setup for the port mapping ------------ */
981    /* Order of calls must be
982     * rte_eth_dev_configure()
983     * rte_eth_tx_queue_setup()
984     * rte_eth_rx_queue_setup()
985     * rte_eth_dev_start()
986     * other rte_eth calls
987     */
988   
989   
990    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
991   
992    /* This must be called first before another *eth* function
993     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
994    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
995    if (ret < 0) {
996        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
997                            " %"PRIu8" : %s", format_data->port,
998                            strerror(-ret));
999        return -1;
1000    }
1001    /* Initialise the TX queue a minimum value if using this port for
1002     * receiving. Otherwise a larger size if writing packets.
1003     */
1004    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1005                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1006    if (ret < 0) {
1007        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1008                            " %"PRIu8" : %s", format_data->port,
1009                            strerror(-ret));
1010        return -1;
1011    }
1012    /* Initialise the RX queue with some packets from memory */
1013    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1014                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
1015                            &rx_conf, format_data->pktmbuf_pool);
1016    if (ret < 0) {
1017        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1018                    " %"PRIu8" : %s", format_data->port,
1019                    strerror(-ret));
1020        return -1;
1021    }
1022   
1023    /* Start device */
1024    ret = rte_eth_dev_start(format_data->port);
1025    if (ret < 0) {
1026        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1027                    strerror(-ret));
1028        return -1;
1029    }
1030
1031    /* Default promiscuous to on */
1032    if (format_data->promisc == -1)
1033        format_data->promisc = 1;
1034   
1035    if (format_data->promisc == 1)
1036        rte_eth_promiscuous_enable(format_data->port);
1037    else
1038        rte_eth_promiscuous_disable(format_data->port);
1039   
1040    /* Wait for the link to come up */
1041    rte_eth_link_get(format_data->port, &link_info);
1042#if DEBUG
1043    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1044            (int) link_info.link_duplex, (int) link_info.link_speed);
1045#endif
1046
1047    /* We have now successfully started/unpaused */
1048    format_data->paused = DPDK_RUNNING;
1049   
1050    return 0;
1051}
1052
1053/* Attach memory to the port and start (or restart) the port/s.
1054 */
1055static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
1056    int ret, i; /* Check return values for errors */
1057    struct rte_eth_link link_info; /* Wait for link */
1058   
1059    /* Already started */
1060    if (format_data->paused == DPDK_RUNNING)
1061        return 0;
1062
1063    /* First time started we need to alloc our memory, doing this here
1064     * rather than in environment setup because we don't have snaplen then */
1065    if (format_data->paused == DPDK_NEVER_STARTED) {
1066        if (format_data->snaplen == 0) {
1067            format_data->snaplen = RX_MBUF_SIZE;
1068            port_conf.rxmode.jumbo_frame = 0;
1069            port_conf.rxmode.max_rx_pkt_len = 0;
1070        } else {
1071            /* Use jumbo frames */
1072            port_conf.rxmode.jumbo_frame = 1;
1073            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1074        }
1075
1076        /* This is additional overhead so make sure we allow space for this */
1077#if GET_MAC_CRC_CHECKSUM
1078        format_data->snaplen += ETHER_CRC_LEN;
1079#endif
1080#if HAS_HW_TIMESTAMPS_82580
1081        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1082#endif
1083
1084        /* Create the mbuf pool, which is the place our packets are allocated
1085         * from - TODO figure out if there is is a free function (I cannot see one)
1086         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1087         * allocate however that extra 1 packet is not used.
1088         * (I assume <= vs < error some where in DPDK code)
1089         * TX requires nb_tx_buffers + 1 in the case the queue is full
1090         * so that will fill the new buffer and wait until slots in the
1091         * ring become available.
1092         */
1093#if DEBUG
1094    printf("Creating mempool named %s\n", format_data->mempool_name);
1095#endif
1096        format_data->pktmbuf_pool =
1097            rte_mempool_create(format_data->mempool_name,
1098                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
1099                       format_data->snaplen + sizeof(struct rte_mbuf) 
1100                                        + RTE_PKTMBUF_HEADROOM,
1101                       8, sizeof(struct rte_pktmbuf_pool_private),
1102                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1103                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
1104
1105        if (format_data->pktmbuf_pool == NULL) {
1106            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1107                        "pool failed: %s", strerror(rte_errno));
1108            return -1;
1109        }
1110    }
1111   
1112    /* ----------- Now do the setup for the port mapping ------------ */
1113    /* Order of calls must be
1114     * rte_eth_dev_configure()
1115     * rte_eth_tx_queue_setup()
1116     * rte_eth_rx_queue_setup()
1117     * rte_eth_dev_start()
1118     * other rte_eth calls
1119     */
1120   
1121    /* This must be called first before another *eth* function
1122     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1123    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1124    if (ret < 0) {
1125        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1126                            " %"PRIu8" : %s", format_data->port,
1127                            strerror(-ret));
1128        return -1;
1129    }
1130#if DEBUG
1131    printf("Doing dev configure\n");
1132#endif
1133    /* Initialise the TX queue a minimum value if using this port for
1134     * receiving. Otherwise a larger size if writing packets.
1135     */
1136    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1137                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1138    if (ret < 0) {
1139        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1140                            " %"PRIu8" : %s", format_data->port,
1141                            strerror(-ret));
1142        return -1;
1143    }
1144   
1145    for (i=0; i < rx_queues; i++) {
1146#if DEBUG
1147    printf("Doing queue configure\n");
1148#endif 
1149                /* Initialise the RX queue with some packets from memory */
1150                ret = rte_eth_rx_queue_setup(format_data->port, i,
1151                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
1152                                                                &rx_conf, format_data->pktmbuf_pool);
1153        /* Init per_thread data structures */
1154        format_data->per_lcore[i].port = format_data->port;
1155        format_data->per_lcore[i].queue_id = i;
1156
1157                if (ret < 0) {
1158                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1159                                                " %"PRIu8" : %s", format_data->port,
1160                                                strerror(-ret));
1161                        return -1;
1162                }
1163        }
1164   
1165#if DEBUG
1166    fprintf(stderr, "Doing start device\n");
1167#endif 
1168    /* Start device */
1169    ret = rte_eth_dev_start(format_data->port);
1170#if DEBUG
1171    fprintf(stderr, "Done start device\n");
1172#endif 
1173    if (ret < 0) {
1174        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1175                    strerror(-ret));
1176        return -1;
1177    }
1178
1179
1180    /* Default promiscuous to on */
1181    if (format_data->promisc == -1)
1182        format_data->promisc = 1;
1183   
1184    if (format_data->promisc == 1)
1185        rte_eth_promiscuous_enable(format_data->port);
1186    else
1187        rte_eth_promiscuous_disable(format_data->port);
1188   
1189   
1190    /* We have now successfully started/unpased */
1191    format_data->paused = DPDK_RUNNING;
1192   
1193    // Can use remote launch for all
1194    /*RTE_LCORE_FOREACH_SLAVE(i) {
1195                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1196        }*/
1197   
1198    /* Wait for the link to come up */
1199    rte_eth_link_get(format_data->port, &link_info);
1200#if DEBUG
1201    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1202            (int) link_info.link_duplex, (int) link_info.link_speed);
1203#endif
1204
1205    return 0;
1206}
1207
1208static int dpdk_start_input (libtrace_t *libtrace) {
1209    char err[500];
1210    err[0] = 0;
1211
1212    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1213        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1214        free(libtrace->format_data);
1215        libtrace->format_data = NULL;
1216        return -1;
1217    }
1218    return 0;
1219}
1220
1221static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1222    struct rte_eth_dev_info dev_info;
1223    rte_eth_dev_info_get(port_id, &dev_info);
1224    return dev_info.max_rx_queues;
1225}
1226
/**
 * Count the processors currently online.
 *
 * @return The value reported by sysconf(_SC_NPROCESSORS_ONLN), or 1 if
 *         that call fails or reports a non-positive count
 */
static inline size_t dpdk_processor_count (void) {
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    /* Always claim at least one processor so callers never see zero */
    if (nb_cpu <= 0)
        return 1;
    return (size_t) nb_cpu;
}
1234
1235static int dpdk_pstart_input (libtrace_t *libtrace) {
1236    char err[500];
1237    int i=0, phys_cores=0;
1238    int tot = libtrace->perpkt_thread_count;
1239    err[0] = 0;
1240
1241    if (rte_lcore_id() != rte_get_master_lcore())
1242        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1243
1244    // If the master is not on the last thread we move it there
1245    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1246        // Consider error handling here
1247        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
1248    }
1249
1250    // Don't exceed the number of cores in the system/detected by dpdk
1251    // We don't have to force this but performance wont be good if we don't
1252    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1253        if (lcore_config[i].detected) {
1254            if (rte_lcore_is_enabled(i))
1255                fprintf(stderr, "Found core %d already in use!\n", i);
1256            else
1257                phys_cores++;
1258        }
1259    }
1260
1261        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1262    tot = MIN(tot, phys_cores);
1263
1264        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1265       
1266    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1267        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1268        free(libtrace->format_data);
1269        libtrace->format_data = NULL;
1270        return -1;
1271    }
1272
1273    // Make sure we only start the number that we should
1274    libtrace->perpkt_thread_count = tot;
1275    return 0;
1276}
1277
1278
1279/**
1280 * Register a thread with the DPDK system,
1281 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1282 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1283 * gives it.
1284 *
1285 * We then allow a mapper thread to be started on every real core as DPDK would
1286 * we also bind these to the corresponding CPU cores.
1287 *
1288 * @param libtrace A pointer to the trace
1289 * @param reading True if the thread will be used to read packets, i.e. will
1290 *                call pread_packet(), false if thread used to process packet
1291 *                in any other manner including statistics functions.
1292 */
1293static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1294{
1295    struct rte_config *cfg = rte_eal_get_configuration();
1296    int i;
1297    int new_id = -1;
1298
1299    // If 'reading packets' fill in cores from 0 up and bind affinity
1300    // otherwise start from the MAX core (which is also the master) and work backwards
1301    // in this case physical cores on the system will not exist so we don't bind
1302    // these to any particular physical core
1303    if (reading) {
1304        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1305            if (!rte_lcore_is_enabled(i)) {
1306                new_id = i;
1307                if (!lcore_config[i].detected)
1308                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1309                break;
1310            }
1311        }
1312    } else {
1313        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1314            if (!rte_lcore_is_enabled(i)) {
1315                new_id = i;
1316                break;
1317            }
1318        }
1319    }
1320
1321    if (new_id == -1) {
1322        assert(cfg->lcore_count == RTE_MAX_LCORE);
1323        // TODO proper libtrace style error here!!
1324        fprintf(stderr, "Too many threads for DPDK!!\n");
1325        return -1;
1326    }
1327
1328    // Enable the core in global DPDK structs
1329    cfg->lcore_role[new_id] = ROLE_RTE;
1330    cfg->lcore_count++;
1331    // Set TLS to reflect our new number
1332    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1333    fprintf(stderr, "original id%d", rte_lcore_id());
1334    RTE_PER_LCORE(_lcore_id) = new_id;
1335    fprintf(stderr, " new id%d\n", rte_lcore_id());
1336
1337    if (reading) {
1338        // Set affinity bind to corresponding core
1339        cpu_set_t cpuset;
1340        CPU_ZERO(&cpuset);
1341        CPU_SET(rte_lcore_id(), &cpuset);
1342        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1343        if (i != 0) {
1344            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1345            return -1;
1346        }
1347    }
1348
1349    // Map our TLS to the thread data
1350    if (reading) {
1351        if(t->type == THREAD_PERPKT) {
1352            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1353        } else {
1354            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1355        }
1356    }
1357}
1358
1359
1360/**
1361 * Unregister a thread with the DPDK system.
1362 *
1363 * Only previously registered threads should be calling this just before
1364 * they are destroyed.
1365 */
1366static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
1367{
1368    struct rte_config *cfg = rte_eal_get_configuration();
1369
1370    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
1371
1372    // Skip if master!!
1373    if (rte_lcore_id() == rte_get_master_lcore()) {
1374        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1375        return 0;
1376    }
1377
1378    // Disable this core in global DPDK structs
1379    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1380    cfg->lcore_count--;
1381    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1382    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1383    return 0;
1384}
1385
1386static int dpdk_start_output(libtrace_out_t *libtrace)
1387{
1388    char err[500];
1389    err[0] = 0;
1390   
1391    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1392        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1393        free(libtrace->format_data);
1394        libtrace->format_data = NULL;
1395        return -1;
1396    }
1397    return 0;
1398}
1399
1400static int dpdk_pause_input(libtrace_t * libtrace){
1401    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1402    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1403#if DEBUG     
1404        fprintf(stderr, "Pausing port\n");
1405#endif
1406        rte_eth_dev_stop(FORMAT(libtrace)->port);
1407        FORMAT(libtrace)->paused = DPDK_PAUSED;
1408        /* If we pause it the driver will be reset and likely our counter */
1409#if HAS_HW_TIMESTAMPS_82580
1410        FORMAT(libtrace)->ts_first_sys = 0;
1411        FORMAT(libtrace)->ts_last_sys = 0;
1412#endif
1413    }
1414    return 0;
1415}
1416
1417static int dpdk_write_packet(libtrace_out_t *trace, 
1418                libtrace_packet_t *packet){
1419    struct rte_mbuf* m_buff[1];
1420   
1421    int wirelen = trace_get_wire_length(packet);
1422    int caplen = trace_get_capture_length(packet);
1423   
1424    /* Check for a checksum and remove it */
1425    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1426                                            wirelen == caplen)
1427        caplen -= ETHER_CRC_LEN;
1428
1429    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1430    if (m_buff[0] == NULL) {
1431        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1432        return -1;
1433    } else {
1434        int ret;
1435        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1436        do {
1437            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1438        } while (ret != 1);
1439    }
1440
1441    return 0;
1442}
1443
1444static int dpdk_fin_input(libtrace_t * libtrace) {
1445    /* Free our memory structures */
1446    if (libtrace->format_data != NULL) {
1447        /* Close the device completely, device cannot be restarted */
1448        if (FORMAT(libtrace)->port != 0xFF)
1449            rte_eth_dev_close(FORMAT(libtrace)->port);
1450        /* filter here if we used it */
1451                free(libtrace->format_data);
1452        }
1453
1454    /* Revert to the original PCI drivers */
1455    /* No longer in DPDK
1456    rte_eal_pci_exit(); */
1457    return 0;
1458}
1459
1460
1461static int dpdk_fin_output(libtrace_out_t * libtrace) {
1462    /* Free our memory structures */
1463    if (libtrace->format_data != NULL) {
1464        /* Close the device completely, device cannot be restarted */
1465        if (FORMAT(libtrace)->port != 0xFF)
1466            rte_eth_dev_close(FORMAT(libtrace)->port);
1467        /* filter here if we used it */
1468                free(libtrace->format_data);
1469        }
1470
1471    /* Revert to the original PCI drivers */
1472    /* No longer in DPDK
1473    rte_eal_pci_exit(); */
1474    return 0;
1475}
1476
1477/**
1478 * Get the start of additional header that we added to a packet.
1479 */
1480static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1481    uint8_t *hdrsize;
1482    assert(packet);
1483    assert(packet->buffer);
1484    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1485    /* The byte before the original packet data denotes the size in bytes
1486     * of our additional header that we added sits before the 'size byte' */
1487    hdrsize--;
1488    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1489}
1490
1491static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1492    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1493    return hdr->cap_len;
1494}
1495
1496static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1497    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1498    if (size > hdr->cap_len) {
1499        /* Cannot make a packet bigger */
1500                return trace_get_capture_length(packet);
1501        }
1502
1503    /* Reset the cached capture length first*/
1504    packet->capture_length = -1;
1505    hdr->cap_len = (uint32_t) size;
1506        return trace_get_capture_length(packet);
1507}
1508
1509static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1510    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1511    int org_cap_size; /* The original capture size */
1512    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1513        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1514                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1515                            sizeof(struct hw_timestamp_82580);
1516    } else {
1517        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1518                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1519    }
1520    if (hdr->flags & INCLUDES_CHECKSUM) {
1521        return org_cap_size;
1522    } else {
1523        /* DPDK packets are always TRACE_TYPE_ETH packets */
1524        return org_cap_size + ETHER_CRC_LEN;
1525    }
1526}
1527static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1528    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1529    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1530        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1531                sizeof(struct hw_timestamp_82580);
1532    else
1533        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1534}
1535
1536static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1537                libtrace_packet_t *packet, void *buffer,
1538                libtrace_rt_types_t rt_type, uint32_t flags) {
1539    assert(packet);
1540    if (packet->buffer != buffer &&
1541        packet->buf_control == TRACE_CTRL_PACKET) {
1542        free(packet->buffer);
1543    }
1544
1545    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1546        packet->buf_control = TRACE_CTRL_PACKET;
1547    } else
1548        packet->buf_control = TRACE_CTRL_EXTERNAL;
1549
1550    packet->buffer = buffer;
1551    packet->header = buffer;
1552
1553    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1554    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1555    packet->type = rt_type;
1556    return 0;
1557}
1558
1559/*
1560 * Does any extra preparation to a captured packet.
1561 * This includes adding our extra header to it with the timestamp.
1562 */
1563static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1564                                                        struct rte_mbuf* pkt){
1565    uint8_t * hdr_size;
1566    struct dpdk_addt_hdr *hdr;
1567#if HAS_HW_TIMESTAMPS_82580
1568    struct hw_timestamp_82580 *hw_ts;
1569    struct timeval cur_sys_time;
1570    uint64_t cur_sys_time_ns;
1571    uint64_t estimated_wraps;
1572   
1573    /* Using gettimeofday because it's most likely to be a vsyscall
1574     * We don't want to slow down anything with systemcalls we dont need
1575     * accauracy */
1576    gettimeofday(&cur_sys_time, NULL);
1577#else
1578# if USE_CLOCK_GETTIME
1579    struct timespec cur_sys_time;
1580   
1581    /* This looks terrible and I feel bad doing it. But it's OK
1582     * on new kernels, because this is a vsyscall */
1583    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1584# else
1585    struct timeval cur_sys_time;
1586    /* Should be a vsyscall */
1587    gettimeofday(&cur_sys_time, NULL);
1588# endif
1589#endif
1590
1591    /* Record the size of our header */
1592    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1593    *hdr_size = sizeof(struct dpdk_addt_hdr);
1594    /* Now put our header in front of that size */
1595    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1596    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1597   
1598#if GET_MAC_CRC_CHECKSUM
1599    /* Add back in the CRC sum */
1600    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1601    pkt->pkt.data_len += ETHER_CRC_LEN;
1602    hdr->flags |= INCLUDES_CHECKSUM;
1603#endif
1604
1605#if HAS_HW_TIMESTAMPS_82580
1606    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1607     *
1608     *        +----------+---+   +--------------+
1609     *  82580 |    24    | 8 |   |      32      |
1610     *        +----------+---+   +--------------+
1611     *          reserved  \______ 40 bits _____/
1612     *
1613     * The 40 bit 82580 SYSTIM overflows every
1614     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1615     *
1616     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1617     * Endian (for the full 64 bits) i.e. picture is mirrored
1618     */
1619   
1620    /* The timestamp is sitting before our packet and is included in pkt_len */
1621    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1622    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1623   
1624    /* Despite what the documentation says this is in Little
1625     * Endian byteorder. Mask the reserved section out.
1626     */
1627    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1628                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1629               
1630    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1631    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1632        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1633        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1634    }
1635   
1636    /* This will have serious problems if packets aren't read quickly
1637     * that is within a couple of seconds because our clock cycles every
1638     * 18 seconds */
1639    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1640                            / (1ull<<TS_NBITS_82580);
1641   
1642    /* Estimated_wraps gives the number of times the counter should have
1643     * wrapped (however depending on value last time it could have wrapped
1644     * twice more (if hw clock is close to its max value) or once less (allowing
1645     * for a bit of variance between hw and sys clock). But if the clock
1646     * shouldn't have wrapped once then don't allow it to go backwards in time */
1647    if (unlikely(estimated_wraps >= 2)) {
1648        /* 2 or more wrap arounds add all but the very last wrap */
1649        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1650    }
1651   
1652    /* Set the timestamp to the lowest possible value we're considering */
1653    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1654                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1655   
1656    /* In most runs only the first if() will need evaluating - i.e our
1657     * estimate is correct. */
1658    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1659                                hdr->timestamp, MAXSKEW_82580))) {
1660        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1661        FORMAT(libtrace)->wrap_count++;
1662        hdr->timestamp += (1ull<<TS_NBITS_82580);
1663        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1664                                hdr->timestamp, MAXSKEW_82580)) {
1665            /* Failed to match estimated_wraps */
1666            FORMAT(libtrace)->wrap_count++;
1667            hdr->timestamp += (1ull<<TS_NBITS_82580);
1668            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1669                                hdr->timestamp, MAXSKEW_82580)) {
1670                if (estimated_wraps == 0) {
1671                    /* 0 case Failed to match estimated_wraps+2 */
1672                    printf("WARNING - Hardware Timestamp failed to"
1673                                            " match using systemtime!\n");
1674                    hdr->timestamp = cur_sys_time_ns;
1675                } else {
1676                    /* Failed to match estimated_wraps+1 */
1677                    FORMAT(libtrace)->wrap_count++;
1678                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1679                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1680                                hdr->timestamp, MAXSKEW_82580)) {
1681                        /* Failed to match estimated_wraps+2 */
1682                        printf("WARNING - Hardware Timestamp failed to"
1683                                            " match using systemtime!!\n");
1684                    }
1685                }
1686            }
1687        }
1688    }
1689
1690    /* Log our previous for the next loop */
1691    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1692
1693#else
1694# if USE_CLOCK_GETTIME
1695    hdr->timestamp = TS_TO_NS(cur_sys_time);
1696# else
1697    hdr->timestamp = TV_TO_NS(cur_sys_time);
1698# endif
1699#endif
1700
1701    /* Intels samples prefetch into level 0 cache lets assume it is a good
1702     * idea and do the same */
1703    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1704    packet->buffer = pkt;
1705    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1706
1707    /* Set our capture length for the first time */
1708    hdr->cap_len = dpdk_get_wire_length(packet);
1709    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1710        hdr->cap_len -= ETHER_CRC_LEN;
1711    }
1712   
1713
1714    return dpdk_get_framing_length(packet) +
1715                        dpdk_get_capture_length(packet);
1716}
1717
1718
1719static void dpdk_fin_packet(libtrace_packet_t *packet)
1720{
1721        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1722                rte_pktmbuf_free(packet->buffer);
1723                packet->buffer = NULL;
1724        }
1725}
1726
1727static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1728    int nb_rx; /* Number of rx packets we've recevied */
1729    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1730
1731    /* Free the last packet buffer */
1732    if (packet->buffer != NULL) {
1733        /* Buffer is owned by DPDK */
1734        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1735            rte_pktmbuf_free(packet->buffer);
1736            packet->buffer = NULL;
1737        } else
1738        /* Buffer is owned by packet i.e. has been malloc'd */
1739        if (packet->buf_control == TRACE_CTRL_PACKET) {
1740            free(packet->buffer);
1741            packet->buffer = NULL;
1742        }
1743    }
1744   
1745    packet->buf_control = TRACE_CTRL_EXTERNAL;
1746    packet->type = TRACE_RT_DATA_DPDK;
1747   
1748    /* Wait for a packet */
1749    while (1) {
1750        /* Poll for a single packet */
1751        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1752                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1753        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1754            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1755        }
1756    }
1757   
1758    /* We'll never get here - but if we did it would be bad */
1759    return -1;
1760}
1761
1762static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
1763    int nb_rx; /* Number of rx packets we've recevied */
1764    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1765
1766    /* Free the last packet buffer */
1767    if (packet->buffer != NULL) {
1768        /* Buffer is owned by DPDK */
1769        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
1770            rte_pktmbuf_free(packet->buffer);
1771            packet->buffer = NULL;
1772        } else
1773        /* Buffer is owned by packet i.e. has been malloc'd */
1774        if (packet->buf_control == TRACE_CTRL_PACKET) {
1775            free(packet->buffer);
1776            packet->buffer = NULL;
1777        }
1778    }
1779   
1780    packet->buf_control = TRACE_CTRL_EXTERNAL;
1781    packet->type = TRACE_RT_DATA_DPDK;
1782   
1783    /* Wait for a packet */
1784    while (1) {
1785        /* Poll for a single packet */
1786        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
1787                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
1788        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1789                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1790            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1791        }
1792        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
1793        if (libtrace_message_queue_count(&t->messages) > 0) {
1794                        printf("Extra message yay");
1795                        return -2;
1796                }
1797    }
1798   
1799    /* We'll never get here - but if we did it would be bad */
1800    return -1;
1801}
1802
1803static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1804    struct timeval tv;
1805    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1806   
1807    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1808    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1809    return tv;
1810}
1811
1812static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1813    struct timespec ts;
1814    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1815   
1816    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1817    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1818    return ts;
1819}
1820
1821static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1822    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1823}
1824
1825static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1826    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1827    return (libtrace_direction_t) hdr->direction;
1828}
1829
1830static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1831    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1832    hdr->direction = (uint8_t) direction;
1833    return (libtrace_direction_t) hdr->direction;
1834}
1835
1836/*
1837 * NOTE: Drops could occur for other reasons than running out of buffer
1838 * space. Such as failed MAC checksums and oversized packets.
1839 */
1840static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1841    struct rte_eth_stats stats = {0};
1842   
1843    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1844        return UINT64_MAX;
1845    /* Grab the current stats */
1846    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1847   
1848    /* Get the drop counter */
1849    return (uint64_t) stats.ierrors;
1850}
1851
1852static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1853    struct rte_eth_stats stats = {0};
1854   
1855    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1856        return UINT64_MAX;
1857    /* Grab the current stats */
1858    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1859   
1860    /* Get the drop counter */
1861    return (uint64_t) stats.ipackets;
1862}
1863
1864/*
1865 * This is the number of packets filtered by the NIC
1866 * and maybe ahead of number read using libtrace.
1867 *
1868 * XXX we are yet to implement any filtering, but if it was this should
1869 * get the result. So this will just return 0 for now.
1870 */
1871static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1872    struct rte_eth_stats stats = {0};
1873   
1874    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1875        return UINT64_MAX;
1876    /* Grab the current stats */
1877    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1878   
1879    /* Get the drop counter */
1880    return (uint64_t) stats.fdirmiss;
1881}
1882
1883/* Attempts to read a packet in a non-blocking fashion. If one is not
1884 * available a SLEEP event is returned. We do not have the ability to
1885 * create a select()able file descriptor in DPDK.
1886 */
1887static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1888                                        libtrace_packet_t *packet) {
1889    libtrace_eventobj_t event = {0,0,0.0,0};
1890    int nb_rx; /* Number of receive packets we've read */
1891    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1892   
1893    do {
1894   
1895        /* See if we already have a packet waiting */
1896        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1897                        FORMAT(trace)->queue_id, pkts_burst, 1);
1898       
1899        if (nb_rx > 0) {
1900            /* Free the last packet buffer */
1901            if (packet->buffer != NULL) {
1902                /* Buffer is owned by DPDK */
1903                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1904                    rte_pktmbuf_free(packet->buffer);
1905                    packet->buffer = NULL;
1906                } else
1907                /* Buffer is owned by packet i.e. has been malloc'd */
1908                if (packet->buf_control == TRACE_CTRL_PACKET) {
1909                    free(packet->buffer);
1910                    packet->buffer = NULL;
1911                }
1912            }
1913           
1914            packet->buf_control = TRACE_CTRL_EXTERNAL;
1915            packet->type = TRACE_RT_DATA_DPDK;
1916            event.type = TRACE_EVENT_PACKET;
1917            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1918           
1919            /* XXX - Check this passes the filter trace_read_packet normally
1920             * does this for us but this wont */
1921            if (trace->filter) {
1922                if (!trace_apply_filter(trace->filter, packet)) {
1923                    /* Failed the filter so we loop for another packet */
1924                    trace->filtered_packets ++;
1925                    continue;
1926                }
1927            }
1928            trace->accepted_packets ++;
1929        } else {
1930            /* We only want to sleep for a very short time - we are non-blocking */
1931            event.type = TRACE_EVENT_SLEEP;
1932            event.seconds = 0.0001;
1933            event.size = 0;
1934        }
1935       
1936        /* If we get here we have our event */
1937        break;
1938    } while (1);
1939
1940    return event;
1941}
1942
1943
/* Print the usage text for the dpdk format module to stdout: the module
 * revision, followed by the supported input and output URI forms. */
static void dpdk_help(void) {
    /* Module banner and input URI description */
    printf("dpdk format module: $Revision: 1752 $\n"
           "Supported input URIs:\n"
           "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
           "\tThe -<coreid> is optional \n"
           "\t e.g. dpdk:0000:01:00.1\n"
           "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
           "\t By default the last CPU core is used if not otherwise specified.\n"
           "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
           "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
           "\n");

    /* Output URI description */
    printf("Supported output URIs:\n"
           "\tSame format as the input URI.\n"
           "\t e.g. dpdk:0000:01:00.1\n"
           "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
           "\n");
}
1961
1962static struct libtrace_format_t dpdk = {
1963        "dpdk",
1964        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1965        TRACE_FORMAT_DPDK,
1966        NULL,                   /* probe filename */
1967        NULL,                               /* probe magic */
1968        dpdk_init_input,            /* init_input */
1969        dpdk_config_input,          /* config_input */
1970        dpdk_start_input,           /* start_input */
1971        dpdk_pause_input,           /* pause_input */
1972        dpdk_init_output,           /* init_output */
1973        NULL,                               /* config_output */
1974        dpdk_start_output,          /* start_ouput */
1975        dpdk_fin_input,             /* fin_input */
1976        dpdk_fin_output,        /* fin_output */
1977        dpdk_read_packet,           /* read_packet */
1978        dpdk_prepare_packet,    /* prepare_packet */
1979        dpdk_fin_packet,                                    /* fin_packet */
1980        dpdk_write_packet,          /* write_packet */
1981        dpdk_get_link_type,         /* get_link_type */
1982        dpdk_get_direction,         /* get_direction */
1983        dpdk_set_direction,         /* set_direction */
1984        NULL,                               /* get_erf_timestamp */
1985        dpdk_get_timeval,           /* get_timeval */
1986        dpdk_get_timespec,          /* get_timespec */
1987        NULL,                               /* get_seconds */
1988        NULL,                               /* seek_erf */
1989        NULL,                               /* seek_timeval */
1990        NULL,                               /* seek_seconds */
1991        dpdk_get_capture_length,/* get_capture_length */
1992        dpdk_get_wire_length,   /* get_wire_length */
1993        dpdk_get_framing_length,/* get_framing_length */
1994        dpdk_set_capture_length,/* set_capture_length */
1995        NULL,                               /* get_received_packets */
1996        dpdk_get_filtered_packets,/* get_filtered_packets */
1997        dpdk_get_dropped_packets,/* get_dropped_packets */
1998    dpdk_get_captured_packets,/* get_captured_packets */
1999        NULL,                       /* get_fd */
2000        dpdk_trace_event,               /* trace_event */
2001    dpdk_help,              /* help */
2002    NULL,                   /* next pointer */
2003    {true, 8},              /* Live, NICs typically have 8 threads */
2004    dpdk_pstart_input, /* pstart_input */
2005        dpdk_pread_packet, /* pread_packet */
2006        dpdk_pause_input, /* ppause */
2007        dpdk_fin_input, /* p_fin */
2008        dpdk_pconfig_input, /* pconfig_input */
2009    dpdk_pregister_thread, /* pregister_thread */
2010    dpdk_punregister_thread /* unpregister_thread */
2011};
2012
2013void dpdk_constructor(void) {
2014        register_format(&dpdk);
2015}
Note: See TracBrowser for help on using the repository browser.