source: lib/format_dpdk.c @ bb0a1f4

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since bb0a1f4 was bb0a1f4, checked in by Richard Sanger <rsanger@…>, 5 years ago

Use the default DPDK device driver rx/tx thresholds

DPDK 1.8 introduced default thresholds per device driver, these
perform better than what we were using.

Quickly tested rx and tx on DPDK 1.7.1, 1.8 & 2.0 with the
1G 82580 NIC and 10G 82599. All appear to be functional, and
should be faster with the i40e.

Fixes issue #26

  • Property mode set to 100644
File size: 53.7 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *         
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#include "config.h"
45#include "libtrace.h"
46#include "libtrace_int.h"
47#include "format_helper.h"
48#include "libtrace_arphrd.h"
49
50#ifdef HAVE_INTTYPES_H
51#  include <inttypes.h>
52#else
53# error "Can't find inttypes.h"
54#endif
55
56#include <stdlib.h>
57#include <assert.h>
58#include <unistd.h>
59#include <endian.h>
60#include <string.h>
61
62/* We can deal with any minor differences by checking the RTE VERSION
63 * Typically DPDK backports some fixes (typically for building against
64 * newer kernels) to the older version of DPDK.
65 *
66 * These get released with the rX suffix. The following macros were added
67 * in these new releases.
68 *
69 * Below this is a log of version that required changes to the libtrace
70 * code (that we still attempt to support).
71 *
72 * DPDK v1.7.1 is recommended.
73 * However 1.5 to 1.8 are likely supported.
74 */
75#include <rte_eal.h>
76#include <rte_version.h>
/* Older DPDK releases lack some of the version helpers, so supply
 * fallbacks: pack the four version components into one comparable integer,
 * one byte per component with the major number in the top byte. */
#if !defined(RTE_VERSION_NUM)
#       define RTE_VERSION_NUM(a,b,c,d) \
        (((a) << 24) | ((b) << 16) | ((c) << 8) | (d))
