source: lib/format_dpdk.c @ 136c19e

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 136c19e was 136c19e, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fix parsing of DPDK PCI addresses which should be hex not base 10.

This also simplifies that code by using scanf, however one minor
downside of this is that the core mask after the "-" can be
set to a non-number and it will be ignored rather than alerting
the user that it was invalid.

  • Property mode set to 100644
File size: 52.3 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include <stdlib.h>
56#include <assert.h>
57#include <unistd.h>
58#include <endian.h>
59#include <string.h>
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * Currently 1.5 to 1.7 is supported.
72 */
73#include <rte_eal.h>
74#include <rte_version.h>
75#ifndef RTE_VERSION_NUM
76#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
77#endif
78#ifndef RTE_VER_PATCH_RELEASE
79#       define RTE_VER_PATCH_RELEASE 0
80#endif
81#ifndef RTE_VERSION
82#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
83        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
84#endif
85
86/* 1.6.0r2 :
87 *      rte_eal_pci_set_blacklist() is removed
88 *      device_list is renamed to pci_device_list
89 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
90 *      as such we do apply the whitelist before rte_eal_init.
91 *      This also works correctly with DPDK 1.6.0r2.
92 *
93 * Replaced by:
94 *      rte_devargs (we can simply whitelist)
95 */
96#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
97#       define DPDK_USE_BLACKLIST 1
98#else
99#       define DPDK_USE_BLACKLIST 0
100#endif
101
102/*
103 * 1.7.0 :
104 *      rte_pmd_init_all is removed
105 *
106 * Replaced by:
107 *      Nothing, no longer needed
108 */
109#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
110#       define DPDK_USE_PMD_INIT 1
111#else
112#       define DPDK_USE_PMD_INIT 0
113#endif
114
115#include <rte_per_lcore.h>
116#include <rte_debug.h>
117#include <rte_errno.h>
118#include <rte_common.h>
119#include <rte_log.h>
120#include <rte_memcpy.h>
121#include <rte_prefetch.h>
122#include <rte_branch_prediction.h>
123#include <rte_pci.h>
124#include <rte_ether.h>
125#include <rte_ethdev.h>
126#include <rte_ring.h>
127#include <rte_mempool.h>
128#include <rte_mbuf.h>
129
130/* The default size of memory buffers to use - This is the max size of standard
131 * ethernet packet less the size of the MAC CHECKSUM */
132#define RX_MBUF_SIZE 1514
133
134/* The minimum number of memory buffers per queue tx or rx. Search for
135 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
136 */
137#define MIN_NB_BUF 64
138
139/* Number of receive memory buffers to use
140 * By default this is limited by driver to 4k and must be a multiple of 128.
141 * A modification can be made to the driver to remove this limit.
142 * This can be increased in the driver and here.
143 * Should be at least MIN_NB_BUF.
144 */
145#define NB_RX_MBUF 4096
146
/* Number of send memory buffers to use.
 * The same limits apply as for NB_RX_MBUF.
 */
150#define NB_TX_MBUF 1024
151
152/* The size of the PCI blacklist needs to be big enough to contain
153 * every PCI device address (listed by lspci every bus:device.function tuple).
154 */
155#define BLACK_LIST_SIZE 50
156
157/* The maximum number of characters the mempool name can be */
158#define MEMPOOL_NAME_LEN 20
159
/* View an opaque buffer pointer as the DPDK mbuf it points at */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Fetch the DPDK private data hanging off a trace's format_data pointer */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a value with tv_sec/tv_usec members to nanoseconds.
 * The argument is parenthesised so expressions such as *ptr expand safely. */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a value with tv_sec/tv_nsec members to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
168
169#if RTE_PKTMBUF_HEADROOM != 128
170#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
171         "any libtrace instance processing these packet must be have the" \
172         "same RTE_PKTMBUF_HEADROOM set"
173#endif
174
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
184
185/* Print verbose messages to stdout */
186#define DEBUG 0
187
/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again, gettimeofday() should also
 * be a vsyscall; if it's not you should seriously consider updating your
 * kernel.
 */
