source: lib/format_dpdk.c @ 4bdc4c2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4bdc4c2 was 4bdc4c2, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

DPDK - Allocate memory from the same NUMA node as the main thread
Otherwise this seems to cause some problems when attempting to
allocate the memory. This may be a DPDK bug?

For best performance the main thread CPU core should be
chosen such that it is on the same NUMA node as the NIC.

  • Property mode set to 100644
File size: 52.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include <stdlib.h>
56#include <assert.h>
57#include <unistd.h>
58#include <endian.h>
59#include <string.h>
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
65 * These get released with the rX suffix. The following macros were added
66 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * Currently 1.5 to 1.7 is supported.
72 */
73#include <rte_eal.h>
74#include <rte_version.h>
75#ifndef RTE_VERSION_NUM
76#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
77#endif
78#ifndef RTE_VER_PATCH_RELEASE
79#       define RTE_VER_PATCH_RELEASE 0
80#endif
81#ifndef RTE_VERSION
82#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
83        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
84#endif
85
86/* 1.6.0r2 :
87 *      rte_eal_pci_set_blacklist() is removed
88 *      device_list is renamed to pci_device_list
89 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
90 *      as such we do apply the whitelist before rte_eal_init.
91 *      This also works correctly with DPDK 1.6.0r2.
92 *
93 * Replaced by:
94 *      rte_devargs (we can simply whitelist)
95 */
96#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
97#       define DPDK_USE_BLACKLIST 1
98#else
99#       define DPDK_USE_BLACKLIST 0
100#endif
101
102/*
103 * 1.7.0 :
104 *      rte_pmd_init_all is removed
105 *
106 * Replaced by:
107 *      Nothing, no longer needed
108 */
109#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
110#       define DPDK_USE_PMD_INIT 1
111#else
112#       define DPDK_USE_PMD_INIT 0
113#endif
114
115#include <rte_per_lcore.h>
116#include <rte_debug.h>
117#include <rte_errno.h>
118#include <rte_common.h>
119#include <rte_log.h>
120#include <rte_memcpy.h>
121#include <rte_prefetch.h>
122#include <rte_branch_prediction.h>
123#include <rte_pci.h>
124#include <rte_ether.h>
125#include <rte_ethdev.h>
126#include <rte_ring.h>
127#include <rte_mempool.h>
128#include <rte_mbuf.h>
129
130/* The default size of memory buffers to use - This is the max size of standard
131 * ethernet packet less the size of the MAC CHECKSUM */
132#define RX_MBUF_SIZE 1514
133
134/* The minimum number of memory buffers per queue tx or rx. Search for
135 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
136 */
137#define MIN_NB_BUF 64
138
139/* Number of receive memory buffers to use
140 * By default this is limited by driver to 4k and must be a multiple of 128.
141 * A modification can be made to the driver to remove this limit.
142 * This can be increased in the driver and here.
143 * Should be at least MIN_NB_BUF.
144 */
145#define NB_RX_MBUF 4096
146
147/* Number of send memory buffers to use.
148 * Same limits apply as those for NB_RX_MBUF.
149 */
150#define NB_TX_MBUF 1024
151
152/* The size of the PCI blacklist needs to be big enough to contain
153 * every PCI device address (listed by lspci every bus:device.function tuple).
154 */
155#define BLACK_LIST_SIZE 50
156
157/* The maximum number of characters the mempool name can be */
158#define MEMPOOL_NAME_LEN 20
159
/* View an opaque buffer pointer as a DPDK mbuf.
 * Every macro argument below is parenthesised so that expressions such as
 * MBUF(p + 1) or TV_TO_NS(*tvp) expand as the caller intended. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Fetch the DPDK format data attached to a libtrace input or output trace */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
168
169#if RTE_PKTMBUF_HEADROOM != 128
170#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
171         "any libtrace instance processing these packet must be have the" \
172         "same RTE_PKTMBUF_HEADROOM set"
173#endif
174
175/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
176 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
177 *
178 * Make sure you understand what these are doing before enabling them.
179 * They might make traces incompatible with other builds etc.
180 *
181 * These are also included to show how to do some things which aren't
182 * obvious in the DPDK documentation.
183 */
184
185/* Print verbose messages to stdout */
186#define DEBUG 0
187
188/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
189 * Only turn this on if you know clock_gettime is a vsyscall on your system,
190 * otherwise it could be a large overhead. Again, gettimeofday() should be
191 * a vsyscall too; if it's not, you should seriously consider updating your
192 * kernel.
193 */
194#ifdef HAVE_LIBRT
195/* You can turn this on (set to 1) to prefer clock_gettime */
196#define USE_CLOCK_GETTIME 0
197#else
198/* DONT CHANGE THIS !!! */
199#define USE_CLOCK_GETTIME 0
200#endif
201
202/* This is fairly safe to turn on - currently there appears to be a 'bug'
203 * in DPDK that will remove the checksum by making the packet appear 4bytes
204 * smaller than what it really is. Most formats don't include the checksum
205 * hence writing out a port such as int: ring: and dpdk: assumes there
206 * is no checksum and will attempt to write the checksum as part of the
207 * packet
208 */
209#define GET_MAC_CRC_CHECKSUM 0
210
211/* This requires a modification of the pmd drivers (inside Intel DPDK)
212 */
213#define HAS_HW_TIMESTAMPS_82580 0
214
215#if HAS_HW_TIMESTAMPS_82580
216# define TS_NBITS_82580     40
217/* The maximum on the +ve or -ve side that we can be, make it half way */
218# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
219#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
220#endif
221
/* Timestamp record layout as per the Intel 82580 specification.
 * Note the mismatch in the 82580 datasheet: it states the timestamp is
 * stored big endian, however it is actually little endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    /* Little endian; only the lower 40 bits are valid */
    uint64_t timestamp;
};
228
/* Values held in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Initial value set by the init functions */
    DPDK_RUNNING       = 1,
    DPDK_PAUSED        = 2,
};
234
235/* Used by both input and output however some fields are not used
236 * for output */
237struct dpdk_format_data_t {
238    int8_t promisc; /* promiscuous mode - RX only */
239    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
240    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
241    uint8_t paused; /* See paused_state */ 
242    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
243    int snaplen; /* The snap length for the capture - RX only */
244    /* We always have to setup both rx and tx queues even if we don't want them */
245    int nb_rx_buf; /* The number of packet buffers in the rx ring */
246    int nb_tx_buf; /* The number of packet buffers in the tx ring */
247    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
248#if DPDK_USE_BLACKLIST
249    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
250        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
251#endif
252    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
253#if HAS_HW_TIMESTAMPS_82580
254    /* Timestamping only relevent to RX */
255    uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
256    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
257    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
258#endif
259};
260
/* Flag bits; judging by the name these are for dpdk_addt_hdr.flags
 * - TODO confirm against the code that fills in the header */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
