source: lib/format_dpdk.c @ 6c3b3d7

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6c3b3d7 was 6c3b3d7, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fix compile warning

  • Property mode set to 100644
File size: 51.8 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include <stdlib.h>
56#include <assert.h>
57#include <unistd.h>
58#include <endian.h>
59#include <string.h>
60
61/* We can deal with any minor differences by checking the RTE VERSION
62 * Typically DPDK backports some fixes (typically for building against
63 * newer kernels) to the older version of DPDK.
64 *
65 * These get released with the rX suffix. The following macros where added
66 * in these new releases.
67 *
68 * Below this is a log of version that required changes to the libtrace
69 * code (that we still attempt to support).
70 *
71 * Currently 1.5 to 1.7 is supported.
72 */
73#include <rte_eal.h>
74#include <rte_version.h>
75#ifndef RTE_VERSION_NUM
76#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
77#endif
78#ifndef RTE_VER_PATCH_RELEASE
79#       define RTE_VER_PATCH_RELEASE 0
80#endif
81#ifndef RTE_VERSION
82#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
83        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
84#endif
85
86/* 1.6.0r2 :
87 *      rte_eal_pci_set_blacklist() is removed
 *      device_list is renamed to pci_device_list
89 *
90 * Replaced by:
91 *      rte_devargs (we can simply whitelist)
92 */
93#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
94#       define DPDK_USE_BLACKLIST 1
95#else
96#       define DPDK_USE_BLACKLIST 0
97#endif
98
99/*
100 * 1.7.0 :
101 *      rte_pmd_init_all is removed
102 *
103 * Replaced by:
104 *      Nothing, no longer needed
105 */
106#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
107#       define DPDK_USE_PMD_INIT 1
108#else
109#       define DPDK_USE_PMD_INIT 0
110#endif
111
112#include <rte_per_lcore.h>
113#include <rte_debug.h>
114#include <rte_errno.h>
115#include <rte_common.h>
116#include <rte_log.h>
117#include <rte_memcpy.h>
118#include <rte_prefetch.h>
119#include <rte_branch_prediction.h>
120#include <rte_pci.h>
121#include <rte_ether.h>
122#include <rte_ethdev.h>
123#include <rte_ring.h>
124#include <rte_mempool.h>
125#include <rte_mbuf.h>
126
127/* The default size of memory buffers to use - This is the max size of standard
128 * ethernet packet less the size of the MAC CHECKSUM */
129#define RX_MBUF_SIZE 1514
130
131/* The minimum number of memory buffers per queue tx or rx. Search for
132 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
133 */
134#define MIN_NB_BUF 64
135
136/* Number of receive memory buffers to use
137 * By default this is limited by driver to 4k and must be a multiple of 128.
138 * A modification can be made to the driver to remove this limit.
139 * This can be increased in the driver and here.
140 * Should be at least MIN_NB_BUF.
141 */
142#define NB_RX_MBUF 4096
143
/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
147#define NB_TX_MBUF 1024
148
149/* The size of the PCI blacklist needs to be big enough to contain
150 * every PCI device address (listed by lspci every bus:device.function tuple).
151 */
152#define BLACK_LIST_SIZE 50
153
154/* The maximum number of characters the mempool name can be */
155#define MEMPOOL_NAME_LEN 20
156
/* Helper macros. Every macro argument is parenthesised so that arbitrary
 * expressions (e.g. *ptr or p + 1) expand with the intended precedence. */

/* View an opaque buffer pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Fetch the DPDK format data attached to a libtrace (input or output) trace */
#define FORMAT(x) ((struct dpdk_format_data_t *) ((x)->format_data))
/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec * 1000000000ull + \
                        (uint64_t) (tv).tv_usec * 1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec * 1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