194#ifdef HAVE_LIBRT
195/* You can turn this on (set to 1) to prefer clock_gettime */
196#define USE_CLOCK_GETTIME 0
197#else
198/* DONT CHANGE THIS !!! */
199#define USE_CLOCK_GETTIME 0
200#endif
201
202/* This is fairly safe to turn on - currently there appears to be a 'bug'
203 * in DPDK that will remove the checksum by making the packet appear 4bytes
204 * smaller than what it really is. Most formats don't include the checksum
205 * hence writing out a port such as int: ring: and dpdk: assumes there
206 * is no checksum and will attempt to write the checksum as part of the
207 * packet
208 */
209#define GET_MAC_CRC_CHECKSUM 0
210
211/* This requires a modification of the pmd drivers (inside Intel DPDK)
212 */
213#define HAS_HW_TIMESTAMPS_82580 0
214
215#if HAS_HW_TIMESTAMPS_82580
216# define TS_NBITS_82580     40
217/* The maximum on the +ve or -ve side that we can be, make it half way */
218# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
219#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
220#endif
221
222/* As per Intel 82580 specification - mismatch in 82580 datasheet
223 * it states ts is stored in Big Endian, however its actually Little */
struct hw_timestamp_82580 {
    /* First 8 bytes of the record; not interpreted here */
    uint64_t reserved;
    /* Little endian; only the lower 40 bits carry the timestamp */
    uint64_t timestamp;
};
228
/* Lifecycle of the port, stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Initial state set by the init functions */
    DPDK_RUNNING       = 1,
    DPDK_PAUSED        = 2,
};
234
235/* Used by both input and output however some fields are not used
236 * for output */
237struct dpdk_format_data_t {
238    int8_t promisc; /* promiscuous mode - RX only */
239    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
240    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
241    uint8_t paused; /* See paused_state */ 
242    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
243    int snaplen; /* The snap length for the capture - RX only */
244    /* We always have to setup both rx and tx queues even if we don't want them */
245    int nb_rx_buf; /* The number of packet buffers in the rx ring */
246    int nb_tx_buf; /* The number of packet buffers in the tx ring */
247    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
248#if DPDK_USE_BLACKLIST
249    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
250        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
251#endif
252    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
253#if HAS_HW_TIMESTAMPS_82580
254    /* Timestamping only relevent to RX */
255    uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
256    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
257    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
258#endif
259};
260
/* Bit flags kept in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
265
266/**
267 * A structure placed in front of the packet where we can store
268 * additional information about the given packet.
269 * +--------------------------+
270 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
271 * +--------------------------+
272 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
273 * +--------------------------+
274 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
275 * +--------------------------+
276 * |   sizeof(dpdk_addt_hdr)  | 1 byte
277 * +--------------------------+
278 * *   hw_timestamp_82580     * 16 bytes Optional
279 * +--------------------------+
280 * |       Packet data        | Variable Size
281 * |                          |
282 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t  flags;      /* Holds enum dpdk_addt_hdr_flags bits */
    uint8_t  direction;
    uint8_t  reserved1;
    uint8_t  reserved2;
    uint32_t cap_len;    /* The size to say the capture is */
};
291
292/**
293 * We want to blacklist all devices except those on the whitelist
294 * (I say list, but yes it is only the one).
295 *
296 * The default behaviour of rte_pci_probe() will map every possible device
297 * to its DPDK driver. The DPDK driver will take the ethernet device
298 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
299 *
300 * So blacklist all devices except the one that we wish to use so that
301 * the others can still be used as standard ethernet ports.
302 *
303 * @return 0 if successful, otherwise -1 on error.
304 */
305#if DPDK_USE_BLACKLIST
306static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
307{
308        struct rte_pci_device *dev = NULL;
309        format_data->nb_blacklist = 0;
310
311        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
312
313        TAILQ_FOREACH(dev, &device_list, next) {
314        if (whitelist != NULL && whitelist->domain == dev->addr.domain
315            && whitelist->bus == dev->addr.bus
316            && whitelist->devid == dev->addr.devid
317            && whitelist->function == dev->addr.function)
318            continue;
319                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
320                                / sizeof (format_data->blacklist[0])) {
321                        printf("Warning: too many devices to blacklist consider"
322                                        " increasing BLACK_LIST_SIZE");
323                        break;
324                }
325                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
326                ++format_data->nb_blacklist;
327        }
328
329        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
330        return 0;
331}
332#else /* DPDK_USE_BLACKLIST */
333#include <rte_devargs.h>
334static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
335{
336        char pci_str[20] = {0};
337        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
338                 whitelist->domain,
339                 whitelist->bus,
340                 whitelist->devid,
341                 whitelist->function);
342        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
343                return -1;
344        }
345        return 0;
346}
347#endif
348
349/**
350 * Parse the URI format as a pci address
351 * Fills in addr, note core is optional and is unchanged if
352 * a value for it is not provided.
353 *
354 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
355 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
356 */
357static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
358    int matches;
359    assert(str);
360    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
361                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
362    if (matches >= 4) {
363        return 0;
364    } else {
365        return -1;
366    }
367}
368
369#if DEBUG
370/* For debugging */
371static inline void dump_configuration()
372{
373    struct rte_config * global_config;
374    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
375
376    if (nb_cpu <= 0) {
377        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
378        nb_cpu = 1; /* fallback to just 1 core */
379    }
380    if (nb_cpu > RTE_MAX_LCORE)
381        nb_cpu = RTE_MAX_LCORE;
382
383    global_config = rte_eal_get_configuration();
384
385    if (global_config != NULL) {
386        int i;
387        fprintf(stderr, "Intel DPDK setup\n"
388               "---Version      : %s\n"
389               "---Master LCore : %"PRIu32"\n"
390               "---LCore Count  : %"PRIu32"\n",
391               rte_version(),
392               global_config->master_lcore, global_config->lcore_count);
393
394        for (i = 0 ; i < nb_cpu; i++) {
395            fprintf(stderr, "   ---Core %d : %s\n", i,
396                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
397        }
398
399        const char * proc_type;
400        switch (global_config->process_type) {
401            case RTE_PROC_AUTO:
402                proc_type = "auto";
403                break;
404            case RTE_PROC_PRIMARY:
405                proc_type = "primary";
406                break;
407            case RTE_PROC_SECONDARY:
408                proc_type = "secondary";
409                break;
410            case RTE_PROC_INVALID:
411                proc_type = "invalid";
412                break;
413            default:
414                proc_type = "something worse than invalid!!";
415        }
416        fprintf(stderr, "---Process Type : %s\n", proc_type);
417    }
418
419}
420#endif
421
422/**
423 * XXX This is very bad XXX
424 * But we have to do something to allow getopts nesting
425 * Luckly normally the format is last so it doesn't matter
426 * DPDK only supports modern systems so hopefully this
427 * will continue to work
428 */
/* Snapshot of the four getopt globals */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        struct saved_getopts snapshot;

        snapshot.optarg = optarg;
        snapshot.optind = optind;
        snapshot.opterr = opterr;
        snapshot.optopt = optopt;
        *opts = snapshot;
}