265
/**
 * Additional per-packet information that we keep in the headroom
 * directly in front of the packet data:
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t  flags;
    uint8_t  direction;
    uint8_t  reserved1;
    uint8_t  reserved2;
    /* The size to report as the capture length */
    uint32_t cap_len;
};
291
292/**
293 * We want to blacklist all devices except those on the whitelist
294 * (I say list, but yes it is only the one).
295 *
296 * The default behaviour of rte_pci_probe() will map every possible device
297 * to its DPDK driver. The DPDK driver will take the ethernet device
298 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
299 *
300 * So blacklist all devices except the one that we wish to use so that
301 * the others can still be used as standard ethernet ports.
302 *
303 * @return 0 if successful, otherwise -1 on error.
304 */
305#if DPDK_USE_BLACKLIST
306static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
307{
308        struct rte_pci_device *dev = NULL;
309        format_data->nb_blacklist = 0;
310
311        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
312
313        TAILQ_FOREACH(dev, &device_list, next) {
314        if (whitelist != NULL && whitelist->domain == dev->addr.domain
315            && whitelist->bus == dev->addr.bus
316            && whitelist->devid == dev->addr.devid
317            && whitelist->function == dev->addr.function)
318            continue;
319                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
320                                / sizeof (format_data->blacklist[0])) {
321                        printf("Warning: too many devices to blacklist consider"
322                                        " increasing BLACK_LIST_SIZE");
323                        break;
324                }
325                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
326                ++format_data->nb_blacklist;
327        }
328
329        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
330        return 0;
331}
332#else /* DPDK_USE_BLACKLIST */
333#include <rte_devargs.h>
334static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
335{
336        char pci_str[20] = {0};
337        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
338                 whitelist->domain,
339                 whitelist->bus,
340                 whitelist->devid,
341                 whitelist->function);
342        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
343                return -1;
344        }
345        return 0;
346}
347#endif
348
349/**
350 * Parse the URI format as a pci address
351 * Fills in addr, note core is optional and is unchanged if
352 * a value for it is not provided.
353 *
354 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
355 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
356 */
357static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
358    int matches;
359    assert(str);
360    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
361                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
362    if (matches >= 4) {
363        return 0;
364    } else {
365        return -1;
366    }
367}
368
369#if DEBUG
370/* For debugging */
371static inline void dump_configuration()
372{
373    struct rte_config * global_config;
374    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
375
376    if (nb_cpu <= 0) {
377        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
378        nb_cpu = 1; /* fallback to just 1 core */
379    }
380    if (nb_cpu > RTE_MAX_LCORE)
381        nb_cpu = RTE_MAX_LCORE;
382
383    global_config = rte_eal_get_configuration();
384
385    if (global_config != NULL) {
386        int i;
387        fprintf(stderr, "Intel DPDK setup\n"
388               "---Version      : %s\n"
389               "---Master LCore : %"PRIu32"\n"
390               "---LCore Count  : %"PRIu32"\n",
391               rte_version(),
392               global_config->master_lcore, global_config->lcore_count);
393
394        for (i = 0 ; i < nb_cpu; i++) {
395            fprintf(stderr, "   ---Core %d : %s\n", i,
396                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
397        }
398
399        const char * proc_type;
400        switch (global_config->process_type) {
401            case RTE_PROC_AUTO:
402                proc_type = "auto";
403                break;
404            case RTE_PROC_PRIMARY:
405                proc_type = "primary";
406                break;
407            case RTE_PROC_SECONDARY:
408                proc_type = "secondary";
409                break;
410            case RTE_PROC_INVALID:
411                proc_type = "invalid";
412                break;
413            default:
414                proc_type = "something worse than invalid!!";
415        }
416        fprintf(stderr, "---Process Type : %s\n", proc_type);
417    }
418
419}
420#endif
421
/**
 * XXX This is very bad XXX
 * We need some way of nesting getopt usage, so we snapshot getopt's
 * global state and put it back afterwards.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 */
struct saved_getopts {
    char *optarg;
    int optind;
    int opterr;
    int optopt;
};

/* Snapshot the getopt globals into opts */
static void save_getopts(struct saved_getopts *opts) {
    *opts = (struct saved_getopts) {
        .optarg = optarg,
        .optind = optind,
        .opterr = opterr,
        .optopt = optopt,
    };
}

/* Write a snapshot taken by save_getopts() back to the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
    const struct saved_getopts *snapshot = opts;

    optopt = snapshot->optopt;
    opterr = snapshot->opterr;
    optind = snapshot->optind;
    optarg = snapshot->optarg;
}
449
450static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
451                                        char * err, int errlen) {
452    int ret; /* Returned error codes */
453    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
454    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
455    char mem_map[20] = {0}; /* The memory name */
456    long nb_cpu; /* The number of CPUs in the system */
457    long my_cpu; /* The CPU number we want to bind to */
458        struct saved_getopts save_opts;
459   
460#if DEBUG
461    rte_set_log_level(RTE_LOG_DEBUG);
462#else
463    rte_set_log_level(RTE_LOG_WARNING);
464#endif
465    /*
466     * Using unique file prefixes mean separate memory is used, unlinking
467     * the two processes. However be careful we still cannot access a
468     * port that already in use.
469     */
470    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
471                "--file-prefix", mem_map, "-m", "256", NULL};
472    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
473
474    /* This initialises the Environment Abstraction Layer (EAL)
475     * If we had slave workers these are put into WAITING state
476     *
477     * Basically binds this thread to a fixed core, which we choose as
478     * the last core on the machine (assuming fewer interrupts mapped here).
479     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
480     * "-n" the number of memory channels into the CPU (hardware specific)
481     *      - Most likely to be half the number of ram slots in your machine.
482     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
483     * Controls where in memory packets are stored and should spread across
484     * the channels. We just use 1 to be safe.
485     */
486
487    /* Get the number of cpu cores in the system and use the last core */
488    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
489    if (nb_cpu <= 0) {
490        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
491        nb_cpu = 1; /* fallback to the first core */
492    }
493    if (nb_cpu > RTE_MAX_LCORE)
494        nb_cpu = RTE_MAX_LCORE;
495
496    my_cpu = nb_cpu;
497    /* This allows the user to specify the core - we would try to do this
498     * automatically but it's hard to tell that this is secondary
499     * before running rte_eal_init(...). Currently we are limited to 1
500     * instance per core due to the way memory is allocated. */
501    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
502        snprintf(err, errlen, "Failed to parse URI");
503        return -1;
504    }
505
506    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
507                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
508
509    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
510        snprintf(err, errlen, 
511          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
512          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
513        return -1;
514    }
515
516    /* Make our mask */
517    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
518
519#if !DPDK_USE_BLACKLIST
520    /* Black list all ports besides the one that we want to use */
521    if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
522        snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
523                 " are you sure the address is correct?: %s", strerror(-ret));
524        return -1;
525    }
526#endif
527
528        /* Give the memory map a unique name */
529        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
530    /* rte_eal_init it makes a call to getopt so we need to reset the
531     * global optind variable of getopt otherwise this fails */
532        save_getopts(&save_opts);
533    optind = 1;
534    if ((ret = rte_eal_init(argc, argv)) < 0) {
535        snprintf(err, errlen, 
536          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
537        return -1;
538    }
539        restore_getopts(&save_opts);
540
541#if DEBUG
542    dump_configuration();
543#endif
544
545#if DPDK_USE_PMD_INIT
546    /* This registers all available NICs with Intel DPDK
547     * These are not loaded until rte_eal_pci_probe() is called.
548     */
549    if ((ret = rte_pmd_init_all()) < 0) {
550        snprintf(err, errlen, 
551          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
552        return -1;
553    }
554#endif
555
556#if DPDK_USE_BLACKLIST
557    /* Blacklist all ports besides the one that we want to use */
558        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
559                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
560                         " are you sure the address is correct?: %s", strerror(-ret));
561                return -1;
562        }
563#endif
564
565    /* This loads DPDK drivers against all ports that are not blacklisted */
566        if ((ret = rte_eal_pci_probe()) < 0) {
567        snprintf(err, errlen, 
568            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
569        return -1;
570    }
571
572    format_data->nb_ports = rte_eth_dev_count();
573
574    if (format_data->nb_ports != 1) {
575        snprintf(err, errlen, 
576            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
577            format_data->nb_ports);
578        return -1;
579    }
580
581    return 0;
582}
583
584static int dpdk_init_input (libtrace_t *libtrace) {
585    char err[500];
586    err[0] = 0;
587   
588    libtrace->format_data = (struct dpdk_format_data_t *)
589                            malloc(sizeof(struct dpdk_format_data_t));
590    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
591    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
592    FORMAT(libtrace)->nb_ports = 0;
593    FORMAT(libtrace)->snaplen = 0; /* Use default */
594    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
595    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
596    FORMAT(libtrace)->promisc = -1;
597    FORMAT(libtrace)->pktmbuf_pool = NULL;
598#if DPDK_USE_BLACKLIST
599    FORMAT(libtrace)->nb_blacklist = 0;
600#endif
601    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
602    FORMAT(libtrace)->mempool_name[0] = 0;
603#if HAS_HW_TIMESTAMPS_82580
604    FORMAT(libtrace)->ts_first_sys = 0;
605    FORMAT(libtrace)->ts_last_sys = 0;
606    FORMAT(libtrace)->wrap_count = 0;
607#endif
608
609    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
610        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
611        free(libtrace->format_data);
612        libtrace->format_data = NULL;
613        return -1;
614    }
615    return 0;
616};
617
618static int dpdk_init_output(libtrace_out_t *libtrace)
619{
620    char err[500];
621    err[0] = 0;
622   
623    libtrace->format_data = (struct dpdk_format_data_t *)
624                            malloc(sizeof(struct dpdk_format_data_t));
625    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
626    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
627    FORMAT(libtrace)->nb_ports = 0;
628    FORMAT(libtrace)->snaplen = 0; /* Use default */
629    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
630    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
631    FORMAT(libtrace)->promisc = -1;
632    FORMAT(libtrace)->pktmbuf_pool = NULL;
633#if DPDK_USE_BLACKLIST
634    FORMAT(libtrace)->nb_blacklist = 0;
635#endif
636    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
637    FORMAT(libtrace)->mempool_name[0] = 0;
638#if HAS_HW_TIMESTAMPS_82580
639    FORMAT(libtrace)->ts_first_sys = 0;
640    FORMAT(libtrace)->ts_last_sys = 0;
641    FORMAT(libtrace)->wrap_count = 0;
642#endif
643
644    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
645        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
646        free(libtrace->format_data);
647        libtrace->format_data = NULL;
648        return -1;
649    }
650    return 0;
651};
652
653/**
654 * Note here snaplen excludes the MAC checksum. Packets over
655 * the requested snaplen will be dropped. (Excluding MAC checksum)
656 *
657 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
658 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
659 * is set the maximum size of the returned packet would be 1518 otherwise
660 * 1514 would be the largest size possibly returned.
661 *
662 */
663static int dpdk_config_input (libtrace_t *libtrace,
664                                        trace_option_t option,
665                                        void *data) {
666    switch (option) {
667        case TRACE_OPTION_SNAPLEN:
668            /* Only support changing snaplen before a call to start is
669             * made */
670            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
671                FORMAT(libtrace)->snaplen=*(int*)data;
672            else
673                return -1;
674            return 0;
675                case TRACE_OPTION_PROMISC:
676                        FORMAT(libtrace)->promisc=*(int*)data;
677            return 0;
678        case TRACE_OPTION_FILTER:
679            /* TODO filtering */
680            break;
681        case TRACE_OPTION_META_FREQ:
682            break;
683        case TRACE_OPTION_EVENT_REALTIME:
684            break;
685        /* Avoid default: so that future options will cause a warning
686         * here to remind us to implement it, or flag it as
687         * unimplementable
688         */
689    }
690
691        /* Don't set an error - trace_config will try to deal with the
692         * option and will set an error if it fails */
693    return -1;
694}
695
696/* Can set jumbo frames/ or limit the size of a frame by setting both
697 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
698 *
699 */
700static struct rte_eth_conf port_conf = {
701        .rxmode = {
702                .split_hdr_size = 0,
703                .header_split   = 0, /**< Header Split disabled */
704                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
705                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
706                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
707        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
708#if GET_MAC_CRC_CHECKSUM
709/* So it appears that if hw_strip_crc is turned off the driver will still
710 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
711 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
712 * So lets just add it back on when we receive the packet.
713 */
714                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
715#else
716/* By default strip the MAC checksum because it's a bit of a hack to
717 * actually read these. And don't want to rely on disabling this to actualy
718 * always cut off the checksum in the future
719 */
720        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
721#endif
722        },
723        .txmode = {
724                .mq_mode = ETH_DCB_NONE,
725        },
726};
727
728static const struct rte_eth_rxconf rx_conf = {
729        .rx_thresh = {
730                .pthresh = 8,/* RX_PTHRESH prefetch */
731                .hthresh = 8,/* RX_HTHRESH host */
732                .wthresh = 4,/* RX_WTHRESH writeback */
733        },
734    .rx_free_thresh = 0,
735    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
736};
737
738static const struct rte_eth_txconf tx_conf = {
739        .tx_thresh = {
740        /**
741         * TX_PTHRESH prefetch
742         * Set on the NIC, if the number of unprocessed descriptors to queued on
743         * the card fall below this try grab at least hthresh more unprocessed
744         * descriptors.
745         */
746                .pthresh = 36,
747
748        /* TX_HTHRESH host
749         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
750         */
751                .hthresh = 0,
752       
753        /* TX_WTHRESH writeback
754         * Set on the NIC, the number of sent descriptors before writing back
755         * status to confirm the transmission. This is done more efficiently as
756         * a bulk DMA-transfer rather than writing one at a time.
757         * Similar to tx_free_thresh however this is applied to the NIC, where
758         * as tx_free_thresh is when DPDK will check these. This is extended
759         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
760         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
761         */
762                .wthresh = 4,
763        },
764
765    /* Used internally by DPDK rather than passed to the NIC. The number of
766     * packet descriptors to send before checking for any responses written
767     * back (to confirm the transmission). Default = 32 if set to 0)
768     */
769        .tx_free_thresh = 0,
770
771    /* This is the Report Status threshold, used by 10Gbit cards,
772     * This signals the card to only write back status (such as
773     * transmission successful) after this minimum number of transmit
774     * descriptors are seen. The default is 32 (if set to 0) however if set
775     * to greater than 1 TX wthresh must be set to zero, because this is kindof
776     * a replacement. See the dpdk programmers guide for more restrictions.
777     */
778        .tx_rs_thresh = 1,
779};
780
781/* Attach memory to the port and start the port or restart the port.
782 */
783static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
784    int ret; /* Check return values for errors */
785    struct rte_eth_link link_info; /* Wait for link */
786   
787    /* Already started */
788    if (format_data->paused == DPDK_RUNNING)
789        return 0;
790
791    /* First time started we need to alloc our memory, doing this here
792     * rather than in environment setup because we don't have snaplen then */
793    if (format_data->paused == DPDK_NEVER_STARTED) {
794        if (format_data->snaplen == 0) {
795            format_data->snaplen = RX_MBUF_SIZE;
796            port_conf.rxmode.jumbo_frame = 0;
797            port_conf.rxmode.max_rx_pkt_len = 0;
798        } else {
799            /* Use jumbo frames */
800            port_conf.rxmode.jumbo_frame = 1;
801            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
802        }
803
804        /* This is additional overhead so make sure we allow space for this */
805#if GET_MAC_CRC_CHECKSUM
806        format_data->snaplen += ETHER_CRC_LEN;
807#endif
808#if HAS_HW_TIMESTAMPS_82580
809        format_data->snaplen += sizeof(struct hw_timestamp_82580);
810#endif
811
812        /* Create the mbuf pool, which is the place our packets are allocated
813         * from - TODO figure out if there is is a free function (I cannot see one)
814         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
815         * allocate however that extra 1 packet is not used.
816         * (I assume <= vs < error some where in DPDK code)
817         * TX requires nb_tx_buffers + 1 in the case the queue is full
818         * so that will fill the new buffer and wait until slots in the
819         * ring become available.
820         */
821#if DEBUG
822    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
823#endif
824        format_data->pktmbuf_pool =
825            rte_mempool_create(format_data->mempool_name,
826                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
827                       format_data->snaplen + sizeof(struct rte_mbuf) 
828                                        + RTE_PKTMBUF_HEADROOM,
829                       8, sizeof(struct rte_pktmbuf_pool_private),
830                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
831                       rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
832
833        if (format_data->pktmbuf_pool == NULL) {
834            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
835                        "pool failed: %s", strerror(rte_errno));
836            return -1;
837        }
838    }
839   
840    /* ----------- Now do the setup for the port mapping ------------ */
841    /* Order of calls must be
842     * rte_eth_dev_configure()
843     * rte_eth_tx_queue_setup()
844     * rte_eth_rx_queue_setup()
845     * rte_eth_dev_start()
846     * other rte_eth calls
847     */
848   
849    /* This must be called first before another *eth* function
850     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
851    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
852    if (ret < 0) {
853        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
854                            " %"PRIu8" : %s", format_data->port,
855                            strerror(-ret));
856        return -1;
857    }
858    /* Initialise the TX queue a minimum value if using this port for
859     * receiving. Otherwise a larger size if writing packets.
860     */
861    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
862                        format_data->nb_tx_buf, rte_socket_id(), &tx_conf);
863    if (ret < 0) {
864        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
865                            " %"PRIu8" : %s", format_data->port,
866                            strerror(-ret));
867        return -1;
868    }
869    /* Initialise the RX queue with some packets from memory */
870    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
871                            format_data->nb_rx_buf, rte_socket_id(),
872                            &rx_conf, format_data->pktmbuf_pool);
873    if (ret < 0) {
874        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
875                    " %"PRIu8" : %s", format_data->port,
876                    strerror(-ret));
877        return -1;
878    }
879   
880    /* Start device */
881    ret = rte_eth_dev_start(format_data->port);
882    if (ret < 0) {
883        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
884                    strerror(-ret));
885        return -1;
886    }
887
888    /* Default promiscuous to on */
889    if (format_data->promisc == -1)
890        format_data->promisc = 1;
891   
892    if (format_data->promisc == 1)
893        rte_eth_promiscuous_enable(format_data->port);
894    else
895        rte_eth_promiscuous_disable(format_data->port);
896   
897    /* Wait for the link to come up */
898    rte_eth_link_get(format_data->port, &link_info);
899#if DEBUG
900    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
901            (int) link_info.link_duplex, (int) link_info.link_speed);
902#endif
903
904    /* We have now successfully started/unpaused */
905    format_data->paused = DPDK_RUNNING;
906   
907    return 0;
908}
909
910static int dpdk_start_input (libtrace_t *libtrace) {
911    char err[500];
912    err[0] = 0;
913
914    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
915        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
916        free(libtrace->format_data);
917        libtrace->format_data = NULL;
918        return -1;
919    }
920    return 0;
921}
922
923static int dpdk_start_output(libtrace_out_t *libtrace)
924{
925    char err[500];
926    err[0] = 0;
927   
928    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
929        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
930        free(libtrace->format_data);
931        libtrace->format_data = NULL;
932        return -1;
933    }
934    return 0;
935}
936
937static int dpdk_pause_input(libtrace_t * libtrace){
938    /* This stops the device, but can be restarted using rte_eth_dev_start() */
939    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
940#if DEBUG     
941        fprintf(stderr, "Pausing port\n");
942#endif
943        rte_eth_dev_stop(FORMAT(libtrace)->port);
944        FORMAT(libtrace)->paused = DPDK_PAUSED;
945        /* If we pause it the driver will be reset and likely our counter */
946#if HAS_HW_TIMESTAMPS_82580
947        FORMAT(libtrace)->ts_first_sys = 0;
948        FORMAT(libtrace)->ts_last_sys = 0;
949#endif
950    }
951    return 0;
952}
953
954static int dpdk_write_packet(libtrace_out_t *trace, 
955                libtrace_packet_t *packet){
956    struct rte_mbuf* m_buff[1];
957   
958    int wirelen = trace_get_wire_length(packet);
959    int caplen = trace_get_capture_length(packet);
960   
961    /* Check for a checksum and remove it */
962    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
963                                            wirelen == caplen)
964        caplen -= ETHER_CRC_LEN;
965
966    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
967    if (m_buff[0] == NULL) {
968        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
969        return -1;
970    } else {
971        int ret;
972        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
973        do {
974            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
975        } while (ret != 1);
976    }
977
978    return 0;
979}
980
981static int dpdk_fin_input(libtrace_t * libtrace) {
982    /* Free our memory structures */
983    if (libtrace->format_data != NULL) {
984        /* Close the device completely, device cannot be restarted */
985        if (FORMAT(libtrace)->port != 0xFF)
986            rte_eth_dev_close(FORMAT(libtrace)->port);
987        /* filter here if we used it */
988                free(libtrace->format_data);
989        }
990
991    /* Revert to the original PCI drivers */
992    /* No longer in DPDK
993    rte_eal_pci_exit(); */
994    return 0;
995}
996
997
998static int dpdk_fin_output(libtrace_out_t * libtrace) {
999    /* Free our memory structures */
1000    if (libtrace->format_data != NULL) {
1001        /* Close the device completely, device cannot be restarted */
1002        if (FORMAT(libtrace)->port != 0xFF)
1003            rte_eth_dev_close(FORMAT(libtrace)->port);
1004        /* filter here if we used it */
1005                free(libtrace->format_data);
1006        }
1007
1008    /* Revert to the original PCI drivers */
1009    /* No longer in DPDK
1010    rte_eal_pci_exit(); */
1011    return 0;
1012}
1013
1014/**
1015 * Get the start of additional header that we added to a packet.
1016 */
1017static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1018    uint8_t *hdrsize;
1019    assert(packet);
1020    assert(packet->buffer);
1021    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1022    /* The byte before the original packet data denotes the size in bytes
1023     * of our additional header that we added sits before the 'size byte' */
1024    hdrsize--;
1025    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1026}
1027
1028static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1029    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1030    return hdr->cap_len;
1031}
1032
1033static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1034    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1035    if (size > hdr->cap_len) {
1036        /* Cannot make a packet bigger */
1037                return trace_get_capture_length(packet);
1038        }
1039
1040    /* Reset the cached capture length first*/
1041    packet->capture_length = -1;
1042    hdr->cap_len = (uint32_t) size;
1043        return trace_get_capture_length(packet);
1044}
1045
1046static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1047    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1048    int org_cap_size; /* The original capture size */
1049    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1050        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1051                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1052                            sizeof(struct hw_timestamp_82580);
1053    } else {
1054        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1055                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1056    }
1057    if (hdr->flags & INCLUDES_CHECKSUM) {
1058        return org_cap_size;
1059    } else {
1060        /* DPDK packets are always TRACE_TYPE_ETH packets */
1061        return org_cap_size + ETHER_CRC_LEN;
1062    }
1063}
1064static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1065    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1066    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1067        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1068                sizeof(struct hw_timestamp_82580);
1069    else
1070        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1071}
1072
1073static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1074                libtrace_packet_t *packet, void *buffer,
1075                libtrace_rt_types_t rt_type, uint32_t flags) {
1076    assert(packet);
1077    if (packet->buffer != buffer &&
1078        packet->buf_control == TRACE_CTRL_PACKET) {
1079        free(packet->buffer);
1080    }
1081
1082    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1083        packet->buf_control = TRACE_CTRL_PACKET;
1084    } else
1085        packet->buf_control = TRACE_CTRL_EXTERNAL;
1086
1087    packet->buffer = buffer;
1088    packet->header = buffer;
1089
1090    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1091    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1092    packet->type = rt_type;
1093    return 0;
1094}
1095
1096/*
1097 * Does any extra preperation to a captured packet.
1098 * This includes adding our extra header to it with the timestamp
1099 */
1100static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1101                                                        struct rte_mbuf* pkt){
1102    uint8_t * hdr_size;
1103    struct dpdk_addt_hdr *hdr;
1104#if HAS_HW_TIMESTAMPS_82580
1105    struct hw_timestamp_82580 *hw_ts;
1106    struct timeval cur_sys_time;
1107    uint64_t cur_sys_time_ns;
1108    uint64_t estimated_wraps;
1109   
1110    /* Using gettimeofday because it's most likely to be a vsyscall
1111     * We don't want to slow down anything with systemcalls we dont need
1112     * accauracy */
1113    gettimeofday(&cur_sys_time, NULL);
1114#else
1115# if USE_CLOCK_GETTIME
1116    struct timespec cur_sys_time;
1117   
1118    /* This looks terrible and I feel bad doing it. But it's OK
1119     * on new kernels, because this is a vsyscall */
1120    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1121# else
1122    struct timeval cur_sys_time;
1123    /* Should be a vsyscall */
1124    gettimeofday(&cur_sys_time, NULL);
1125# endif
1126#endif
1127
1128    /* Record the size of our header */
1129    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1130    *hdr_size = sizeof(struct dpdk_addt_hdr);
1131    /* Now put our header in front of that size */
1132    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1133    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1134   
1135#if GET_MAC_CRC_CHECKSUM
1136    /* Add back in the CRC sum */
1137    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1138    pkt->pkt.data_len += ETHER_CRC_LEN;
1139    hdr->flags |= INCLUDES_CHECKSUM;
1140#endif
1141
1142#if HAS_HW_TIMESTAMPS_82580
1143    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1144     *
1145     *        +----------+---+   +--------------+
1146     *  82580 |    24    | 8 |   |      32      |
1147     *        +----------+---+   +--------------+
1148     *          reserved  \______ 40 bits _____/
1149     *
1150     * The 40 bit 82580 SYSTIM overflows every
1151     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1152     *
1153     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1154     * Endian (for the full 64 bits) i.e. picture is mirrored
1155     */
1156   
1157    /* The timestamp is sitting before our packet and is included in pkt_len */
1158    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1159    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1160   
1161    /* Despite what the documentation says this is in Little
1162     * Endian byteorder. Mask the reserved section out.
1163     */
1164    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1165                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1166               
1167    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1168    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1169        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1170        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1171    }
1172   
1173    /* This will have serious problems if packets aren't read quickly
1174     * that is within a couple of seconds because our clock cycles every
1175     * 18 seconds */
1176    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1177                            / (1ull<<TS_NBITS_82580);
1178   
1179    /* Estimated_wraps gives the number of times the counter should have
1180     * wrapped (however depending on value last time it could have wrapped
1181     * twice more (if hw clock is close to its max value) or once less (allowing
1182     * for a bit of variance between hw and sys clock). But if the clock
1183     * shouldn't have wrapped once then don't allow it to go backwards in time */
1184    if (unlikely(estimated_wraps >= 2)) {
1185        /* 2 or more wrap arounds add all but the very last wrap */
1186        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1187    }
1188   
1189    /* Set the timestamp to the lowest possible value we're considering */
1190    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1191                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1192   
1193    /* In most runs only the first if() will need evaluating - i.e our
1194     * estimate is correct. */
1195    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1196                                hdr->timestamp, MAXSKEW_82580))) {
1197        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1198        FORMAT(libtrace)->wrap_count++;
1199        hdr->timestamp += (1ull<<TS_NBITS_82580);
1200        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1201                                hdr->timestamp, MAXSKEW_82580)) {
1202            /* Failed to match estimated_wraps */
1203            FORMAT(libtrace)->wrap_count++;
1204            hdr->timestamp += (1ull<<TS_NBITS_82580);
1205            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1206                                hdr->timestamp, MAXSKEW_82580)) {
1207                if (estimated_wraps == 0) {
1208                    /* 0 case Failed to match estimated_wraps+2 */
1209                    printf("WARNING - Hardware Timestamp failed to"
1210                                            " match using systemtime!\n");
1211                    hdr->timestamp = cur_sys_time_ns;
1212                } else {
1213                    /* Failed to match estimated_wraps+1 */
1214                    FORMAT(libtrace)->wrap_count++;
1215                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1216                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1217                                hdr->timestamp, MAXSKEW_82580)) {
1218                        /* Failed to match estimated_wraps+2 */
1219                        printf("WARNING - Hardware Timestamp failed to"
1220                                            " match using systemtime!!\n");
1221                    }
1222                }
1223            }
1224        }
1225    }
1226
1227    /* Log our previous for the next loop */
1228    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1229
1230#else
1231# if USE_CLOCK_GETTIME
1232    hdr->timestamp = TS_TO_NS(cur_sys_time);
1233# else
1234    hdr->timestamp = TV_TO_NS(cur_sys_time);
1235# endif
1236#endif
1237
1238    /* Intels samples prefetch into level 0 cache lets assume it is a good
1239     * idea and do the same */
1240    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1241    packet->buffer = pkt;
1242    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1243
1244    /* Set our capture length for the first time */
1245    hdr->cap_len = dpdk_get_wire_length(packet);
1246    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1247        hdr->cap_len -= ETHER_CRC_LEN;
1248    }
1249   
1250
1251    return dpdk_get_framing_length(packet) +
1252                        dpdk_get_capture_length(packet);
1253}
1254
1255static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1256    int nb_rx; /* Number of rx packets we've recevied */
1257    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1258
1259    /* Free the last packet buffer */
1260    if (packet->buffer != NULL) {
1261        /* Buffer is owned by DPDK */
1262        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1263            rte_pktmbuf_free(packet->buffer);
1264            packet->buffer = NULL;
1265        } else
1266        /* Buffer is owned by packet i.e. has been malloc'd */
1267        if (packet->buf_control == TRACE_CTRL_PACKET) {
1268            free(packet->buffer);
1269            packet->buffer = NULL;
1270        }
1271    }
1272   
1273    packet->buf_control = TRACE_CTRL_EXTERNAL;
1274    packet->type = TRACE_RT_DATA_DPDK;
1275   
1276    /* Wait for a packet */
1277    while (1) {
1278        /* Poll for a single packet */
1279        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1280                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1281        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1282            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1283        }
1284    }
1285   
1286    /* We'll never get here - but if we did it would be bad */
1287    return -1;
1288}
1289
1290static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1291    struct timeval tv;
1292    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1293   
1294    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1295    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1296    return tv;
1297}
1298
1299static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1300    struct timespec ts;
1301    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1302   
1303    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1304    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1305    return ts;
1306}
1307
1308static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1309    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1310}
1311
1312static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1313    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1314    return (libtrace_direction_t) hdr->direction;
1315}
1316
1317static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1318    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1319    hdr->direction = (uint8_t) direction;
1320    return (libtrace_direction_t) hdr->direction;
1321}
1322
1323/*
1324 * NOTE: Drops could occur for other reasons than running out of buffer
1325 * space. Such as failed MAC checksums and oversized packets.
1326 */
1327static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1328    struct rte_eth_stats stats = {0};
1329   
1330    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1331        return UINT64_MAX;
1332    /* Grab the current stats */
1333    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1334   
1335    /* Get the drop counter */
1336    return (uint64_t) stats.ierrors;
1337}
1338
1339static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1340    struct rte_eth_stats stats = {0};
1341   
1342    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1343        return UINT64_MAX;
1344    /* Grab the current stats */
1345    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1346   
1347    /* Get the drop counter */
1348    return (uint64_t) stats.ipackets;
1349}
1350
1351/*
1352 * This is the number of packets filtered by the NIC
1353 * and maybe ahead of number read using libtrace.
1354 *
1355 * XXX we are yet to implement any filtering, but if it was this should
1356 * get the result. So this will just return 0 for now.
1357 */
1358static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1359    struct rte_eth_stats stats = {0};
1360   
1361    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1362        return UINT64_MAX;
1363    /* Grab the current stats */
1364    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1365   
1366    /* Get the drop counter */
1367    return (uint64_t) stats.fdirmiss;
1368}
1369
1370/* Attempts to read a packet in a non-blocking fashion. If one is not
1371 * available a SLEEP event is returned. We do not have the ability to
1372 * create a select()able file descriptor in DPDK.
1373 */
1374static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1375                                        libtrace_packet_t *packet) {
1376    libtrace_eventobj_t event = {0,0,0.0,0};
1377    int nb_rx; /* Number of receive packets we've read */
1378    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1379   
1380    do {
1381   
1382        /* See if we already have a packet waiting */
1383        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1384                        FORMAT(trace)->queue_id, pkts_burst, 1);
1385       
1386        if (nb_rx > 0) {
1387            /* Free the last packet buffer */
1388            if (packet->buffer != NULL) {
1389                /* Buffer is owned by DPDK */
1390                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1391                    rte_pktmbuf_free(packet->buffer);
1392                    packet->buffer = NULL;
1393                } else
1394                /* Buffer is owned by packet i.e. has been malloc'd */
1395                if (packet->buf_control == TRACE_CTRL_PACKET) {
1396                    free(packet->buffer);
1397                    packet->buffer = NULL;
1398                }
1399            }
1400           
1401            packet->buf_control = TRACE_CTRL_EXTERNAL;
1402            packet->type = TRACE_RT_DATA_DPDK;
1403            event.type = TRACE_EVENT_PACKET;
1404            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1405           
1406            /* XXX - Check this passes the filter trace_read_packet normally
1407             * does this for us but this wont */
1408            if (trace->filter) {
1409                if (!trace_apply_filter(trace->filter, packet)) {
1410                    /* Failed the filter so we loop for another packet */
1411                    trace->filtered_packets ++;
1412                    continue;
1413                }
1414            }
1415            trace->accepted_packets ++;
1416        } else {
1417            /* We only want to sleep for a very short time - we are non-blocking */
1418            event.type = TRACE_EVENT_SLEEP;
1419            event.seconds = 0.0001;
1420            event.size = 0;
1421        }
1422       
1423        /* If we get here we have our event */
1424        break;
1425    } while (1);
1426
1427    return event;
1428}
1429
1430
/* help callback: print the URI syntax understood by the dpdk format module
 * to standard output. */