165
166#if RTE_PKTMBUF_HEADROOM != 128
167#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
168         "any libtrace instance processing these packet must be have the" \
169         "same RTE_PKTMBUF_HEADROOM set"
170#endif
171
172/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
173 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
174 *
175 * Make sure you understand what these are doing before enabling them.
176 * They might make traces incompatable with other builds etc.
177 *
178 * These are also included to show how to do somethings which aren't
179 * obvious in the DPDK documentation.
180 */
181
182/* Print verbose messages to stdout */
183#define DEBUG 0
184
185/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
186 * only turn on if you know clock_gettime is a vsyscall on your system
187 * overwise could be a large overhead. Again gettimeofday() should be
188 * vsyscall also if it's not you should seriously consider updating your
189 * kernel.
190 */
191#ifdef HAVE_LIBRT
192/* You can turn this on (set to 1) to prefer clock_gettime */
193#define USE_CLOCK_GETTIME 0
194#else
195/* DONT CHANGE THIS !!! */
196#define USE_CLOCK_GETTIME 0
197#endif
198
199/* This is fairly safe to turn on - currently there appears to be a 'bug'
200 * in DPDK that will remove the checksum by making the packet appear 4bytes
201 * smaller than what it really is. Most formats don't include the checksum
202 * hence writing out a port such as int: ring: and dpdk: assumes there
203 * is no checksum and will attempt to write the checksum as part of the
204 * packet
205 */
206#define GET_MAC_CRC_CHECKSUM 0
207
208/* This requires a modification of the pmd drivers (inside Intel DPDK)
209 */
210#define HAS_HW_TIMESTAMPS_82580 0
211
212#if HAS_HW_TIMESTAMPS_82580
213# define TS_NBITS_82580     40
214/* The maximum on the +ve or -ve side that we can be, make it half way */
215# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
216#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
217#endif
218
219/* As per Intel 82580 specification - mismatch in 82580 datasheet
220 * it states ts is stored in Big Endian, however its actually Little */
struct hw_timestamp_82580 {
    /* First 8 bytes of the hardware timestamp record, not used here */
    uint64_t reserved;
    /* Stored Little Endian; only the lower 40 bits are valid */
    uint64_t timestamp;
};
225
/* Lifecycle of the DPDK port, stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Initial state set by the init functions */
    DPDK_RUNNING       = 1, /* The port has been started */
    DPDK_PAUSED        = 2,
};
231
232/* Used by both input and output however some fields are not used
233 * for output */
234struct dpdk_format_data_t {
235    int8_t promisc; /* promiscuous mode - RX only */
236    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
237    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
238    uint8_t paused; /* See paused_state */ 
239    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
240    int snaplen; /* The snap length for the capture - RX only */
241    /* We always have to setup both rx and tx queues even if we don't want them */
242    int nb_rx_buf; /* The number of packet buffers in the rx ring */
243    int nb_tx_buf; /* The number of packet buffers in the tx ring */
244    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
245#if DPDK_USE_BLACKLIST
246    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
247        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
248#endif
249    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
250#if HAS_HW_TIMESTAMPS_82580
251    /* Timestamping only relevent to RX */
252    uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
253    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
254    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
255#endif
256};
257
/* Bit flags for dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
262
263/**
264 * A structure placed in front of the packet where we can store
265 * additional information about the given packet.
266 * +--------------------------+
267 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
268 * +--------------------------+
269 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
270 * +--------------------------+
271 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
272 * +--------------------------+
273 * |   sizeof(dpdk_addt_hdr)  | 1 byte
274 * +--------------------------+
275 * *   hw_timestamp_82580     * 16 bytes Optional
276 * +--------------------------+
277 * |       Packet data        | Variable Size
278 * |                          |
279 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags;      /* Presumably a mask of dpdk_addt_hdr_flags - confirm against users */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    /* The size to say the capture is */
    uint32_t cap_len;
};
288
289/**
290 * We want to blacklist all devices except those on the whitelist
291 * (I say list, but yes it is only the one).
292 *
293 * The default behaviour of rte_pci_probe() will map every possible device
294 * to its DPDK driver. The DPDK driver will take the ethernet device
295 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
296 *
297 * So blacklist all devices except the one that we wish to use so that
298 * the others can still be used as standard ethernet ports.
299 *
300 * @return 0 if successful, otherwise -1 on error.
301 */
302#if DPDK_USE_BLACKLIST
303static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
304{
305        struct rte_pci_device *dev = NULL;
306        format_data->nb_blacklist = 0;
307
308        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
309
310        TAILQ_FOREACH(dev, &device_list, next) {
311        if (whitelist != NULL && whitelist->domain == dev->addr.domain
312            && whitelist->bus == dev->addr.bus
313            && whitelist->devid == dev->addr.devid
314            && whitelist->function == dev->addr.function)
315            continue;
316                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
317                                / sizeof (format_data->blacklist[0])) {
318                        printf("Warning: too many devices to blacklist consider"
319                                        " increasing BLACK_LIST_SIZE");
320                        break;
321                }
322                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
323                ++format_data->nb_blacklist;
324        }
325
326        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
327        return 0;
328}
329#else /* DPDK_USE_BLACKLIST */
330#include <rte_devargs.h>
331static int blacklist_devices(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
332{
333        char pci_str[20] = {0};
334        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
335                 whitelist->domain,
336                 whitelist->bus,
337                 whitelist->devid,
338                 whitelist->function);
339        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
340                return -1;
341        }
342        return 0;
343}
344#endif
345
346/**
347 * Parse the URI format as a pci address
348 * Fills in addr, note core is optional and is unchanged if
349 * a value for it is not provided.
350 *
351 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
352 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
353 */
354static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
355    char * wrkstr;
356    char * pch;
357    assert(str);
358    wrkstr = strdup(str);
359   
360    pch = strtok(wrkstr,":");
361    if (pch == NULL || pch[0] == 0) {
362        free(wrkstr); return -1;
363    }
364    addr->domain = (uint16_t) atoi(pch);
365
366    pch = strtok(NULL,":");
367    if (pch == NULL || pch[0] == 0) {
368        free(wrkstr); return -1;
369    }
370    addr->bus = (uint8_t) atoi(pch);
371
372    pch = strtok(NULL,".");
373    if (pch == NULL || pch[0] == 0) {
374        free(wrkstr); return -1;
375    }
376    addr->devid = (uint8_t) atoi(pch);
377
378    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
379    if (pch == NULL || pch[0] == 0) {
380        free(wrkstr); return -1;
381    }
382    addr->function = (uint8_t) atoi(pch);
383
384    pch = strtok(NULL, ""); /* Find end of string */
385   
386    if (pch != NULL && pch[0] != 0) {
387        *core = (long) atoi(pch);
388    }
389
390    free(wrkstr);
391    return 0;
392}
393
394#if DEBUG
395/* For debugging */
396static inline void dump_configuration()
397{
398    struct rte_config * global_config;
399    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
400   
401    if (nb_cpu <= 0) {
402        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
403        nb_cpu = 1; /* fallback to just 1 core */
404    }
405    if (nb_cpu > RTE_MAX_LCORE)
406        nb_cpu = RTE_MAX_LCORE;
407   
408    global_config = rte_eal_get_configuration();
409   
410    if (global_config != NULL) {
411        int i;
412        fprintf(stderr, "Intel DPDK setup\n"
413               "---Version      : %"PRIu32"\n"
414               "---Magic        : %"PRIu32"\n"
415               "---Master LCore : %"PRIu32"\n"
416               "---LCore Count  : %"PRIu32"\n",
417               global_config->version, global_config->magic, 
418               global_config->master_lcore, global_config->lcore_count);
419       
420        for (i = 0 ; i < nb_cpu; i++) {
421            fprintf(stderr, "   ---Core %d : %s\n", i, 
422                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
423        }
424       
425        const char * proc_type;
426        switch (global_config->process_type) {
427            case RTE_PROC_AUTO:
428                proc_type = "auto";
429                break;
430            case RTE_PROC_PRIMARY:
431                proc_type = "primary";
432                break;
433            case RTE_PROC_SECONDARY:
434                proc_type = "secondary";
435                break;
436            case RTE_PROC_INVALID:
437                proc_type = "invalid";
438                break;
439            default:
440                proc_type = "something worse than invalid!!";
441        }
442        fprintf(stderr, "---Process Type : %s\n", proc_type);
443    }
444   
445}
446#endif
447
448static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
449                                        char * err, int errlen) {
450    int ret; /* Returned error codes */
451    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
452    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
453    char mem_map[20] = {0}; /* The memory name */
454    long nb_cpu; /* The number of CPUs in the system */
455    long my_cpu; /* The CPU number we want to bind to */
456   
457#if DEBUG
458    rte_set_log_level(RTE_LOG_DEBUG);
459#else
460    rte_set_log_level(RTE_LOG_WARNING);
461#endif
462    /*
463     * Using unique file prefixes mean separate memory is used, unlinking
464     * the two processes. However be careful we still cannot access a
465     * port that already in use.
466     */
467    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
468                "--file-prefix", mem_map, "-m", "256", NULL};
469    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
470
471    /* This initialises the Environment Abstraction Layer (EAL)
472     * If we had slave workers these are put into WAITING state
473     *
474     * Basically binds this thread to a fixed core, which we choose as
475     * the last core on the machine (assuming fewer interrupts mapped here).
476     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
477     * "-n" the number of memory channels into the CPU (hardware specific)
478     *      - Most likely to be half the number of ram slots in your machine.
479     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
480     * Controls where in memory packets are stored and should spread across
481     * the channels. We just use 1 to be safe.
482     */
483
484    /* Get the number of cpu cores in the system and use the last core */
485    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
486    if (nb_cpu <= 0) {
487        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
488        nb_cpu = 1; /* fallback to the first core */
489    }
490    if (nb_cpu > RTE_MAX_LCORE)
491        nb_cpu = RTE_MAX_LCORE;
492
493    my_cpu = nb_cpu;
494    /* This allows the user to specify the core - we would try to do this
495     * automatically but it's hard to tell that this is secondary
496     * before running rte_eal_init(...). Currently we are limited to 1
497     * instance per core due to the way memory is allocated. */
498    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
499        snprintf(err, errlen, "Failed to parse URI");
500        return -1;
501    }
502
503    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
504                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
505
506    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
507        snprintf(err, errlen, 
508          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
509          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
510        return -1;
511    }
512
513    /* Make our mask */
514    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
515
516
517        /* Give the memory map a unique name */
518        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
519    /* rte_eal_init it makes a call to getopt so we need to reset the
520     * global optind variable of getopt otherwise this fails */
521    optind = 1;
522    if ((ret = rte_eal_init(argc, argv)) < 0) {
523        snprintf(err, errlen, 
524          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
525        return -1;
526    }
527#if DEBUG
528    dump_configuration();
529#endif
530
531#if DPDK_USE_PMD_INIT
532    /* This registers all available NICs with Intel DPDK
533     * These are not loaded until rte_eal_pci_probe() is called.
534     */
535    if ((ret = rte_pmd_init_all()) < 0) {
536        snprintf(err, errlen, 
537          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
538        return -1;
539    }
540#endif
541
542    /* Blacklist all ports besides the one that we want to use */
543        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
544                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
545                         " are you sure the address is correct?: %s", strerror(-ret));
546                return -1;
547        }
548
549    /* This loads DPDK drivers against all ports that are not blacklisted */
550        if ((ret = rte_eal_pci_probe()) < 0) {
551        snprintf(err, errlen, 
552            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
553        return -1;
554    }
555
556    format_data->nb_ports = rte_eth_dev_count();
557
558    if (format_data->nb_ports != 1) {
559        snprintf(err, errlen, 
560            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
561            format_data->nb_ports);
562        return -1;
563    }
564
565    return 0;
566}
567
568static int dpdk_init_input (libtrace_t *libtrace) {
569    char err[500];
570    err[0] = 0;
571   
572    libtrace->format_data = (struct dpdk_format_data_t *)
573                            malloc(sizeof(struct dpdk_format_data_t));
574    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
575    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
576    FORMAT(libtrace)->nb_ports = 0;
577    FORMAT(libtrace)->snaplen = 0; /* Use default */
578    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
579    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
580    FORMAT(libtrace)->promisc = -1;
581    FORMAT(libtrace)->pktmbuf_pool = NULL;
582#if DPDK_USE_BLACKLIST
583    FORMAT(libtrace)->nb_blacklist = 0;
584#endif
585    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
586    FORMAT(libtrace)->mempool_name[0] = 0;
587#if HAS_HW_TIMESTAMPS_82580
588    FORMAT(libtrace)->ts_first_sys = 0;
589    FORMAT(libtrace)->ts_last_sys = 0;
590    FORMAT(libtrace)->wrap_count = 0;
591#endif
592
593    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
594        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
595        free(libtrace->format_data);
596        libtrace->format_data = NULL;
597        return -1;
598    }
599    return 0;
600};
601
602static int dpdk_init_output(libtrace_out_t *libtrace)
603{
604    char err[500];
605    err[0] = 0;
606   
607    libtrace->format_data = (struct dpdk_format_data_t *)
608                            malloc(sizeof(struct dpdk_format_data_t));
609    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
610    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
611    FORMAT(libtrace)->nb_ports = 0;
612    FORMAT(libtrace)->snaplen = 0; /* Use default */
613    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
614    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
615    FORMAT(libtrace)->promisc = -1;
616    FORMAT(libtrace)->pktmbuf_pool = NULL;
617#if DPDK_USE_BLACKLIST
618    FORMAT(libtrace)->nb_blacklist = 0;
619#endif
620    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
621    FORMAT(libtrace)->mempool_name[0] = 0;
622#if HAS_HW_TIMESTAMPS_82580
623    FORMAT(libtrace)->ts_first_sys = 0;
624    FORMAT(libtrace)->ts_last_sys = 0;
625    FORMAT(libtrace)->wrap_count = 0;
626#endif
627
628    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
629        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
630        free(libtrace->format_data);
631        libtrace->format_data = NULL;
632        return -1;
633    }
634    return 0;
635};
636
637/**
638 * Note here snaplen excludes the MAC checksum. Packets over
639 * the requested snaplen will be dropped. (Excluding MAC checksum)
640 *
641 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
642 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
643 * is set the maximum size of the returned packet would be 1518 otherwise
644 * 1514 would be the largest size possibly returned.
645 *
646 */
647static int dpdk_config_input (libtrace_t *libtrace,
648                                        trace_option_t option,
649                                        void *data) {
650    switch (option) {
651        case TRACE_OPTION_SNAPLEN:
652            /* Only support changing snaplen before a call to start is
653             * made */
654            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
655                FORMAT(libtrace)->snaplen=*(int*)data;
656            else
657                return -1;
658            return 0;
659                case TRACE_OPTION_PROMISC:
660                        FORMAT(libtrace)->promisc=*(int*)data;
661            return 0;
662        case TRACE_OPTION_FILTER:
663            /* TODO filtering */
664            break;
665        case TRACE_OPTION_META_FREQ:
666            break;
667        case TRACE_OPTION_EVENT_REALTIME:
668            break;
669        /* Avoid default: so that future options will cause a warning
670         * here to remind us to implement it, or flag it as
671         * unimplementable
672         */
673    }
674
675        /* Don't set an error - trace_config will try to deal with the
676         * option and will set an error if it fails */
677    return -1;
678}
679
680/* Can set jumbo frames/ or limit the size of a frame by setting both
681 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
682 *
683 */
684static struct rte_eth_conf port_conf = {
685        .rxmode = {
686                .split_hdr_size = 0,
687                .header_split   = 0, /**< Header Split disabled */
688                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
689                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
690                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
691        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
692#if GET_MAC_CRC_CHECKSUM
693/* So it appears that if hw_strip_crc is turned off the driver will still
694 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
695 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
696 * So lets just add it back on when we receive the packet.
697 */
698                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
699#else
700/* By default strip the MAC checksum because it's a bit of a hack to
701 * actually read these. And don't want to rely on disabling this to actualy
702 * always cut off the checksum in the future
703 */
704        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
705#endif
706        },
707        .txmode = {
708                .mq_mode = ETH_DCB_NONE,
709        },
710};
711
712static const struct rte_eth_rxconf rx_conf = {
713        .rx_thresh = {
714                .pthresh = 8,/* RX_PTHRESH prefetch */
715                .hthresh = 8,/* RX_HTHRESH host */
716                .wthresh = 4,/* RX_WTHRESH writeback */
717        },
718    .rx_free_thresh = 0,
719    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
720};
721
722static const struct rte_eth_txconf tx_conf = {
723        .tx_thresh = {
724        /**
725         * TX_PTHRESH prefetch
726         * Set on the NIC, if the number of unprocessed descriptors to queued on
727         * the card fall below this try grab at least hthresh more unprocessed
728         * descriptors.
729         */
730                .pthresh = 36,
731
732        /* TX_HTHRESH host
733         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
734         */
735                .hthresh = 0,
736       
737        /* TX_WTHRESH writeback
738         * Set on the NIC, the number of sent descriptors before writing back
739         * status to confirm the transmission. This is done more efficiently as
740         * a bulk DMA-transfer rather than writing one at a time.
741         * Similar to tx_free_thresh however this is applied to the NIC, where
742         * as tx_free_thresh is when DPDK will check these. This is extended
743         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
744         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
745         */
746                .wthresh = 4,
747        },
748
749    /* Used internally by DPDK rather than passed to the NIC. The number of
750     * packet descriptors to send before checking for any responses written
751     * back (to confirm the transmission). Default = 32 if set to 0)
752     */
753        .tx_free_thresh = 0,
754
755    /* This is the Report Status threshold, used by 10Gbit cards,
756     * This signals the card to only write back status (such as
757     * transmission successful) after this minimum number of transmit
758     * descriptors are seen. The default is 32 (if set to 0) however if set
759     * to greater than 1 TX wthresh must be set to zero, because this is kindof
760     * a replacement. See the dpdk programmers guide for more restrictions.
761     */
762        .tx_rs_thresh = 1,
763};
764
765/* Attach memory to the port and start the port or restart the port.
766 */
767static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
768    int ret; /* Check return values for errors */
769    struct rte_eth_link link_info; /* Wait for link */
770   
771    /* Already started */
772    if (format_data->paused == DPDK_RUNNING)
773        return 0;
774
775    /* First time started we need to alloc our memory, doing this here
776     * rather than in environment setup because we don't have snaplen then */
777    if (format_data->paused == DPDK_NEVER_STARTED) {
778        if (format_data->snaplen == 0) {
779            format_data->snaplen = RX_MBUF_SIZE;
780            port_conf.rxmode.jumbo_frame = 0;
781            port_conf.rxmode.max_rx_pkt_len = 0;
782        } else {
783            /* Use jumbo frames */
784            port_conf.rxmode.jumbo_frame = 1;
785            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
786        }
787
788        /* This is additional overhead so make sure we allow space for this */
789#if GET_MAC_CRC_CHECKSUM
790        format_data->snaplen += ETHER_CRC_LEN;
791#endif
792#if HAS_HW_TIMESTAMPS_82580
793        format_data->snaplen += sizeof(struct hw_timestamp_82580);
794#endif
795
796        /* Create the mbuf pool, which is the place our packets are allocated
797         * from - TODO figure out if there is is a free function (I cannot see one)
798         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
799         * allocate however that extra 1 packet is not used.
800         * (I assume <= vs < error some where in DPDK code)
801         * TX requires nb_tx_buffers + 1 in the case the queue is full
802         * so that will fill the new buffer and wait until slots in the
803         * ring become available.
804         */
805#if DEBUG
806    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
807#endif
808        format_data->pktmbuf_pool =
809            rte_mempool_create(format_data->mempool_name,
810                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
811                       format_data->snaplen + sizeof(struct rte_mbuf) 
812                                        + RTE_PKTMBUF_HEADROOM,
813                       8, sizeof(struct rte_pktmbuf_pool_private),
814                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
815                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
816
817        if (format_data->pktmbuf_pool == NULL) {
818            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
819                        "pool failed: %s", strerror(rte_errno));
820            return -1;
821        }
822    }
823   
824    /* ----------- Now do the setup for the port mapping ------------ */
825    /* Order of calls must be
826     * rte_eth_dev_configure()
827     * rte_eth_tx_queue_setup()
828     * rte_eth_rx_queue_setup()
829     * rte_eth_dev_start()
830     * other rte_eth calls
831     */
832   
833    /* This must be called first before another *eth* function
834     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
835    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
836    if (ret < 0) {
837        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
838                            " %"PRIu8" : %s", format_data->port,
839                            strerror(-ret));
840        return -1;
841    }
842    /* Initialise the TX queue a minimum value if using this port for
843     * receiving. Otherwise a larger size if writing packets.
844     */
845    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
846                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
847    if (ret < 0) {
848        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
849                            " %"PRIu8" : %s", format_data->port,
850                            strerror(-ret));
851        return -1;
852    }
853    /* Initialise the RX queue with some packets from memory */
854    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
855                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
856                            &rx_conf, format_data->pktmbuf_pool);
857    if (ret < 0) {
858        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
859                    " %"PRIu8" : %s", format_data->port,
860                    strerror(-ret));
861        return -1;
862    }
863   
864    /* Start device */
865    ret = rte_eth_dev_start(format_data->port);
866    if (ret < 0) {
867        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
868                    strerror(-ret));
869        return -1;
870    }
871
872    /* Default promiscuous to on */
873    if (format_data->promisc == -1)
874        format_data->promisc = 1;
875   
876    if (format_data->promisc == 1)
877        rte_eth_promiscuous_enable(format_data->port);
878    else
879        rte_eth_promiscuous_disable(format_data->port);
880   
881    /* Wait for the link to come up */
882    rte_eth_link_get(format_data->port, &link_info);
883#if DEBUG
884    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
885            (int) link_info.link_duplex, (int) link_info.link_speed);
886#endif
887
888    /* We have now successfully started/unpaused */
889    format_data->paused = DPDK_RUNNING;
890   
891    return 0;
892}
893
894static int dpdk_start_input (libtrace_t *libtrace) {
895    char err[500];
896    err[0] = 0;
897
898    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
899        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
900        free(libtrace->format_data);
901        libtrace->format_data = NULL;
902        return -1;
903    }
904    return 0;
905}
906
907static int dpdk_start_output(libtrace_out_t *libtrace)
908{
909    char err[500];
910    err[0] = 0;
911   
912    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
913        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
914        free(libtrace->format_data);
915        libtrace->format_data = NULL;
916        return -1;
917    }
918    return 0;
919}
920
921static int dpdk_pause_input(libtrace_t * libtrace){
922    /* This stops the device, but can be restarted using rte_eth_dev_start() */
923    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
924#if DEBUG     
925        fprintf(stderr, "Pausing port\n");
926#endif
927        rte_eth_dev_stop(FORMAT(libtrace)->port);
928        FORMAT(libtrace)->paused = DPDK_PAUSED;
929        /* If we pause it the driver will be reset and likely our counter */
930#if HAS_HW_TIMESTAMPS_82580
931        FORMAT(libtrace)->ts_first_sys = 0;
932        FORMAT(libtrace)->ts_last_sys = 0;
933#endif
934    }
935    return 0;
936}
937
938static int dpdk_write_packet(libtrace_out_t *trace, 
939                libtrace_packet_t *packet){
940    struct rte_mbuf* m_buff[1];
941   
942    int wirelen = trace_get_wire_length(packet);
943    int caplen = trace_get_capture_length(packet);
944   
945    /* Check for a checksum and remove it */
946    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
947                                            wirelen == caplen)
948        caplen -= ETHER_CRC_LEN;
949
950    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
951    if (m_buff[0] == NULL) {
952        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
953        return -1;
954    } else {
955        int ret;
956        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
957        do {
958            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
959        } while (ret != 1);
960    }
961
962    return 0;
963}
964
965static int dpdk_fin_input(libtrace_t * libtrace) {
966    /* Free our memory structures */
967    if (libtrace->format_data != NULL) {
968        /* Close the device completely, device cannot be restarted */
969        if (FORMAT(libtrace)->port != 0xFF)
970            rte_eth_dev_close(FORMAT(libtrace)->port);
971        /* filter here if we used it */
972                free(libtrace->format_data);
973        }
974
975    /* Revert to the original PCI drivers */
976    /* No longer in DPDK
977    rte_eal_pci_exit(); */
978    return 0;
979}
980
981
982static int dpdk_fin_output(libtrace_out_t * libtrace) {
983    /* Free our memory structures */
984    if (libtrace->format_data != NULL) {
985        /* Close the device completely, device cannot be restarted */
986        if (FORMAT(libtrace)->port != 0xFF)
987            rte_eth_dev_close(FORMAT(libtrace)->port);
988        /* filter here if we used it */
989                free(libtrace->format_data);
990        }
991
992    /* Revert to the original PCI drivers */
993    /* No longer in DPDK
994    rte_eal_pci_exit(); */
995    return 0;
996}
997
998/**
999 * Get the start of additional header that we added to a packet.
1000 */
1001static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1002    uint8_t *hdrsize;
1003    assert(packet);
1004    assert(packet->buffer);
1005    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1006    /* The byte before the original packet data denotes the size in bytes
1007     * of our additional header that we added sits before the 'size byte' */
1008    hdrsize--;
1009    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1010}
1011
1012static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1013    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1014    return hdr->cap_len;
1015}
1016
1017static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1018    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1019    if (size > hdr->cap_len) {
1020        /* Cannot make a packet bigger */
1021                return trace_get_capture_length(packet);
1022        }
1023
1024    /* Reset the cached capture length first*/
1025    packet->capture_length = -1;
1026    hdr->cap_len = (uint32_t) size;
1027        return trace_get_capture_length(packet);
1028}
1029
1030static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1031    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1032    int org_cap_size; /* The original capture size */
1033    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1034        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1035                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1036                            sizeof(struct hw_timestamp_82580);
1037    } else {
1038        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1039                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1040    }
1041    if (hdr->flags & INCLUDES_CHECKSUM) {
1042        return org_cap_size;
1043    } else {
1044        /* DPDK packets are always TRACE_TYPE_ETH packets */
1045        return org_cap_size + ETHER_CRC_LEN;
1046    }
1047}
1048static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1049    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1050    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1051        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1052                sizeof(struct hw_timestamp_82580);
1053    else
1054        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1055}
1056
1057static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1058                libtrace_packet_t *packet, void *buffer,
1059                libtrace_rt_types_t rt_type, uint32_t flags) {
1060    assert(packet);
1061    if (packet->buffer != buffer &&
1062        packet->buf_control == TRACE_CTRL_PACKET) {
1063        free(packet->buffer);
1064    }
1065
1066    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1067        packet->buf_control = TRACE_CTRL_PACKET;
1068    } else
1069        packet->buf_control = TRACE_CTRL_EXTERNAL;
1070
1071    packet->buffer = buffer;
1072    packet->header = buffer;
1073
1074    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1075    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1076    packet->type = rt_type;
1077    return 0;
1078}
1079
1080/*
1081 * Does any extra preperation to a captured packet.
1082 * This includes adding our extra header to it with the timestamp
1083 */
1084static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1085                                                        struct rte_mbuf* pkt){
1086    uint8_t * hdr_size;
1087    struct dpdk_addt_hdr *hdr;
1088#if HAS_HW_TIMESTAMPS_82580
1089    struct hw_timestamp_82580 *hw_ts;
1090    struct timeval cur_sys_time;
1091    uint64_t cur_sys_time_ns;
1092    uint64_t estimated_wraps;
1093   
1094    /* Using gettimeofday because it's most likely to be a vsyscall
1095     * We don't want to slow down anything with systemcalls we dont need
1096     * accauracy */
1097    gettimeofday(&cur_sys_time, NULL);
1098#else
1099# if USE_CLOCK_GETTIME
1100    struct timespec cur_sys_time;
1101   
1102    /* This looks terrible and I feel bad doing it. But it's OK
1103     * on new kernels, because this is a vsyscall */
1104    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1105# else
1106    struct timeval cur_sys_time;
1107    /* Should be a vsyscall */
1108    gettimeofday(&cur_sys_time, NULL);
1109# endif
1110#endif
1111
1112    /* Record the size of our header */
1113    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1114    *hdr_size = sizeof(struct dpdk_addt_hdr);
1115    /* Now put our header in front of that size */
1116    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1117    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1118   
1119#if GET_MAC_CRC_CHECKSUM
1120    /* Add back in the CRC sum */
1121    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1122    pkt->pkt.data_len += ETHER_CRC_LEN;
1123    hdr->flags |= INCLUDES_CHECKSUM;
1124#endif
1125
1126#if HAS_HW_TIMESTAMPS_82580
1127    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1128     *
1129     *        +----------+---+   +--------------+
1130     *  82580 |    24    | 8 |   |      32      |
1131     *        +----------+---+   +--------------+
1132     *          reserved  \______ 40 bits _____/
1133     *
1134     * The 40 bit 82580 SYSTIM overflows every
1135     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1136     *
1137     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1138     * Endian (for the full 64 bits) i.e. picture is mirrored
1139     */
1140   
1141    /* The timestamp is sitting before our packet and is included in pkt_len */
1142    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1143    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1144   
1145    /* Despite what the documentation says this is in Little
1146     * Endian byteorder. Mask the reserved section out.
1147     */
1148    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1149                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1150               
1151    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1152    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1153        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1154        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1155    }
1156   
1157    /* This will have serious problems if packets aren't read quickly
1158     * that is within a couple of seconds because our clock cycles every
1159     * 18 seconds */
1160    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1161                            / (1ull<<TS_NBITS_82580);
1162   
1163    /* Estimated_wraps gives the number of times the counter should have
1164     * wrapped (however depending on value last time it could have wrapped
1165     * twice more (if hw clock is close to its max value) or once less (allowing
1166     * for a bit of variance between hw and sys clock). But if the clock
1167     * shouldn't have wrapped once then don't allow it to go backwards in time */
1168    if (unlikely(estimated_wraps >= 2)) {
1169        /* 2 or more wrap arounds add all but the very last wrap */
1170        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1171    }
1172   
1173    /* Set the timestamp to the lowest possible value we're considering */
1174    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1175                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1176   
1177    /* In most runs only the first if() will need evaluating - i.e our
1178     * estimate is correct. */
1179    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1180                                hdr->timestamp, MAXSKEW_82580))) {
1181        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1182        FORMAT(libtrace)->wrap_count++;
1183        hdr->timestamp += (1ull<<TS_NBITS_82580);
1184        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1185                                hdr->timestamp, MAXSKEW_82580)) {
1186            /* Failed to match estimated_wraps */
1187            FORMAT(libtrace)->wrap_count++;
1188            hdr->timestamp += (1ull<<TS_NBITS_82580);
1189            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1190                                hdr->timestamp, MAXSKEW_82580)) {
1191                if (estimated_wraps == 0) {
1192                    /* 0 case Failed to match estimated_wraps+2 */
1193                    printf("WARNING - Hardware Timestamp failed to"
1194                                            " match using systemtime!\n");
1195                    hdr->timestamp = cur_sys_time_ns;
1196                } else {
1197                    /* Failed to match estimated_wraps+1 */
1198                    FORMAT(libtrace)->wrap_count++;
1199                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1200                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1201                                hdr->timestamp, MAXSKEW_82580)) {
1202                        /* Failed to match estimated_wraps+2 */
1203                        printf("WARNING - Hardware Timestamp failed to"
1204                                            " match using systemtime!!\n");
1205                    }
1206                }
1207            }
1208        }
1209    }
1210
1211    /* Log our previous for the next loop */
1212    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1213
1214#else
1215# if USE_CLOCK_GETTIME
1216    hdr->timestamp = TS_TO_NS(cur_sys_time);
1217# else
1218    hdr->timestamp = TV_TO_NS(cur_sys_time);
1219# endif
1220#endif
1221
1222    /* Intels samples prefetch into level 0 cache lets assume it is a good
1223     * idea and do the same */
1224    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1225    packet->buffer = pkt;
1226    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1227
1228    /* Set our capture length for the first time */
1229    hdr->cap_len = dpdk_get_wire_length(packet);
1230    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1231        hdr->cap_len -= ETHER_CRC_LEN;
1232    }
1233   
1234
1235    return dpdk_get_framing_length(packet) +
1236                        dpdk_get_capture_length(packet);
1237}
1238
1239static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1240    int nb_rx; /* Number of rx packets we've recevied */
1241    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1242
1243    /* Free the last packet buffer */
1244    if (packet->buffer != NULL) {
1245        /* Buffer is owned by DPDK */
1246        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1247            rte_pktmbuf_free(packet->buffer);
1248            packet->buffer = NULL;
1249        } else
1250        /* Buffer is owned by packet i.e. has been malloc'd */
1251        if (packet->buf_control == TRACE_CTRL_PACKET) {
1252            free(packet->buffer);
1253            packet->buffer = NULL;
1254        }
1255    }
1256   
1257    packet->buf_control = TRACE_CTRL_EXTERNAL;
1258    packet->type = TRACE_RT_DATA_DPDK;
1259   
1260    /* Wait for a packet */
1261    while (1) {
1262        /* Poll for a single packet */
1263        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1264                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1265        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1266            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1267        }
1268    }
1269   
1270    /* We'll never get here - but if we did it would be bad */
1271    return -1;
1272}
1273
1274static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1275    struct timeval tv;
1276    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1277   
1278    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1279    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1280    return tv;
1281}
1282
1283static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1284    struct timespec ts;
1285    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1286   
1287    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1288    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1289    return ts;
1290}
1291
1292static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1293    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1294}
1295
1296static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1297    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1298    return (libtrace_direction_t) hdr->direction;
1299}
1300
1301static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1302    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1303    hdr->direction = (uint8_t) direction;
1304    return (libtrace_direction_t) hdr->direction;
1305}
1306
1307/*
1308 * NOTE: Drops could occur for other reasons than running out of buffer
1309 * space. Such as failed MAC checksums and oversized packets.
1310 */
1311static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1312    struct rte_eth_stats stats = {0};
1313   
1314    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1315        return UINT64_MAX;
1316    /* Grab the current stats */
1317    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1318   
1319    /* Get the drop counter */
1320    return (uint64_t) stats.ierrors;
1321}
1322
1323static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1324    struct rte_eth_stats stats = {0};
1325   
1326    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1327        return UINT64_MAX;
1328    /* Grab the current stats */
1329    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1330   
1331    /* Get the drop counter */
1332    return (uint64_t) stats.ipackets;
1333}
1334
1335/*
1336 * This is the number of packets filtered by the NIC
1337 * and maybe ahead of number read using libtrace.
1338 *
1339 * XXX we are yet to implement any filtering, but if it was this should
1340 * get the result. So this will just return 0 for now.
1341 */
1342static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1343    struct rte_eth_stats stats = {0};
1344   
1345    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1346        return UINT64_MAX;
1347    /* Grab the current stats */
1348    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1349   
1350    /* Get the drop counter */
1351    return (uint64_t) stats.fdirmiss;
1352}
1353
1354/* Attempts to read a packet in a non-blocking fashion. If one is not
1355 * available a SLEEP event is returned. We do not have the ability to
1356 * create a select()able file descriptor in DPDK.
1357 */
1358static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1359                                        libtrace_packet_t *packet) {
1360    libtrace_eventobj_t event = {0,0,0.0,0};
1361    int nb_rx; /* Number of receive packets we've read */
1362    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1363   
1364    do {
1365   
1366        /* See if we already have a packet waiting */
1367        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1368                        FORMAT(trace)->queue_id, pkts_burst, 1);
1369       
1370        if (nb_rx > 0) {
1371            /* Free the last packet buffer */
1372            if (packet->buffer != NULL) {
1373                /* Buffer is owned by DPDK */
1374                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1375                    rte_pktmbuf_free(packet->buffer);
1376                    packet->buffer = NULL;
1377                } else
1378                /* Buffer is owned by packet i.e. has been malloc'd */
1379                if (packet->buf_control == TRACE_CTRL_PACKET) {
1380                    free(packet->buffer);
1381                    packet->buffer = NULL;
1382                }
1383            }
1384           
1385            packet->buf_control = TRACE_CTRL_EXTERNAL;
1386            packet->type = TRACE_RT_DATA_DPDK;
1387            event.type = TRACE_EVENT_PACKET;
1388            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1389           
1390            /* XXX - Check this passes the filter trace_read_packet normally
1391             * does this for us but this wont */
1392            if (trace->filter) {
1393                if (!trace_apply_filter(trace->filter, packet)) {
1394                    /* Failed the filter so we loop for another packet */
1395                    trace->filtered_packets ++;
1396                    continue;
1397                }
1398            }
1399            trace->accepted_packets ++;
1400        } else {
1401            /* We only want to sleep for a very short time - we are non-blocking */
1402            event.type = TRACE_EVENT_SLEEP;
1403            event.seconds = 0.0001;
1404            event.size = 0;
1405        }
1406       
1407        /* If we get here we have our event */
1408        break;
1409    } while (1);
1410
1411    return event;
1412}
1413
1414
/* Prints the usage text for the dpdk format module to stdout. */
static void dpdk_help(void) {
    static const char *const help_lines[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t i;

    /* Emit the lines in order; none of them contain format directives */
    for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++)
        printf("%s", help_lines[i]);
}
1432
1433 static struct libtrace_format_t dpdk = {
1434        "dpdk",
1435        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1436        TRACE_FORMAT_DPDK,
1437        NULL,                   /* probe filename */
1438        NULL,                               /* probe magic */
1439        dpdk_init_input,            /* init_input */
1440        dpdk_config_input,          /* config_input */
1441        dpdk_start_input,           /* start_input */
1442        dpdk_pause_input,           /* pause_input */
1443        dpdk_init_output,           /* init_output */
1444        NULL,                               /* config_output */
1445        dpdk_start_output,          /* start_ouput */
1446        dpdk_fin_input,             /* fin_input */
1447        dpdk_fin_output,        /* fin_output */
1448        dpdk_read_packet,           /* read_packet */
1449        dpdk_prepare_packet,    /* prepare_packet */
1450        NULL,                               /* fin_packet */
1451        dpdk_write_packet,          /* write_packet */
1452        dpdk_get_link_type,         /* get_link_type */
1453        dpdk_get_direction,         /* get_direction */
1454        dpdk_set_direction,         /* set_direction */
1455        NULL,                               /* get_erf_timestamp */
1456        dpdk_get_timeval,           /* get_timeval */
1457        dpdk_get_timespec,          /* get_timespec */
1458        NULL,                               /* get_seconds */
1459        NULL,                               /* seek_erf */
1460        NULL,                               /* seek_timeval */
1461        NULL,                               /* seek_seconds */
1462        dpdk_get_capture_length,/* get_capture_length */
1463        dpdk_get_wire_length,   /* get_wire_length */
1464        dpdk_get_framing_length,/* get_framing_length */
1465        dpdk_set_capture_length,/* set_capture_length */
1466        NULL,                               /* get_received_packets */
1467        dpdk_get_filtered_packets,/* get_filtered_packets */
1468        dpdk_get_dropped_packets,/* get_dropped_packets */
1469    dpdk_get_captured_packets,/* get_captured_packets */
1470        NULL,                       /* get_fd */
1471        dpdk_trace_event,               /* trace_event */
1472    dpdk_help,              /* help */
1473        NULL
1474};
1475
1476void dpdk_constructor(void) {
1477        register_format(&dpdk);
1478}
Note: See TracBrowser for help on using the repository browser.