/* Write the values held in *opts back into the getopt globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts snapshot = *opts;

        optarg = snapshot.optarg;
        optind = snapshot.optind;
        opterr = snapshot.opterr;
        optopt = snapshot.optopt;
}
449
450static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
451                                        char * err, int errlen) {
452    int ret; /* Returned error codes */
453    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
454    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
455    char mem_map[20] = {0}; /* The memory name */
456    long nb_cpu; /* The number of CPUs in the system */
457    long my_cpu; /* The CPU number we want to bind to */
458        struct saved_getopts save_opts;
459   
460#if DEBUG
461    rte_set_log_level(RTE_LOG_DEBUG);
462#else
463    rte_set_log_level(RTE_LOG_WARNING);
464#endif
465    /*
466     * Using unique file prefixes mean separate memory is used, unlinking
467     * the two processes. However be careful we still cannot access a
468     * port that already in use.
469     */
470    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
471                "--file-prefix", mem_map, "-m", "256", NULL};
472    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
473
474    /* This initialises the Environment Abstraction Layer (EAL)
475     * If we had slave workers these are put into WAITING state
476     *
477     * Basically binds this thread to a fixed core, which we choose as
478     * the last core on the machine (assuming fewer interrupts mapped here).
479     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
480     * "-n" the number of memory channels into the CPU (hardware specific)
481     *      - Most likely to be half the number of ram slots in your machine.
482     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
483     * Controls where in memory packets are stored and should spread across
484     * the channels. We just use 1 to be safe.
485     */
486
487    /* Get the number of cpu cores in the system and use the last core */
488    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
489    if (nb_cpu <= 0) {
490        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
491        nb_cpu = 1; /* fallback to the first core */
492    }
493    if (nb_cpu > RTE_MAX_LCORE)
494        nb_cpu = RTE_MAX_LCORE;
495
496    my_cpu = nb_cpu;
497    /* This allows the user to specify the core - we would try to do this
498     * automatically but it's hard to tell that this is secondary
499     * before running rte_eal_init(...). Currently we are limited to 1
500     * instance per core due to the way memory is allocated. */
501    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
502        snprintf(err, errlen, "Failed to parse URI");
503        return -1;
504    }
505
506    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
507                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
508
509    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
510        snprintf(err, errlen, 
511          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
512          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
513        return -1;
514    }
515
516    /* Make our mask */
517    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
518
519#if !DPDK_USE_BLACKLIST
520    /* Black list all ports besides the one that we want to use */
521    if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
522        snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
523                 " are you sure the address is correct?: %s", strerror(-ret));
524        return -1;
525    }
526#endif
527
528        /* Give the memory map a unique name */
529        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
530    /* rte_eal_init it makes a call to getopt so we need to reset the
531     * global optind variable of getopt otherwise this fails */
532        save_getopts(&save_opts);
533    optind = 1;
534    if ((ret = rte_eal_init(argc, argv)) < 0) {
535        snprintf(err, errlen, 
536          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
537        return -1;
538    }
539        restore_getopts(&save_opts);
540
541#if DEBUG
542    dump_configuration();
543#endif
544
545#if DPDK_USE_PMD_INIT
546    /* This registers all available NICs with Intel DPDK
547     * These are not loaded until rte_eal_pci_probe() is called.
548     */
549    if ((ret = rte_pmd_init_all()) < 0) {
550        snprintf(err, errlen, 
551          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
552        return -1;
553    }
554#endif
555
556#if DPDK_USE_BLACKLIST
557    /* Blacklist all ports besides the one that we want to use */
558        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
559                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
560                         " are you sure the address is correct?: %s", strerror(-ret));
561                return -1;
562        }
563#endif
564
565    /* This loads DPDK drivers against all ports that are not blacklisted */
566        if ((ret = rte_eal_pci_probe()) < 0) {
567        snprintf(err, errlen, 
568            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
569        return -1;
570    }
571
572    format_data->nb_ports = rte_eth_dev_count();
573
574    if (format_data->nb_ports != 1) {
575        snprintf(err, errlen, 
576            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
577            format_data->nb_ports);
578        return -1;
579    }
580
581    return 0;
582}
583
584static int dpdk_init_input (libtrace_t *libtrace) {
585    char err[500];
586    err[0] = 0;
587   
588    libtrace->format_data = (struct dpdk_format_data_t *)
589                            malloc(sizeof(struct dpdk_format_data_t));
590    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
591    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
592    FORMAT(libtrace)->nb_ports = 0;
593    FORMAT(libtrace)->snaplen = 0; /* Use default */
594    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
595    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
596    FORMAT(libtrace)->promisc = -1;
597    FORMAT(libtrace)->pktmbuf_pool = NULL;
598#if DPDK_USE_BLACKLIST
599    FORMAT(libtrace)->nb_blacklist = 0;
600#endif
601    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
602    FORMAT(libtrace)->mempool_name[0] = 0;
603#if HAS_HW_TIMESTAMPS_82580
604    FORMAT(libtrace)->ts_first_sys = 0;
605    FORMAT(libtrace)->ts_last_sys = 0;
606    FORMAT(libtrace)->wrap_count = 0;
607#endif
608
609    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
610        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
611        free(libtrace->format_data);
612        libtrace->format_data = NULL;
613        return -1;
614    }
615    return 0;
616};
617
618static int dpdk_init_output(libtrace_out_t *libtrace)
619{
620    char err[500];
621    err[0] = 0;
622   
623    libtrace->format_data = (struct dpdk_format_data_t *)
624                            malloc(sizeof(struct dpdk_format_data_t));
625    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
626    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
627    FORMAT(libtrace)->nb_ports = 0;
628    FORMAT(libtrace)->snaplen = 0; /* Use default */
629    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
630    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
631    FORMAT(libtrace)->promisc = -1;
632    FORMAT(libtrace)->pktmbuf_pool = NULL;
633#if DPDK_USE_BLACKLIST
634    FORMAT(libtrace)->nb_blacklist = 0;
635#endif
636    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
637    FORMAT(libtrace)->mempool_name[0] = 0;
638#if HAS_HW_TIMESTAMPS_82580
639    FORMAT(libtrace)->ts_first_sys = 0;
640    FORMAT(libtrace)->ts_last_sys = 0;
641    FORMAT(libtrace)->wrap_count = 0;
642#endif
643
644    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
645        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
646        free(libtrace->format_data);
647        libtrace->format_data = NULL;
648        return -1;
649    }
650    return 0;
651};
652
653/**
654 * Note here snaplen excludes the MAC checksum. Packets over
655 * the requested snaplen will be dropped. (Excluding MAC checksum)
656 *
657 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
658 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
659 * is set the maximum size of the returned packet would be 1518 otherwise
660 * 1514 would be the largest size possibly returned.
661 *
662 */
663static int dpdk_config_input (libtrace_t *libtrace,
664                                        trace_option_t option,
665                                        void *data) {
666    switch (option) {
667        case TRACE_OPTION_SNAPLEN:
668            /* Only support changing snaplen before a call to start is
669             * made */
670            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
671                FORMAT(libtrace)->snaplen=*(int*)data;
672            else
673                return -1;
674            return 0;
675                case TRACE_OPTION_PROMISC:
676                        FORMAT(libtrace)->promisc=*(int*)data;
677            return 0;
678        case TRACE_OPTION_FILTER:
679            /* TODO filtering */
680            break;
681        case TRACE_OPTION_META_FREQ:
682            break;
683        case TRACE_OPTION_EVENT_REALTIME:
684            break;
685        /* Avoid default: so that future options will cause a warning
686         * here to remind us to implement it, or flag it as
687         * unimplementable
688         */
689    }
690
691        /* Don't set an error - trace_config will try to deal with the
692         * option and will set an error if it fails */
693    return -1;
694}
695
696/* Can set jumbo frames/ or limit the size of a frame by setting both
697 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
698 *
699 */
700static struct rte_eth_conf port_conf = {
701        .rxmode = {
702                .split_hdr_size = 0,
703                .header_split   = 0, /**< Header Split disabled */
704                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
705                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
706                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
707        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
708#if GET_MAC_CRC_CHECKSUM
709/* So it appears that if hw_strip_crc is turned off the driver will still
710 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
711 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
712 * So lets just add it back on when we receive the packet.
713 */
714                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
715#else
716/* By default strip the MAC checksum because it's a bit of a hack to
717 * actually read these. And don't want to rely on disabling this to actualy
718 * always cut off the checksum in the future
719 */
720        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
721#endif
722        },
723        .txmode = {
724                .mq_mode = ETH_DCB_NONE,
725        },
726};
727
728static const struct rte_eth_rxconf rx_conf = {
729        .rx_thresh = {
730                .pthresh = 8,/* RX_PTHRESH prefetch */
731                .hthresh = 8,/* RX_HTHRESH host */
732                .wthresh = 4,/* RX_WTHRESH writeback */
733        },
734    .rx_free_thresh = 0,
735    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
736};
737
738static const struct rte_eth_txconf tx_conf = {
739        .tx_thresh = {
740        /**
741         * TX_PTHRESH prefetch
742         * Set on the NIC, if the number of unprocessed descriptors to queued on
743         * the card fall below this try grab at least hthresh more unprocessed
744         * descriptors.
745         */
746                .pthresh = 36,
747
748        /* TX_HTHRESH host
749         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
750         */
751                .hthresh = 0,
752       
753        /* TX_WTHRESH writeback
754         * Set on the NIC, the number of sent descriptors before writing back
755         * status to confirm the transmission. This is done more efficiently as
756         * a bulk DMA-transfer rather than writing one at a time.
757         * Similar to tx_free_thresh however this is applied to the NIC, where
758         * as tx_free_thresh is when DPDK will check these. This is extended
759         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
760         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
761         */
762                .wthresh = 4,
763        },
764
765    /* Used internally by DPDK rather than passed to the NIC. The number of
766     * packet descriptors to send before checking for any responses written
767     * back (to confirm the transmission). Default = 32 if set to 0)
768     */
769        .tx_free_thresh = 0,
770
771    /* This is the Report Status threshold, used by 10Gbit cards,
772     * This signals the card to only write back status (such as
773     * transmission successful) after this minimum number of transmit
774     * descriptors are seen. The default is 32 (if set to 0) however if set
775     * to greater than 1 TX wthresh must be set to zero, because this is kindof
776     * a replacement. See the dpdk programmers guide for more restrictions.
777     */
778        .tx_rs_thresh = 1,
779};
780
781/* Attach memory to the port and start the port or restart the port.
782 */
783static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
784    int ret; /* Check return values for errors */
785    struct rte_eth_link link_info; /* Wait for link */
786   
787    /* Already started */
788    if (format_data->paused == DPDK_RUNNING)
789        return 0;
790
791    /* First time started we need to alloc our memory, doing this here
792     * rather than in environment setup because we don't have snaplen then */
793    if (format_data->paused == DPDK_NEVER_STARTED) {
794        if (format_data->snaplen == 0) {
795            format_data->snaplen = RX_MBUF_SIZE;
796            port_conf.rxmode.jumbo_frame = 0;
797            port_conf.rxmode.max_rx_pkt_len = 0;
798        } else {
799            /* Use jumbo frames */
800            port_conf.rxmode.jumbo_frame = 1;
801            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
802        }
803
804        /* This is additional overhead so make sure we allow space for this */
805#if GET_MAC_CRC_CHECKSUM
806        format_data->snaplen += ETHER_CRC_LEN;
807#endif
808#if HAS_HW_TIMESTAMPS_82580
809        format_data->snaplen += sizeof(struct hw_timestamp_82580);
810#endif
811
812        /* Create the mbuf pool, which is the place our packets are allocated
813         * from - TODO figure out if there is is a free function (I cannot see one)
814         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
815         * allocate however that extra 1 packet is not used.
816         * (I assume <= vs < error some where in DPDK code)
817         * TX requires nb_tx_buffers + 1 in the case the queue is full
818         * so that will fill the new buffer and wait until slots in the
819         * ring become available.
820         */
821#if DEBUG
822    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
823#endif
824        format_data->pktmbuf_pool =
825            rte_mempool_create(format_data->mempool_name,
826                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
827                       format_data->snaplen + sizeof(struct rte_mbuf) 
828                                        + RTE_PKTMBUF_HEADROOM,
829                       8, sizeof(struct rte_pktmbuf_pool_private),
830                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
831                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
832
833        if (format_data->pktmbuf_pool == NULL) {
834            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
835                        "pool failed: %s", strerror(rte_errno));
836            return -1;
837        }
838    }
839   
840    /* ----------- Now do the setup for the port mapping ------------ */
841    /* Order of calls must be
842     * rte_eth_dev_configure()
843     * rte_eth_tx_queue_setup()
844     * rte_eth_rx_queue_setup()
845     * rte_eth_dev_start()
846     * other rte_eth calls
847     */
848   
849    /* This must be called first before another *eth* function
850     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
851    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
852    if (ret < 0) {
853        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
854                            " %"PRIu8" : %s", format_data->port,
855                            strerror(-ret));
856        return -1;
857    }
858    /* Initialise the TX queue a minimum value if using this port for
859     * receiving. Otherwise a larger size if writing packets.
860     */
861    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
862                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
863    if (ret < 0) {
864        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
865                            " %"PRIu8" : %s", format_data->port,
866                            strerror(-ret));
867        return -1;
868    }
869    /* Initialise the RX queue with some packets from memory */
870    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
871                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
872                            &rx_conf, format_data->pktmbuf_pool);
873    if (ret < 0) {
874        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
875                    " %"PRIu8" : %s", format_data->port,
876                    strerror(-ret));
877        return -1;
878    }
879   
880    /* Start device */
881    ret = rte_eth_dev_start(format_data->port);
882    if (ret < 0) {
883        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
884                    strerror(-ret));
885        return -1;
886    }
887
888    /* Default promiscuous to on */
889    if (format_data->promisc == -1)
890        format_data->promisc = 1;
891   
892    if (format_data->promisc == 1)
893        rte_eth_promiscuous_enable(format_data->port);
894    else
895        rte_eth_promiscuous_disable(format_data->port);
896   
897    /* Wait for the link to come up */
898    rte_eth_link_get(format_data->port, &link_info);
899#if DEBUG
900    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
901            (int) link_info.link_duplex, (int) link_info.link_speed);
902#endif
903
904    /* We have now successfully started/unpaused */
905    format_data->paused = DPDK_RUNNING;
906   
907    return 0;
908}
909
910static int dpdk_start_input (libtrace_t *libtrace) {
911    char err[500];
912    err[0] = 0;
913
914    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
915        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
916        free(libtrace->format_data);
917        libtrace->format_data = NULL;
918        return -1;
919    }
920    return 0;
921}
922
923static int dpdk_start_output(libtrace_out_t *libtrace)
924{
925    char err[500];
926    err[0] = 0;
927   
928    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
929        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
930        free(libtrace->format_data);
931        libtrace->format_data = NULL;
932        return -1;
933    }
934    return 0;
935}
936
937static int dpdk_pause_input(libtrace_t * libtrace){
938    /* This stops the device, but can be restarted using rte_eth_dev_start() */
939    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
940#if DEBUG     
941        fprintf(stderr, "Pausing port\n");
942#endif
943        rte_eth_dev_stop(FORMAT(libtrace)->port);
944        FORMAT(libtrace)->paused = DPDK_PAUSED;
945        /* If we pause it the driver will be reset and likely our counter */
946#if HAS_HW_TIMESTAMPS_82580
947        FORMAT(libtrace)->ts_first_sys = 0;
948        FORMAT(libtrace)->ts_last_sys = 0;
949#endif
950    }
951    return 0;
952}
953
954static int dpdk_write_packet(libtrace_out_t *trace, 
955                libtrace_packet_t *packet){
956    struct rte_mbuf* m_buff[1];
957   
958    int wirelen = trace_get_wire_length(packet);
959    int caplen = trace_get_capture_length(packet);
960   
961    /* Check for a checksum and remove it */
962    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
963                                            wirelen == caplen)
964        caplen -= ETHER_CRC_LEN;
965
966    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
967    if (m_buff[0] == NULL) {
968        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
969        return -1;
970    } else {
971        int ret;
972        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
973        do {
974            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
975        } while (ret != 1);
976    }
977
978    return 0;
979}
980
981static int dpdk_fin_input(libtrace_t * libtrace) {
982    /* Free our memory structures */
983    if (libtrace->format_data != NULL) {
984        /* Close the device completely, device cannot be restarted */
985        if (FORMAT(libtrace)->port != 0xFF)
986            rte_eth_dev_close(FORMAT(libtrace)->port);
987        /* filter here if we used it */
988                free(libtrace->format_data);
989        }
990
991    /* Revert to the original PCI drivers */
992    /* No longer in DPDK
993    rte_eal_pci_exit(); */
994    return 0;
995}
996
997
998static int dpdk_fin_output(libtrace_out_t * libtrace) {
999    /* Free our memory structures */
1000    if (libtrace->format_data != NULL) {
1001        /* Close the device completely, device cannot be restarted */
1002        if (FORMAT(libtrace)->port != 0xFF)
1003            rte_eth_dev_close(FORMAT(libtrace)->port);
1004        /* filter here if we used it */
1005                free(libtrace->format_data);
1006        }
1007
1008    /* Revert to the original PCI drivers */
1009    /* No longer in DPDK
1010    rte_eal_pci_exit(); */
1011    return 0;
1012}
1013
1014/**
1015 * Get the start of additional header that we added to a packet.
1016 */
1017static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1018    uint8_t *hdrsize;
1019    assert(packet);
1020    assert(packet->buffer);
1021    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1022    /* The byte before the original packet data denotes the size in bytes
1023     * of our additional header that we added sits before the 'size byte' */
1024    hdrsize--;
1025    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1026}
1027
1028static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1029    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1030    return hdr->cap_len;
1031}
1032
1033static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1034    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1035    if (size > hdr->cap_len) {
1036        /* Cannot make a packet bigger */
1037                return trace_get_capture_length(packet);
1038        }
1039
1040    /* Reset the cached capture length first*/
1041    packet->capture_length = -1;
1042    hdr->cap_len = (uint32_t) size;
1043        return trace_get_capture_length(packet);
1044}
1045
1046static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1047    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1048    int org_cap_size; /* The original capture size */
1049    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1050        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1051                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1052                            sizeof(struct hw_timestamp_82580);
1053    } else {
1054        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1055                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1056    }
1057    if (hdr->flags & INCLUDES_CHECKSUM) {
1058        return org_cap_size;
1059    } else {
1060        /* DPDK packets are always TRACE_TYPE_ETH packets */
1061        return org_cap_size + ETHER_CRC_LEN;
1062    }
1063}
1064static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1065    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1066    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1067        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1068                sizeof(struct hw_timestamp_82580);
1069    else
1070        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1071}
1072
1073static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1074                libtrace_packet_t *packet, void *buffer,
1075                libtrace_rt_types_t rt_type, uint32_t flags) {
1076    assert(packet);
1077    if (packet->buffer != buffer &&
1078        packet->buf_control == TRACE_CTRL_PACKET) {
1079        free(packet->buffer);
1080    }
1081
1082    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1083        packet->buf_control = TRACE_CTRL_PACKET;
1084    } else
1085        packet->buf_control = TRACE_CTRL_EXTERNAL;
1086
1087    packet->buffer = buffer;
1088    packet->header = buffer;
1089
1090    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1091    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1092    packet->type = rt_type;
1093    return 0;
1094}
1095
1096/*
1097 * Does any extra preperation to a captured packet.
1098 * This includes adding our extra header to it with the timestamp
1099 */
1100static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1101                                                        struct rte_mbuf* pkt){
1102    uint8_t * hdr_size;
1103    struct dpdk_addt_hdr *hdr;
1104#if HAS_HW_TIMESTAMPS_82580
1105    struct hw_timestamp_82580 *hw_ts;
1106    struct timeval cur_sys_time;
1107    uint64_t cur_sys_time_ns;
1108    uint64_t estimated_wraps;
1109   
1110    /* Using gettimeofday because it's most likely to be a vsyscall
1111     * We don't want to slow down anything with systemcalls we dont need
1112     * accauracy */
1113    gettimeofday(&cur_sys_time, NULL);
1114#else
1115# if USE_CLOCK_GETTIME
1116    struct timespec cur_sys_time;
1117   
1118    /* This looks terrible and I feel bad doing it. But it's OK
1119     * on new kernels, because this is a vsyscall */
1120    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1121# else
1122    struct timeval cur_sys_time;
1123    /* Should be a vsyscall */
1124    gettimeofday(&cur_sys_time, NULL);
1125# endif
1126#endif
1127
1128    /* Record the size of our header */
1129    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1130    *hdr_size = sizeof(struct dpdk_addt_hdr);
1131    /* Now put our header in front of that size */
1132    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1133    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1134   
1135#if GET_MAC_CRC_CHECKSUM
1136    /* Add back in the CRC sum */
1137    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1138    pkt->pkt.data_len += ETHER_CRC_LEN;
1139    hdr->flags |= INCLUDES_CHECKSUM;
1140#endif
1141
1142#if HAS_HW_TIMESTAMPS_82580
1143    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1144     *
1145     *        +----------+---+   +--------------+
1146     *  82580 |    24    | 8 |   |      32      |
1147     *        +----------+---+   +--------------+
1148     *          reserved  \______ 40 bits _____/
1149     *
1150     * The 40 bit 82580 SYSTIM overflows every
1151     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1152     *
1153     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1154     * Endian (for the full 64 bits) i.e. picture is mirrored
1155     */
1156   
1157    /* The timestamp is sitting before our packet and is included in pkt_len */
1158    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1159    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1160   
1161    /* Despite what the documentation says this is in Little
1162     * Endian byteorder. Mask the reserved section out.
1163     */
1164    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1165                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1166               
1167    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1168    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1169        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1170        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1171    }
1172   
1173    /* This will have serious problems if packets aren't read quickly
1174     * that is within a couple of seconds because our clock cycles every
1175     * 18 seconds */
1176    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1177                            / (1ull<<TS_NBITS_82580);
1178   
1179    /* Estimated_wraps gives the number of times the counter should have
1180     * wrapped (however depending on value last time it could have wrapped
1181     * twice more (if hw clock is close to its max value) or once less (allowing
1182     * for a bit of variance between hw and sys clock). But if the clock
1183     * shouldn't have wrapped once then don't allow it to go backwards in time */
1184    if (unlikely(estimated_wraps >= 2)) {
1185        /* 2 or more wrap arounds add all but the very last wrap */
1186        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1187    }
1188   
1189    /* Set the timestamp to the lowest possible value we're considering */
1190    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1191                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1192   
1193    /* In most runs only the first if() will need evaluating - i.e our
1194     * estimate is correct. */
1195    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1196                                hdr->timestamp, MAXSKEW_82580))) {
1197        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1198        FORMAT(libtrace)->wrap_count++;
1199        hdr->timestamp += (1ull<<TS_NBITS_82580);
1200        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1201                                hdr->timestamp, MAXSKEW_82580)) {
1202            /* Failed to match estimated_wraps */
1203            FORMAT(libtrace)->wrap_count++;
1204            hdr->timestamp += (1ull<<TS_NBITS_82580);
1205            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1206                                hdr->timestamp, MAXSKEW_82580)) {
1207                if (estimated_wraps == 0) {
1208                    /* 0 case Failed to match estimated_wraps+2 */
1209                    printf("WARNING - Hardware Timestamp failed to"
1210                                            " match using systemtime!\n");
1211                    hdr->timestamp = cur_sys_time_ns;
1212                } else {
1213                    /* Failed to match estimated_wraps+1 */
1214                    FORMAT(libtrace)->wrap_count++;
1215                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1216                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1217                                hdr->timestamp, MAXSKEW_82580)) {
1218                        /* Failed to match estimated_wraps+2 */
1219                        printf("WARNING - Hardware Timestamp failed to"
1220                                            " match using systemtime!!\n");
1221                    }
1222                }
1223            }
1224        }
1225    }
1226
1227    /* Log our previous for the next loop */
1228    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1229
1230#else
1231# if USE_CLOCK_GETTIME
1232    hdr->timestamp = TS_TO_NS(cur_sys_time);
1233# else
1234    hdr->timestamp = TV_TO_NS(cur_sys_time);
1235# endif
1236#endif
1237
1238    /* Intels samples prefetch into level 0 cache lets assume it is a good
1239     * idea and do the same */
1240    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1241    packet->buffer = pkt;
1242    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1243
1244    /* Set our capture length for the first time */
1245    hdr->cap_len = dpdk_get_wire_length(packet);
1246    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1247        hdr->cap_len -= ETHER_CRC_LEN;
1248    }
1249   
1250
1251    return dpdk_get_framing_length(packet) +
1252                        dpdk_get_capture_length(packet);
1253}
1254
1255static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1256    int nb_rx; /* Number of rx packets we've recevied */
1257    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1258
1259    /* Free the last packet buffer */
1260    if (packet->buffer != NULL) {
1261        /* Buffer is owned by DPDK */
1262        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1263            rte_pktmbuf_free(packet->buffer);
1264            packet->buffer = NULL;
1265        } else
1266        /* Buffer is owned by packet i.e. has been malloc'd */
1267        if (packet->buf_control == TRACE_CTRL_PACKET) {
1268            free(packet->buffer);
1269            packet->buffer = NULL;
1270        }
1271    }
1272   
1273    packet->buf_control = TRACE_CTRL_EXTERNAL;
1274    packet->type = TRACE_RT_DATA_DPDK;
1275   
1276    /* Wait for a packet */
1277    while (1) {
1278        /* Poll for a single packet */
1279        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1280                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1281        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1282            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1283        }
1284    }
1285   
1286    /* We'll never get here - but if we did it would be bad */
1287    return -1;
1288}
1289
1290static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1291    struct timeval tv;
1292    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1293   
1294    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1295    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1296    return tv;
1297}
1298
1299static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1300    struct timespec ts;
1301    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1302   
1303    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1304    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1305    return ts;
1306}
1307
1308static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1309    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1310}
1311
1312static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1313    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1314    return (libtrace_direction_t) hdr->direction;
1315}
1316
1317static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1318    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1319    hdr->direction = (uint8_t) direction;
1320    return (libtrace_direction_t) hdr->direction;
1321}
1322
1323/*
1324 * NOTE: Drops could occur for other reasons than running out of buffer
1325 * space. Such as failed MAC checksums and oversized packets.
1326 */
1327static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1328    struct rte_eth_stats stats = {0};
1329   
1330    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1331        return UINT64_MAX;
1332    /* Grab the current stats */
1333    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1334   
1335    /* Get the drop counter */
1336    return (uint64_t) stats.ierrors;
1337}
1338
1339static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1340    struct rte_eth_stats stats = {0};
1341   
1342    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1343        return UINT64_MAX;
1344    /* Grab the current stats */
1345    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1346   
1347    /* Get the drop counter */
1348    return (uint64_t) stats.ipackets;
1349}
1350
1351/*
1352 * This is the number of packets filtered by the NIC
1353 * and maybe ahead of number read using libtrace.
1354 *
1355 * XXX we are yet to implement any filtering, but if it was this should
1356 * get the result. So this will just return 0 for now.
1357 */
1358static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1359    struct rte_eth_stats stats = {0};
1360   
1361    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1362        return UINT64_MAX;
1363    /* Grab the current stats */
1364    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1365   
1366    /* Get the drop counter */
1367    return (uint64_t) stats.fdirmiss;
1368}
1369
1370/* Attempts to read a packet in a non-blocking fashion. If one is not
1371 * available a SLEEP event is returned. We do not have the ability to
1372 * create a select()able file descriptor in DPDK.
1373 */
1374static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1375                                        libtrace_packet_t *packet) {
1376    libtrace_eventobj_t event = {0,0,0.0,0};
1377    int nb_rx; /* Number of receive packets we've read */
1378    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1379   
1380    do {
1381   
1382        /* See if we already have a packet waiting */
1383        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1384                        FORMAT(trace)->queue_id, pkts_burst, 1);
1385       
1386        if (nb_rx > 0) {
1387            /* Free the last packet buffer */
1388            if (packet->buffer != NULL) {
1389                /* Buffer is owned by DPDK */
1390                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1391                    rte_pktmbuf_free(packet->buffer);
1392                    packet->buffer = NULL;
1393                } else
1394                /* Buffer is owned by packet i.e. has been malloc'd */
1395                if (packet->buf_control == TRACE_CTRL_PACKET) {
1396                    free(packet->buffer);
1397                    packet->buffer = NULL;
1398                }
1399            }
1400           
1401            packet->buf_control = TRACE_CTRL_EXTERNAL;
1402            packet->type = TRACE_RT_DATA_DPDK;
1403            event.type = TRACE_EVENT_PACKET;
1404            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1405           
1406            /* XXX - Check this passes the filter trace_read_packet normally
1407             * does this for us but this wont */
1408            if (trace->filter) {
1409                if (!trace_apply_filter(trace->filter, packet)) {
1410                    /* Failed the filter so we loop for another packet */
1411                    trace->filtered_packets ++;
1412                    continue;
1413                }
1414            }
1415            trace->accepted_packets ++;
1416        } else {
1417            /* We only want to sleep for a very short time - we are non-blocking */
1418            event.type = TRACE_EVENT_SLEEP;
1419            event.seconds = 0.0001;
1420            event.size = 0;
1421        }
1422       
1423        /* If we get here we have our event */
1424        break;
1425    } while (1);
1426
1427    return event;
1428}
1429
1430
/* help callback: print the supported URI formats to standard output. */
static void dpdk_help(void) {
    /* One entry per output chunk, printed in order */
    static const char *const help_text[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t i;

    for (i = 0; i < sizeof(help_text) / sizeof(help_text[0]); i++)
        printf("%s", help_text[i]);
}
1448
1449 static struct libtrace_format_t dpdk = {
1450        "dpdk",
1451        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1452        TRACE_FORMAT_DPDK,
1453        NULL,                   /* probe filename */
1454        NULL,                               /* probe magic */
1455        dpdk_init_input,            /* init_input */
1456        dpdk_config_input,          /* config_input */
1457        dpdk_start_input,           /* start_input */
1458        dpdk_pause_input,           /* pause_input */
1459        dpdk_init_output,           /* init_output */
1460        NULL,                               /* config_output */
1461        dpdk_start_output,          /* start_ouput */
1462        dpdk_fin_input,             /* fin_input */
1463        dpdk_fin_output,        /* fin_output */
1464        dpdk_read_packet,           /* read_packet */
1465        dpdk_prepare_packet,    /* prepare_packet */
1466        NULL,                               /* fin_packet */
1467        dpdk_write_packet,          /* write_packet */
1468        dpdk_get_link_type,         /* get_link_type */
1469        dpdk_get_direction,         /* get_direction */
1470        dpdk_set_direction,         /* set_direction */
1471        NULL,                               /* get_erf_timestamp */
1472        dpdk_get_timeval,           /* get_timeval */
1473        dpdk_get_timespec,          /* get_timespec */
1474        NULL,                               /* get_seconds */
1475        NULL,                               /* seek_erf */
1476        NULL,                               /* seek_timeval */
1477        NULL,                               /* seek_seconds */
1478        dpdk_get_capture_length,/* get_capture_length */
1479        dpdk_get_wire_length,   /* get_wire_length */
1480        dpdk_get_framing_length,/* get_framing_length */
1481        dpdk_set_capture_length,/* set_capture_length */
1482        NULL,                               /* get_received_packets */
1483        dpdk_get_filtered_packets,/* get_filtered_packets */
1484        dpdk_get_dropped_packets,/* get_dropped_packets */
1485    dpdk_get_captured_packets,/* get_captured_packets */
1486        NULL,                       /* get_fd */
1487        dpdk_trace_event,               /* trace_event */
1488    dpdk_help,              /* help */
1489        NULL
1490};
1491
1492void dpdk_constructor(void) {
1493        register_format(&dpdk);
1494}
Note: See TracBrowser for help on using the repository browser.