static void dpdk_help(void) {
    /* One entry per line of help text, written out in order */
    static const char * const help_text[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t i;

    for (i = 0; i < sizeof(help_text) / sizeof(help_text[0]); i++)
        fputs(help_text[i], stdout);
}
1448
/* The libtrace format module descriptor for DPDK.
 *
 * This is a positional initializer: each entry fills the next member of
 * struct libtrace_format_t, and the trailing comment names the slot it
 * fills. A NULL entry means this format does not provide that operation.
 * It is handed to libtrace by dpdk_constructor().
 */
 static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
        TRACE_FORMAT_DPDK,
        NULL,                   /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,            /* init_input */
        dpdk_config_input,          /* config_input */
        dpdk_start_input,           /* start_input */
        dpdk_pause_input,           /* pause_input */
        dpdk_init_output,           /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,          /* start_output */
        dpdk_fin_input,             /* fin_input */
        dpdk_fin_output,        /* fin_output */
        dpdk_read_packet,           /* read_packet */
        dpdk_prepare_packet,    /* prepare_packet */
        NULL,                               /* fin_packet */
        dpdk_write_packet,          /* write_packet */
        dpdk_get_link_type,         /* get_link_type */
        dpdk_get_direction,         /* get_direction */
        dpdk_set_direction,         /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,           /* get_timeval */
        dpdk_get_timespec,          /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,/* get_capture_length */
        dpdk_get_wire_length,   /* get_wire_length */
        dpdk_get_framing_length,/* get_framing_length */
        dpdk_set_capture_length,/* set_capture_length */
        NULL,                               /* get_received_packets */
        dpdk_get_filtered_packets,/* get_filtered_packets */
        dpdk_get_dropped_packets,/* get_dropped_packets */
    dpdk_get_captured_packets,/* get_captured_packets */
        NULL,                       /* get_fd */
        dpdk_trace_event,               /* trace_event */
    dpdk_help,              /* help */
        NULL    /* NOTE(review): presumably the link to the next registered
                 * format - confirm against struct libtrace_format_t */
};
1491
1492void dpdk_constructor(void) {
1493        register_format(&dpdk);
1494}
Note: See TracBrowser for help on using the repository browser.