#endif
/* Treat a missing patch release component as release 0 */
#if !defined(RTE_VER_PATCH_RELEASE)
#       define RTE_VER_PATCH_RELEASE 0
#endif
/* Build RTE_VERSION from its components when DPDK does not provide it */
#if !defined(RTE_VERSION)
#       define RTE_VERSION \
        RTE_VERSION_NUM(RTE_VER_MAJOR, RTE_VER_MINOR, \
                        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
#endif
87
88/* 1.6.0r2 :
89 *      rte_eal_pci_set_blacklist() is removed
90 *      device_list is renamed to pci_device_list
91 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
92 *      as such we do apply the whitelist before rte_eal_init.
93 *      This also works correctly with DPDK 1.6.0r2.
94 *
95 * Replaced by:
96 *      rte_devargs (we can simply whitelist)
97 */
98#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
99#       define DPDK_USE_BLACKLIST 1
100#else
101#       define DPDK_USE_BLACKLIST 0
102#endif
103
104/*
105 * 1.7.0 :
106 *      rte_pmd_init_all is removed
107 *
108 * Replaced by:
109 *      Nothing, no longer needed
110 */
111#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
112#       define DPDK_USE_PMD_INIT 1
113#else
114#       define DPDK_USE_PMD_INIT 0
115#endif
116
117/* 1.7.0-rc3 :
118 *
119 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
120 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
121 * it twice.
122 */
123#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
124#       define DPDK_USE_PCI_PROBE 1
125#else
126#       define DPDK_USE_PCI_PROBE 0
127#endif
128
129/* 1.8.0-rc1 :
130 * LOG LEVEL is a command line option which overrides what
131 * we previously set it to.
132 */
133#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
134#       define DPDK_USE_LOG_LEVEL 1
135#else
136#       define DPDK_USE_LOG_LEVEL 0
137#endif
138
139/* 1.8.0-rc2
140 * rx/tx_conf thresholds can be set to NULL in rte_eth_rx/tx_queue_setup
141 * this uses the default values, which are better tuned per device
142 * See issue #26
143 */
144#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 2)
145#       define DPDK_USE_NULL_QUEUE_CONFIG 1
146#else
147#       define DPDK_USE_NULL_QUEUE_CONFIG 0
148#endif
149
150#include <rte_per_lcore.h>
151#include <rte_debug.h>
152#include <rte_errno.h>
153#include <rte_common.h>
154#include <rte_log.h>
155#include <rte_memcpy.h>
156#include <rte_prefetch.h>
157#include <rte_branch_prediction.h>
158#include <rte_pci.h>
159#include <rte_ether.h>
160#include <rte_ethdev.h>
161#include <rte_ring.h>
162#include <rte_mempool.h>
163#include <rte_mbuf.h>
164
165/* The default size of memory buffers to use - This is the max size of standard
166 * ethernet packet less the size of the MAC CHECKSUM */
167#define RX_MBUF_SIZE 1514
168
169/* The minimum number of memory buffers per queue tx or rx. Search for
170 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
171 */
172#define MIN_NB_BUF 64
173
174/* Number of receive memory buffers to use
175 * By default this is limited by driver to 4k and must be a multiple of 128.
176 * A modification can be made to the driver to remove this limit.
177 * This can be increased in the driver and here.
178 * Should be at least MIN_NB_BUF.
179 */
180#define NB_RX_MBUF 4096
181
182/* Number of send memory buffers to use.
183 * Same limits apply as those for NB_RX_MBUF.
184 */
185#define NB_TX_MBUF 1024
186
187/* The size of the PCI blacklist needs to be big enough to contain
188 * every PCI device address (listed by lspci every bus:device.function tuple).
189 */
190#define BLACK_LIST_SIZE 50
191
192/* The maximum number of characters the mempool name can be */
193#define MEMPOOL_NAME_LEN 20
194
/* View an opaque buffer pointer as the DPDK mbuf it points at.
 * Every macro argument below is parenthesised so that expressions such as
 * *ptr or a ? b : c can be passed safely. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Fetch the DPDK format state stored in a trace's format_data pointer */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
203
204#if RTE_PKTMBUF_HEADROOM != 128
205#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
206         "any libtrace instance processing these packet must be have the" \
207         "same RTE_PKTMBUF_HEADROOM set"
208#endif
209
210/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
211 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
212 *
213 * Make sure you understand what these are doing before enabling them.
214 * They might make traces incompatible with other builds etc.
215 *
216 * These are also included to show how to do some things which aren't
217 * obvious in the DPDK documentation.
218 */
219
220/* Print verbose messages to stdout */
221#define DEBUG 0
222
223/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
224 * Only turn this on if you know clock_gettime is a vsyscall on your system,
225 * otherwise it could be a large overhead. Again, gettimeofday() should also
226 * be a vsyscall; if it's not, you should seriously consider updating your
227 * kernel.
228 */
229#ifdef HAVE_LIBRT
230/* You can turn this on (set to 1) to prefer clock_gettime */
231#define USE_CLOCK_GETTIME 0
232#else
233/* DONT CHANGE THIS !!! */
234#define USE_CLOCK_GETTIME 0
235#endif
236
237/* This is fairly safe to turn on - currently there appears to be a 'bug'
238 * in DPDK that will remove the checksum by making the packet appear 4bytes
239 * smaller than what it really is. Most formats don't include the checksum
240 * hence writing out a port such as int: ring: and dpdk: assumes there
241 * is no checksum and will attempt to write the checksum as part of the
242 * packet
243 */
244#define GET_MAC_CRC_CHECKSUM 0
245
246/* This requires a modification of the pmd drivers (inside Intel DPDK)
247 */
248#define HAS_HW_TIMESTAMPS_82580 0
249
250#if HAS_HW_TIMESTAMPS_82580
251# define TS_NBITS_82580     40
252/* The maximum on the +ve or -ve side that we can be, make it half way */
253# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
254#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
255#endif
256
/* Hardware timestamp record, laid out as per the Intel 82580 specification.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored in Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    /* Little Endian; only the lower 40 bits are valid */
    uint64_t timestamp;
};
263
/* Lifecycle of the DPDK port, stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Initial state set by the init functions */
    DPDK_RUNNING = 1,
    DPDK_PAUSED = 2,
};
269
270/* Used by both input and output however some fields are not used
271 * for output */
272struct dpdk_format_data_t {
273    int8_t promisc; /* promiscuous mode - RX only */
274    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
275    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
276    uint8_t paused; /* See paused_state */ 
277    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
278    int snaplen; /* The snap length for the capture - RX only */
279    /* We always have to setup both rx and tx queues even if we don't want them */
280    int nb_rx_buf; /* The number of packet buffers in the rx ring */
281    int nb_tx_buf; /* The number of packet buffers in the tx ring */
282    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
283#if DPDK_USE_BLACKLIST
284    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
285        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
286#endif
287    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
288#if HAS_HW_TIMESTAMPS_82580
289    /* Timestamping only relevent to RX */
290    uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
291    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
292    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
293#endif
294};
295
/* Bit flags describing a packet, one bit per flag */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
300
/**
 * Additional per-packet header that is stored in front of the packet
 * data, inside the headroom that follows the rte_mbuf. The layout is:
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags;
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    /* The size to say the capture is */
    uint32_t cap_len;
};
326
327/**
328 * We want to blacklist all devices except those on the whitelist
329 * (I say list, but yes it is only the one).
330 *
331 * The default behaviour of rte_pci_probe() will map every possible device
332 * to its DPDK driver. The DPDK driver will take the ethernet device
333 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
334 *
335 * So blacklist all devices except the one that we wish to use so that
336 * the others can still be used as standard ethernet ports.
337 *
338 * @return 0 if successful, otherwise -1 on error.
339 */
340#if DPDK_USE_BLACKLIST
341static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
342{
343        struct rte_pci_device *dev = NULL;
344        format_data->nb_blacklist = 0;
345
346        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
347
348        TAILQ_FOREACH(dev, &device_list, next) {
349        if (whitelist != NULL && whitelist->domain == dev->addr.domain
350            && whitelist->bus == dev->addr.bus
351            && whitelist->devid == dev->addr.devid
352            && whitelist->function == dev->addr.function)
353            continue;
354                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
355                                / sizeof (format_data->blacklist[0])) {
356                        printf("Warning: too many devices to blacklist consider"
357                                        " increasing BLACK_LIST_SIZE");
358                        break;
359                }
360                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
361                ++format_data->nb_blacklist;
362        }
363
364        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
365        return 0;
366}
367#else /* DPDK_USE_BLACKLIST */
368#include <rte_devargs.h>
369static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
370{
371        char pci_str[20] = {0};
372        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
373                 whitelist->domain,
374                 whitelist->bus,
375                 whitelist->devid,
376                 whitelist->function);
377        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
378                return -1;
379        }
380        return 0;
381}
382#endif
383
384/**
385 * Parse the URI format as a pci address
386 * Fills in addr, note core is optional and is unchanged if
387 * a value for it is not provided.
388 *
389 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
390 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
391 */
392static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
393    int matches;
394    assert(str);
395    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
396                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
397    if (matches >= 4) {
398        return 0;
399    } else {
400        return -1;
401    }
402}
403
404#if DEBUG
405/* For debugging */
406static inline void dump_configuration()
407{
408    struct rte_config * global_config;
409    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
410
411    if (nb_cpu <= 0) {
412        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
413        nb_cpu = 1; /* fallback to just 1 core */
414    }
415    if (nb_cpu > RTE_MAX_LCORE)
416        nb_cpu = RTE_MAX_LCORE;
417
418    global_config = rte_eal_get_configuration();
419
420    if (global_config != NULL) {
421        int i;
422        fprintf(stderr, "Intel DPDK setup\n"
423               "---Version      : %s\n"
424               "---Master LCore : %"PRIu32"\n"
425               "---LCore Count  : %"PRIu32"\n",
426               rte_version(),
427               global_config->master_lcore, global_config->lcore_count);
428
429        for (i = 0 ; i < nb_cpu; i++) {
430            fprintf(stderr, "   ---Core %d : %s\n", i,
431                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
432        }
433
434        const char * proc_type;
435        switch (global_config->process_type) {
436            case RTE_PROC_AUTO:
437                proc_type = "auto";
438                break;
439            case RTE_PROC_PRIMARY:
440                proc_type = "primary";
441                break;
442            case RTE_PROC_SECONDARY:
443                proc_type = "secondary";
444                break;
445            case RTE_PROC_INVALID:
446                proc_type = "invalid";
447                break;
448            default:
449                proc_type = "something worse than invalid!!";
450        }
451        fprintf(stderr, "---Process Type : %s\n", proc_type);
452    }
453
454}
455#endif
456
/**
 * XXX This is very bad XXX
 * But we have to do something to allow getopt calls to be nested.
 * Luckily the format is normally last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 *
 * Snapshot of the four public getopt() globals.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt() globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write a snapshot taken by save_getopts() back into the getopt() globals */
static void restore_getopts(struct saved_getopts *opts) {
        optopt = opts->optopt;
        opterr = opts->opterr;
        optind = opts->optind;
        optarg = opts->optarg;
}
484
485static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
486                                        char * err, int errlen) {
487    int ret; /* Returned error codes */
488    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
489    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
490    char mem_map[20] = {0}; /* The memory name */
491    long nb_cpu; /* The number of CPUs in the system */
492    long my_cpu; /* The CPU number we want to bind to */
493        struct saved_getopts save_opts;
494   
495#if DEBUG
496    rte_set_log_level(RTE_LOG_DEBUG);
497#else
498    rte_set_log_level(RTE_LOG_WARNING);
499#endif
500    /*
501     * Using unique file prefixes mean separate memory is used, unlinking
502     * the two processes. However be careful we still cannot access a
503     * port that already in use.
504     */
505    char* argv[] = {"libtrace",
506                    "-c", cpu_number,
507                    "-n", "1",
508                    "--proc-type", "auto",
509                    "--file-prefix", mem_map,
510                    "-m", "256",
511#if DPDK_USE_LOG_LEVEL
512#       if DEBUG
513                    "--log-level", "8", /* RTE_LOG_DEBUG */
514#       else
515                    "--log-level", "5", /* RTE_LOG_WARNING */
516#       endif
517#endif
518                    NULL};
519    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
520
521    /* This initialises the Environment Abstraction Layer (EAL)
522     * If we had slave workers these are put into WAITING state
523     *
524     * Basically binds this thread to a fixed core, which we choose as
525     * the last core on the machine (assuming fewer interrupts mapped here).
526     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
527     * "-n" the number of memory channels into the CPU (hardware specific)
528     *      - Most likely to be half the number of ram slots in your machine.
529     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
530     * Controls where in memory packets are stored and should spread across
531     * the channels. We just use 1 to be safe.
532     */
533
534    /* Get the number of cpu cores in the system and use the last core */
535    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
536    if (nb_cpu <= 0) {
537        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
538        nb_cpu = 1; /* fallback to the first core */
539    }
540    if (nb_cpu > RTE_MAX_LCORE)
541        nb_cpu = RTE_MAX_LCORE;
542
543    my_cpu = nb_cpu;
544    /* This allows the user to specify the core - we would try to do this
545     * automatically but it's hard to tell that this is secondary
546     * before running rte_eal_init(...). Currently we are limited to 1
547     * instance per core due to the way memory is allocated. */
548    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
549        snprintf(err, errlen, "Failed to parse URI");
550        return -1;
551    }
552
553    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
554                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
555
556    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
557        snprintf(err, errlen, 
558          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
559          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
560        return -1;
561    }
562
563    /* Make our mask */
564    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
565
566#if !DPDK_USE_BLACKLIST
567    /* Black list all ports besides the one that we want to use */
568    if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
569        snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
570                 " are you sure the address is correct?: %s", strerror(-ret));
571        return -1;
572    }
573#endif
574
575        /* Give the memory map a unique name */
576        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
577    /* rte_eal_init it makes a call to getopt so we need to reset the
578     * global optind variable of getopt otherwise this fails */
579        save_getopts(&save_opts);
580    optind = 1;
581    if ((ret = rte_eal_init(argc, argv)) < 0) {
582        snprintf(err, errlen, 
583          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
584        return -1;
585    }
586        restore_getopts(&save_opts);
587
588#if DEBUG
589    dump_configuration();
590#endif
591
592#if DPDK_USE_PMD_INIT
593    /* This registers all available NICs with Intel DPDK
594     * These are not loaded until rte_eal_pci_probe() is called.
595     */
596    if ((ret = rte_pmd_init_all()) < 0) {
597        snprintf(err, errlen, 
598          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
599        return -1;
600    }
601#endif
602
603#if DPDK_USE_BLACKLIST
604    /* Blacklist all ports besides the one that we want to use */
605        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
606                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
607                         " are you sure the address is correct?: %s", strerror(-ret));
608                return -1;
609        }
610#endif
611
612#if DPDK_USE_PCI_PROBE
613    /* This loads DPDK drivers against all ports that are not blacklisted */
614        if ((ret = rte_eal_pci_probe()) < 0) {
615        snprintf(err, errlen, 
616            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
617        return -1;
618    }
619#endif
620
621    format_data->nb_ports = rte_eth_dev_count();
622
623    if (format_data->nb_ports != 1) {
624        snprintf(err, errlen, 
625            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
626            format_data->nb_ports);
627        return -1;
628    }
629
630    return 0;
631}
632
633static int dpdk_init_input (libtrace_t *libtrace) {
634    char err[500];
635    err[0] = 0;
636   
637    libtrace->format_data = (struct dpdk_format_data_t *)
638                            malloc(sizeof(struct dpdk_format_data_t));
639    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
640    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
641    FORMAT(libtrace)->nb_ports = 0;
642    FORMAT(libtrace)->snaplen = 0; /* Use default */
643    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
644    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
645    FORMAT(libtrace)->promisc = -1;
646    FORMAT(libtrace)->pktmbuf_pool = NULL;
647#if DPDK_USE_BLACKLIST
648    FORMAT(libtrace)->nb_blacklist = 0;
649#endif
650    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
651    FORMAT(libtrace)->mempool_name[0] = 0;
652#if HAS_HW_TIMESTAMPS_82580
653    FORMAT(libtrace)->ts_first_sys = 0;
654    FORMAT(libtrace)->ts_last_sys = 0;
655    FORMAT(libtrace)->wrap_count = 0;
656#endif
657
658    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
659        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
660        free(libtrace->format_data);
661        libtrace->format_data = NULL;
662        return -1;
663    }
664    return 0;
665};
666
667static int dpdk_init_output(libtrace_out_t *libtrace)
668{
669    char err[500];
670    err[0] = 0;
671   
672    libtrace->format_data = (struct dpdk_format_data_t *)
673                            malloc(sizeof(struct dpdk_format_data_t));
674    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
675    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
676    FORMAT(libtrace)->nb_ports = 0;
677    FORMAT(libtrace)->snaplen = 0; /* Use default */
678    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
679    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
680    FORMAT(libtrace)->promisc = -1;
681    FORMAT(libtrace)->pktmbuf_pool = NULL;
682#if DPDK_USE_BLACKLIST
683    FORMAT(libtrace)->nb_blacklist = 0;
684#endif
685    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
686    FORMAT(libtrace)->mempool_name[0] = 0;
687#if HAS_HW_TIMESTAMPS_82580
688    FORMAT(libtrace)->ts_first_sys = 0;
689    FORMAT(libtrace)->ts_last_sys = 0;
690    FORMAT(libtrace)->wrap_count = 0;
691#endif
692
693    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
694        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
695        free(libtrace->format_data);
696        libtrace->format_data = NULL;
697        return -1;
698    }
699    return 0;
700};
701
702/**
703 * Note here snaplen excludes the MAC checksum. Packets over
704 * the requested snaplen will be dropped. (Excluding MAC checksum)
705 *
706 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
707 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
708 * is set the maximum size of the returned packet would be 1518 otherwise
709 * 1514 would be the largest size possibly returned.
710 *
711 */
712static int dpdk_config_input (libtrace_t *libtrace,
713                                        trace_option_t option,
714                                        void *data) {
715    switch (option) {
716        case TRACE_OPTION_SNAPLEN:
717            /* Only support changing snaplen before a call to start is
718             * made */
719            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
720                FORMAT(libtrace)->snaplen=*(int*)data;
721            else
722                return -1;
723            return 0;
724                case TRACE_OPTION_PROMISC:
725                        FORMAT(libtrace)->promisc=*(int*)data;
726            return 0;
727        case TRACE_OPTION_FILTER:
728            /* TODO filtering */
729            break;
730        case TRACE_OPTION_META_FREQ:
731            break;
732        case TRACE_OPTION_EVENT_REALTIME:
733            break;
734        /* Avoid default: so that future options will cause a warning
735         * here to remind us to implement it, or flag it as
736         * unimplementable
737         */
738    }
739
740        /* Don't set an error - trace_config will try to deal with the
741         * option and will set an error if it fails */
742    return -1;
743}
744
745/* Can set jumbo frames/ or limit the size of a frame by setting both
746 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
747 *
748 */
749static struct rte_eth_conf port_conf = {
750        .rxmode = {
751                .split_hdr_size = 0,
752                .header_split   = 0, /**< Header Split disabled */
753                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
754                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
755                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
756        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
757#if GET_MAC_CRC_CHECKSUM
758/* So it appears that if hw_strip_crc is turned off the driver will still
759 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
760 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
761 * So lets just add it back on when we receive the packet.
762 */
763                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
764#else
765/* By default strip the MAC checksum because it's a bit of a hack to
766 * actually read these. And don't want to rely on disabling this to actualy
767 * always cut off the checksum in the future
768 */
769        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
770#endif
771        },
772        .txmode = {
773                .mq_mode = ETH_DCB_NONE,
774        },
775};
776
777static const struct rte_eth_rxconf rx_conf = {
778        .rx_thresh = {
779                .pthresh = 8,/* RX_PTHRESH prefetch */
780                .hthresh = 8,/* RX_HTHRESH host */
781                .wthresh = 4,/* RX_WTHRESH writeback */
782        },
783    .rx_free_thresh = 0,
784    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
785};
786
787static const struct rte_eth_txconf tx_conf = {
788        .tx_thresh = {
789        /**
790         * TX_PTHRESH prefetch
791         * Set on the NIC, if the number of unprocessed descriptors to queued on
792         * the card fall below this try grab at least hthresh more unprocessed
793         * descriptors.
794         */
795                .pthresh = 36,
796
797        /* TX_HTHRESH host
798         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
799         */
800                .hthresh = 0,
801       
802        /* TX_WTHRESH writeback
803         * Set on the NIC, the number of sent descriptors before writing back
804         * status to confirm the transmission. This is done more efficiently as
805         * a bulk DMA-transfer rather than writing one at a time.
806         * Similar to tx_free_thresh however this is applied to the NIC, where
807         * as tx_free_thresh is when DPDK will check these. This is extended
808         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
809         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
810         */
811                .wthresh = 4,
812        },
813
814    /* Used internally by DPDK rather than passed to the NIC. The number of
815     * packet descriptors to send before checking for any responses written
816     * back (to confirm the transmission). Default = 32 if set to 0)
817     */
818        .tx_free_thresh = 0,
819
820    /* This is the Report Status threshold, used by 10Gbit cards,
821     * This signals the card to only write back status (such as
822     * transmission successful) after this minimum number of transmit
823     * descriptors are seen. The default is 32 (if set to 0) however if set
824     * to greater than 1 TX wthresh must be set to zero, because this is kindof
825     * a replacement. See the dpdk programmers guide for more restrictions.
826     */
827        .tx_rs_thresh = 1,
828};
829
830/* Attach memory to the port and start the port or restart the port.
831 */
832static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
833    int ret; /* Check return values for errors */
834    struct rte_eth_link link_info; /* Wait for link */
835   
836    /* Already started */
837    if (format_data->paused == DPDK_RUNNING)
838        return 0;
839
840    /* First time started we need to alloc our memory, doing this here
841     * rather than in environment setup because we don't have snaplen then */
842    if (format_data->paused == DPDK_NEVER_STARTED) {
843        if (format_data->snaplen == 0) {
844            format_data->snaplen = RX_MBUF_SIZE;
845            port_conf.rxmode.jumbo_frame = 0;
846            port_conf.rxmode.max_rx_pkt_len = 0;
847        } else {
848            /* Use jumbo frames */
849            port_conf.rxmode.jumbo_frame = 1;
850            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
851        }
852
853        /* This is additional overhead so make sure we allow space for this */
854#if GET_MAC_CRC_CHECKSUM
855        format_data->snaplen += ETHER_CRC_LEN;
856#endif
857#if HAS_HW_TIMESTAMPS_82580
858        format_data->snaplen += sizeof(struct hw_timestamp_82580);
859#endif
860
861        /* Create the mbuf pool, which is the place our packets are allocated
862         * from - TODO figure out if there is is a free function (I cannot see one)
863         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
864         * allocate however that extra 1 packet is not used.
865         * (I assume <= vs < error some where in DPDK code)
866         * TX requires nb_tx_buffers + 1 in the case the queue is full
867         * so that will fill the new buffer and wait until slots in the
868         * ring become available.
869         */
870#if DEBUG
871    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
872#endif
873        format_data->pktmbuf_pool =
874            rte_mempool_create(format_data->mempool_name,
875                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
876                       format_data->snaplen + sizeof(struct rte_mbuf) 
877                                        + RTE_PKTMBUF_HEADROOM,
878                       8, sizeof(struct rte_pktmbuf_pool_private),
879                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
880                       rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
881
882        if (format_data->pktmbuf_pool == NULL) {
883            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
884                        "pool failed: %s", strerror(rte_errno));
885            return -1;
886        }
887    }
888   
889    /* ----------- Now do the setup for the port mapping ------------ */
890    /* Order of calls must be
891     * rte_eth_dev_configure()
892     * rte_eth_tx_queue_setup()
893     * rte_eth_rx_queue_setup()
894     * rte_eth_dev_start()
895     * other rte_eth calls
896     */
897   
898    /* This must be called first before another *eth* function
899     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
900    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
901    if (ret < 0) {
902        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
903                            " %"PRIu8" : %s", format_data->port,
904                            strerror(-ret));
905        return -1;
906    }
907    /* Initialise the TX queue a minimum value if using this port for
908     * receiving. Otherwise a larger size if writing packets.
909     */
910    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
911                        format_data->nb_tx_buf, rte_socket_id(),
912                        DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
913    if (ret < 0) {
914        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
915                            " %"PRIu8" : %s", format_data->port,
916                            strerror(-ret));
917        return -1;
918    }
919    /* Initialise the RX queue with some packets from memory */
920    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
921                            format_data->nb_rx_buf, rte_socket_id(),
922                            DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &rx_conf,
923                            format_data->pktmbuf_pool);
924    if (ret < 0) {
925        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
926                    " %"PRIu8" : %s", format_data->port,
927                    strerror(-ret));
928        return -1;
929    }
930   
931    /* Start device */
932    ret = rte_eth_dev_start(format_data->port);
933    if (ret < 0) {
934        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
935                    strerror(-ret));
936        return -1;
937    }
938
939    /* Default promiscuous to on */
940    if (format_data->promisc == -1)
941        format_data->promisc = 1;
942   
943    if (format_data->promisc == 1)
944        rte_eth_promiscuous_enable(format_data->port);
945    else
946        rte_eth_promiscuous_disable(format_data->port);
947   
948    /* Wait for the link to come up */
949    rte_eth_link_get(format_data->port, &link_info);
950#if DEBUG
951    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
952            (int) link_info.link_duplex, (int) link_info.link_speed);
953#endif
954
955    /* We have now successfully started/unpaused */
956    format_data->paused = DPDK_RUNNING;
957   
958    return 0;
959}
960
961static int dpdk_start_input (libtrace_t *libtrace) {
962    char err[500];
963    err[0] = 0;
964
965    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
966        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
967        free(libtrace->format_data);
968        libtrace->format_data = NULL;
969        return -1;
970    }
971    return 0;
972}
973
974static int dpdk_start_output(libtrace_out_t *libtrace)
975{
976    char err[500];
977    err[0] = 0;
978   
979    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
980        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
981        free(libtrace->format_data);
982        libtrace->format_data = NULL;
983        return -1;
984    }
985    return 0;
986}
987
988static int dpdk_pause_input(libtrace_t * libtrace){
989    /* This stops the device, but can be restarted using rte_eth_dev_start() */
990    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
991#if DEBUG     
992        fprintf(stderr, "Pausing port\n");
993#endif
994        rte_eth_dev_stop(FORMAT(libtrace)->port);
995        FORMAT(libtrace)->paused = DPDK_PAUSED;
996        /* If we pause it the driver will be reset and likely our counter */
997#if HAS_HW_TIMESTAMPS_82580
998        FORMAT(libtrace)->ts_first_sys = 0;
999        FORMAT(libtrace)->ts_last_sys = 0;
1000#endif
1001    }
1002    return 0;
1003}
1004
1005static int dpdk_write_packet(libtrace_out_t *trace, 
1006                libtrace_packet_t *packet){
1007    struct rte_mbuf* m_buff[1];
1008   
1009    int wirelen = trace_get_wire_length(packet);
1010    int caplen = trace_get_capture_length(packet);
1011   
1012    /* Check for a checksum and remove it */
1013    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1014                                            wirelen == caplen)
1015        caplen -= ETHER_CRC_LEN;
1016
1017    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1018    if (m_buff[0] == NULL) {
1019        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1020        return -1;
1021    } else {
1022        int ret;
1023        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1024        do {
1025            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1026        } while (ret != 1);
1027    }
1028
1029    return 0;
1030}
1031
1032static int dpdk_fin_input(libtrace_t * libtrace) {
1033    /* Free our memory structures */
1034    if (libtrace->format_data != NULL) {
1035        /* Close the device completely, device cannot be restarted */
1036        if (FORMAT(libtrace)->port != 0xFF)
1037            rte_eth_dev_close(FORMAT(libtrace)->port);
1038        /* filter here if we used it */
1039                free(libtrace->format_data);
1040        }
1041
1042    /* Revert to the original PCI drivers */
1043    /* No longer in DPDK
1044    rte_eal_pci_exit(); */
1045    return 0;
1046}
1047
1048
1049static int dpdk_fin_output(libtrace_out_t * libtrace) {
1050    /* Free our memory structures */
1051    if (libtrace->format_data != NULL) {
1052        /* Close the device completely, device cannot be restarted */
1053        if (FORMAT(libtrace)->port != 0xFF)
1054            rte_eth_dev_close(FORMAT(libtrace)->port);
1055        /* filter here if we used it */
1056                free(libtrace->format_data);
1057        }
1058
1059    /* Revert to the original PCI drivers */
1060    /* No longer in DPDK
1061    rte_eal_pci_exit(); */
1062    return 0;
1063}
1064
1065/**
1066 * Get the start of additional header that we added to a packet.
1067 */
1068static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1069    uint8_t *hdrsize;
1070    assert(packet);
1071    assert(packet->buffer);
1072    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1073    /* The byte before the original packet data denotes the size in bytes
1074     * of our additional header that we added sits before the 'size byte' */
1075    hdrsize--;
1076    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1077}
1078
1079static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1080    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1081    return hdr->cap_len;
1082}
1083
1084static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1085    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1086    if (size > hdr->cap_len) {
1087        /* Cannot make a packet bigger */
1088                return trace_get_capture_length(packet);
1089        }
1090
1091    /* Reset the cached capture length first*/
1092    packet->capture_length = -1;
1093    hdr->cap_len = (uint32_t) size;
1094        return trace_get_capture_length(packet);
1095}
1096
1097static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1098    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1099    int org_cap_size; /* The original capture size */
1100    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1101        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1102                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1103                            sizeof(struct hw_timestamp_82580);
1104    } else {
1105        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1106                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1107    }
1108    if (hdr->flags & INCLUDES_CHECKSUM) {
1109        return org_cap_size;
1110    } else {
1111        /* DPDK packets are always TRACE_TYPE_ETH packets */
1112        return org_cap_size + ETHER_CRC_LEN;
1113    }
1114}
1115static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1116    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1117    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1118        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1119                sizeof(struct hw_timestamp_82580);
1120    else
1121        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1122}
1123
1124static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1125                libtrace_packet_t *packet, void *buffer,
1126                libtrace_rt_types_t rt_type, uint32_t flags) {
1127    assert(packet);
1128    if (packet->buffer != buffer &&
1129        packet->buf_control == TRACE_CTRL_PACKET) {
1130        free(packet->buffer);
1131    }
1132
1133    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1134        packet->buf_control = TRACE_CTRL_PACKET;
1135    } else
1136        packet->buf_control = TRACE_CTRL_EXTERNAL;
1137
1138    packet->buffer = buffer;
1139    packet->header = buffer;
1140
1141    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1142    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1143    packet->type = rt_type;
1144    return 0;
1145}
1146
1147/*
1148 * Does any extra preperation to a captured packet.
1149 * This includes adding our extra header to it with the timestamp
1150 */
1151static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1152                                                        struct rte_mbuf* pkt){
1153    uint8_t * hdr_size;
1154    struct dpdk_addt_hdr *hdr;
1155#if HAS_HW_TIMESTAMPS_82580
1156    struct hw_timestamp_82580 *hw_ts;
1157    struct timeval cur_sys_time;
1158    uint64_t cur_sys_time_ns;
1159    uint64_t estimated_wraps;
1160   
1161    /* Using gettimeofday because it's most likely to be a vsyscall
1162     * We don't want to slow down anything with systemcalls we dont need
1163     * accauracy */
1164    gettimeofday(&cur_sys_time, NULL);
1165#else
1166# if USE_CLOCK_GETTIME
1167    struct timespec cur_sys_time;
1168   
1169    /* This looks terrible and I feel bad doing it. But it's OK
1170     * on new kernels, because this is a vsyscall */
1171    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1172# else
1173    struct timeval cur_sys_time;
1174    /* Should be a vsyscall */
1175    gettimeofday(&cur_sys_time, NULL);
1176# endif
1177#endif
1178
1179    /* Record the size of our header */
1180    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1181    *hdr_size = sizeof(struct dpdk_addt_hdr);
1182    /* Now put our header in front of that size */
1183    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1184    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1185   
1186#if GET_MAC_CRC_CHECKSUM
1187    /* Add back in the CRC sum */
1188    rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1189    rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1190    hdr->flags |= INCLUDES_CHECKSUM;
1191#endif
1192
1193#if HAS_HW_TIMESTAMPS_82580
1194    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1195     *
1196     *        +----------+---+   +--------------+
1197     *  82580 |    24    | 8 |   |      32      |
1198     *        +----------+---+   +--------------+
1199     *          reserved  \______ 40 bits _____/
1200     *
1201     * The 40 bit 82580 SYSTIM overflows every
1202     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1203     *
1204     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1205     * Endian (for the full 64 bits) i.e. picture is mirrored
1206     */
1207   
1208    /* The timestamp is sitting before our packet and is included in pkt_len */
1209    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1210    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1211   
1212    /* Despite what the documentation says this is in Little
1213     * Endian byteorder. Mask the reserved section out.
1214     */
1215    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1216                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1217               
1218    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1219    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1220        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1221        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1222    }
1223   
1224    /* This will have serious problems if packets aren't read quickly
1225     * that is within a couple of seconds because our clock cycles every
1226     * 18 seconds */
1227    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1228                            / (1ull<<TS_NBITS_82580);
1229   
1230    /* Estimated_wraps gives the number of times the counter should have
1231     * wrapped (however depending on value last time it could have wrapped
1232     * twice more (if hw clock is close to its max value) or once less (allowing
1233     * for a bit of variance between hw and sys clock). But if the clock
1234     * shouldn't have wrapped once then don't allow it to go backwards in time */
1235    if (unlikely(estimated_wraps >= 2)) {
1236        /* 2 or more wrap arounds add all but the very last wrap */
1237        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1238    }
1239   
1240    /* Set the timestamp to the lowest possible value we're considering */
1241    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1242                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1243   
1244    /* In most runs only the first if() will need evaluating - i.e our
1245     * estimate is correct. */
1246    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1247                                hdr->timestamp, MAXSKEW_82580))) {
1248        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1249        FORMAT(libtrace)->wrap_count++;
1250        hdr->timestamp += (1ull<<TS_NBITS_82580);
1251        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1252                                hdr->timestamp, MAXSKEW_82580)) {
1253            /* Failed to match estimated_wraps */
1254            FORMAT(libtrace)->wrap_count++;
1255            hdr->timestamp += (1ull<<TS_NBITS_82580);
1256            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1257                                hdr->timestamp, MAXSKEW_82580)) {
1258                if (estimated_wraps == 0) {
1259                    /* 0 case Failed to match estimated_wraps+2 */
1260                    printf("WARNING - Hardware Timestamp failed to"
1261                                            " match using systemtime!\n");
1262                    hdr->timestamp = cur_sys_time_ns;
1263                } else {
1264                    /* Failed to match estimated_wraps+1 */
1265                    FORMAT(libtrace)->wrap_count++;
1266                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1267                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1268                                hdr->timestamp, MAXSKEW_82580)) {
1269                        /* Failed to match estimated_wraps+2 */
1270                        printf("WARNING - Hardware Timestamp failed to"
1271                                            " match using systemtime!!\n");
1272                    }
1273                }
1274            }
1275        }
1276    }
1277
1278    /* Log our previous for the next loop */
1279    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1280
1281#else
1282# if USE_CLOCK_GETTIME
1283    hdr->timestamp = TS_TO_NS(cur_sys_time);
1284# else
1285    hdr->timestamp = TV_TO_NS(cur_sys_time);
1286# endif
1287#endif
1288
1289    /* Intels samples prefetch into level 0 cache lets assume it is a good
1290     * idea and do the same */
1291    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1292    packet->buffer = pkt;
1293    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1294
1295    /* Set our capture length for the first time */
1296    hdr->cap_len = dpdk_get_wire_length(packet);
1297    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1298        hdr->cap_len -= ETHER_CRC_LEN;
1299    }
1300   
1301
1302    return dpdk_get_framing_length(packet) +
1303                        dpdk_get_capture_length(packet);
1304}
1305
1306static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1307    int nb_rx; /* Number of rx packets we've recevied */
1308    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1309
1310    /* Free the last packet buffer */
1311    if (packet->buffer != NULL) {
1312        /* Buffer is owned by DPDK */
1313        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1314            rte_pktmbuf_free(packet->buffer);
1315            packet->buffer = NULL;
1316        } else
1317        /* Buffer is owned by packet i.e. has been malloc'd */
1318        if (packet->buf_control == TRACE_CTRL_PACKET) {
1319            free(packet->buffer);
1320            packet->buffer = NULL;
1321        }
1322    }
1323   
1324    packet->buf_control = TRACE_CTRL_EXTERNAL;
1325    packet->type = TRACE_RT_DATA_DPDK;
1326   
1327    /* Wait for a packet */
1328    while (1) {
1329        /* Poll for a single packet */
1330        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1331                            FORMAT(libtrace)->queue_id, pkts_burst, 1);
1332        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1333            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1334        }
1335        if (libtrace_halt) {
1336            return 0;
1337        }
1338    }
1339   
1340    /* We'll never get here - but if we did it would be bad */
1341    return -1;
1342}
1343
1344static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1345    struct timeval tv;
1346    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1347   
1348    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1349    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1350    return tv;
1351}
1352
1353static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1354    struct timespec ts;
1355    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1356   
1357    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1358    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1359    return ts;
1360}
1361
1362static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1363    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1364}
1365
1366static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1367    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1368    return (libtrace_direction_t) hdr->direction;
1369}
1370
1371static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1372    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1373    hdr->direction = (uint8_t) direction;
1374    return (libtrace_direction_t) hdr->direction;
1375}
1376
1377/*
1378 * NOTE: Drops could occur for other reasons than running out of buffer
1379 * space. Such as failed MAC checksums and oversized packets.
1380 */
1381static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1382    struct rte_eth_stats stats = {0};
1383   
1384    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1385        return UINT64_MAX;
1386    /* Grab the current stats */
1387    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1388   
1389    /* Get the drop counter */
1390    return (uint64_t) stats.ierrors;
1391}
1392
1393static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1394    struct rte_eth_stats stats = {0};
1395   
1396    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1397        return UINT64_MAX;
1398    /* Grab the current stats */
1399    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1400   
1401    /* Get the drop counter */
1402    return (uint64_t) stats.ipackets;
1403}
1404
1405/*
1406 * This is the number of packets filtered by the NIC
1407 * and maybe ahead of number read using libtrace.
1408 *
1409 * XXX we are yet to implement any filtering, but if it was this should
1410 * get the result. So this will just return 0 for now.
1411 */
1412static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1413    struct rte_eth_stats stats = {0};
1414   
1415    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1416        return UINT64_MAX;
1417    /* Grab the current stats */
1418    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1419   
1420    /* Get the drop counter */
1421    return (uint64_t) stats.fdirmiss;
1422}
1423
1424/* Attempts to read a packet in a non-blocking fashion. If one is not
1425 * available a SLEEP event is returned. We do not have the ability to
1426 * create a select()able file descriptor in DPDK.
1427 */
1428static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1429                                        libtrace_packet_t *packet) {
1430    libtrace_eventobj_t event = {0,0,0.0,0};
1431    int nb_rx; /* Number of receive packets we've read */
1432    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1433   
1434    do {
1435   
1436        /* See if we already have a packet waiting */
1437        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1438                        FORMAT(trace)->queue_id, pkts_burst, 1);
1439       
1440        if (nb_rx > 0) {
1441            /* Free the last packet buffer */
1442            if (packet->buffer != NULL) {
1443                /* Buffer is owned by DPDK */
1444                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1445                    rte_pktmbuf_free(packet->buffer);
1446                    packet->buffer = NULL;
1447                } else
1448                /* Buffer is owned by packet i.e. has been malloc'd */
1449                if (packet->buf_control == TRACE_CTRL_PACKET) {
1450                    free(packet->buffer);
1451                    packet->buffer = NULL;
1452                }
1453            }
1454           
1455            packet->buf_control = TRACE_CTRL_EXTERNAL;
1456            packet->type = TRACE_RT_DATA_DPDK;
1457            event.type = TRACE_EVENT_PACKET;
1458            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1459           
1460            /* XXX - Check this passes the filter trace_read_packet normally
1461             * does this for us but this wont */
1462            if (trace->filter) {
1463                if (!trace_apply_filter(trace->filter, packet)) {
1464                    /* Failed the filter so we loop for another packet */
1465                    trace->filtered_packets ++;
1466                    continue;
1467                }
1468            }
1469            trace->accepted_packets ++;
1470        } else {
1471            /* We only want to sleep for a very short time - we are non-blocking */
1472            event.type = TRACE_EVENT_SLEEP;
1473            event.seconds = 0.0001;
1474            event.size = 0;
1475        }
1476       
1477        /* If we get here we have our event */
1478        break;
1479    } while (1);
1480
1481    return event;
1482}
1483
1484
/* Print the usage text for the dpdk format module to stdout */
static void dpdk_help(void) {
    static const char *const help_text[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t i;

    /* Emit the lines one after another, exactly as written above */
    for (i = 0; i < sizeof(help_text) / sizeof(help_text[0]); i++)
        printf("%s", help_text[i]);
}
1502
1503 static struct libtrace_format_t dpdk = {
1504        "dpdk",
1505        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1506        TRACE_FORMAT_DPDK,
1507        NULL,                   /* probe filename */
1508        NULL,                               /* probe magic */
1509        dpdk_init_input,            /* init_input */
1510        dpdk_config_input,          /* config_input */
1511        dpdk_start_input,           /* start_input */
1512        dpdk_pause_input,           /* pause_input */
1513        dpdk_init_output,           /* init_output */
1514        NULL,                               /* config_output */
1515        dpdk_start_output,          /* start_ouput */
1516        dpdk_fin_input,             /* fin_input */
1517        dpdk_fin_output,        /* fin_output */
1518        dpdk_read_packet,           /* read_packet */
1519        dpdk_prepare_packet,    /* prepare_packet */
1520        NULL,                               /* fin_packet */
1521        dpdk_write_packet,          /* write_packet */
1522        dpdk_get_link_type,         /* get_link_type */
1523        dpdk_get_direction,         /* get_direction */
1524        dpdk_set_direction,         /* set_direction */
1525        NULL,                               /* get_erf_timestamp */
1526        dpdk_get_timeval,           /* get_timeval */
1527        dpdk_get_timespec,          /* get_timespec */
1528        NULL,                               /* get_seconds */
1529        NULL,                               /* seek_erf */
1530        NULL,                               /* seek_timeval */
1531        NULL,                               /* seek_seconds */
1532        dpdk_get_capture_length,/* get_capture_length */
1533        dpdk_get_wire_length,   /* get_wire_length */
1534        dpdk_get_framing_length,/* get_framing_length */
1535        dpdk_set_capture_length,/* set_capture_length */
1536        NULL,                               /* get_received_packets */
1537        dpdk_get_filtered_packets,/* get_filtered_packets */
1538        dpdk_get_dropped_packets,/* get_dropped_packets */
1539    dpdk_get_captured_packets,/* get_captured_packets */
1540        NULL,                       /* get_fd */
1541        dpdk_trace_event,               /* trace_event */
1542    dpdk_help,              /* help */
1543        NULL
1544};
1545
1546void dpdk_constructor(void) {
1547        register_format(&dpdk);
1548}
Note: See TracBrowser for help on using the repository browser.