source: lib/format_dpdk.c @ 17f954f

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivegetfragoffhelplibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 17f954f was 2138553, checked in by Richard Sanger <rjs51@…>, 7 years ago

Updates the dpdk build system to support the latest DPDK libraries, v1.5 and above.
This drops support for anything before v1.5 due to the requirement to patch the build system.
Hopefully this new way will continue to work with newer releases.

This removes the need to patch the DPDK library
Instead the only requirement is that DPDK is built as a static library with the following options
CONFIG_RTE_BUILD_COMBINE_LIBS=y EXTRA_CFLAGS="-fPIC" added to the make command

-This line, and those below, will be ignored--

M configure.in
D Intel DPDK Patches/DPDKLibtracePatch.patch
M Intel DPDK Patches/README
M lib/format_dpdk.c
A lib/dpdk_libtrace.mk
M lib/Makefile.am

  • Property mode set to 100644
File size: 48.0 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include <stdlib.h>
56#include <assert.h>
57#include <unistd.h>
58#include <endian.h>
59#include <rte_eal.h>
60#include <rte_per_lcore.h>
61#include <rte_debug.h>
62#include <rte_errno.h>
63#include <rte_common.h>
64#include <rte_log.h>
65#include <rte_memcpy.h>
66#include <rte_prefetch.h>
67#include <rte_branch_prediction.h>
68#include <rte_pci.h>
69#include <rte_ether.h>
70#include <rte_ethdev.h>
71#include <rte_ring.h>
72#include <rte_mempool.h>
73#include <rte_mbuf.h>
74
/* The default size of the memory buffers to use - this is the maximum size of
 * a standard ethernet packet less the size of the MAC checksum. It is used as
 * the snap length when the user has not configured one. */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue, tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple of 128.
 * A modification can be made to the driver to remove this limit, in which
 * case the value can be increased both in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * The same limits as described for NB_RX_MBUF presumably apply (the original
 * note referred to NB_TX_MBUF itself) - TODO confirm against the driver.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist. It needs to be big enough to contain
 * every PCI device address (every bus:device.function tuple listed by lspci).
 */
#define BLACK_LIST_SIZE 50

/* Size in bytes of the buffer that holds the mempool name (the name is written
 * with snprintf() bounded by this value, so it includes the terminating NUL) */
#define MEMPOOL_NAME_LEN 20
104
/* View an opaque buffer pointer as a DPDK mbuf pointer */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data: the bytes that follow the
 * rte_mbuf header and the headroom. The argument is parenthesised so that
 * pointer expressions such as (p + 1) are cast as a whole. */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Access the DPDK format data attached to a libtrace input or output trace */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
113
114#if RTE_PKTMBUF_HEADROOM != 128
115#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
116         "any libtrace instance processing these packet must be have the" \
117         "same RTE_PKTMBUF_HEADROOM set"
118#endif
119
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose messages to stdout (also raises the DPDK log level to debug) */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again, gettimeofday() should be a
 * vsyscall as well; if it's not you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_LIBRT
/* You can turn this on (set to 1) to prefer clock_gettime */
#define USE_CLOCK_GETTIME 0
#else
/* DONT CHANGE THIS !!! (librt was not found, so the option must stay off) */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out to a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
/* Number of valid bits in the NIC timestamp (see struct hw_timestamp_82580) */
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
/* True when v2 lies strictly inside the open interval (v1 - var, v1 + var) */
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
166
/* Hardware timestamp record as per the Intel 82580 specification.
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored in Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
};
173
/* Life-cycle state of the DPDK port, stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED, /* Initial state; the snap length may still be changed */
    DPDK_RUNNING,       /* The port has been started successfully */
    DPDK_PAUSED,        /* The port was running and has since been stopped */
};
179
/* Per-trace DPDK state. Used by both input and output, however some fields
 * are not used for output */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode - RX only (-1 = unset, defaults to on at start) */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    uint8_t paused; /* See paused_state */ 
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    int snaplen; /* The snap length for the capture - RX only (0 = use default) */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    unsigned int nb_blacklist; /* Number of entries in blacklist that are valid */
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping only relevant to RX */
    uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
};
203
/* Flag bits describing a captured packet; presumably stored in
 * dpdk_addt_hdr.flags - TODO confirm against the read path */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 0x1,
    INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
};
208
/**
 * A structure placed in front of the packet where we can store
 * additional information about the given packet.
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 *
 * The field order and sizes define this in-buffer layout, so they must not
 * be changed without updating every reader of these buffers.
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags; /* presumably a mask of dpdk_addt_hdr_flags - TODO confirm */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
234
235/**
236 * We want to blacklist all devices except those on the whitelist
237 * (I say list, but yes it is only the one).
238 *
239 * The default behaviour of rte_pci_probe() will map every possible device
240 * to its DPDK driver. The DPDK driver will take the ethernet device
241 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
242 *
243 * So blacklist all devices except the one that we wish to use so that
244 * the others can still be used as standard ethernet ports.
245 */
246static void blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
247{
248        struct rte_pci_device *dev = NULL;
249        format_data->nb_blacklist = 0;
250
251        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
252
253        TAILQ_FOREACH(dev, &device_list, next) {
254        if (whitelist != NULL && whitelist->domain == dev->addr.domain
255            && whitelist->bus == dev->addr.bus
256            && whitelist->devid == dev->addr.devid
257            && whitelist->function == dev->addr.function)
258            continue;
259                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
260                                / sizeof (format_data->blacklist[0])) {
261                        printf("Warning: too many devices to blacklist consider"
262                                        " increasing BLACK_LIST_SIZE");
263                        break;
264                }
265                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
266                ++format_data->nb_blacklist;
267        }
268
269        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
270}
271
272/**
273 * Parse the URI format as a pci address
274 * Fills in addr, note core is optional and is unchanged if
275 * a value for it is not provided.
276 *
277 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
278 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
279 */
280static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
281    char * wrkstr;
282    char * pch;
283    assert(str);
284    wrkstr = strdup(str);
285   
286    pch = strtok(wrkstr,":");
287    if (pch == NULL || pch[0] == 0) {
288        free(wrkstr); return -1;
289    }
290    addr->domain = (uint16_t) atoi(pch);
291
292    pch = strtok(NULL,":");
293    if (pch == NULL || pch[0] == 0) {
294        free(wrkstr); return -1;
295    }
296    addr->bus = (uint8_t) atoi(pch);
297
298    pch = strtok(NULL,".");
299    if (pch == NULL || pch[0] == 0) {
300        free(wrkstr); return -1;
301    }
302    addr->devid = (uint8_t) atoi(pch);
303
304    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
305    if (pch == NULL || pch[0] == 0) {
306        free(wrkstr); return -1;
307    }
308    addr->function = (uint8_t) atoi(pch);
309
310    pch = strtok(NULL, ""); /* Find end of string */
311   
312    if (pch != NULL && pch[0] != 0) {
313        *core = (long) atoi(pch);
314    }
315
316    free(wrkstr);
317    return 0;
318}
319
#if DEBUG
/* Debugging aid: print the global DPDK (EAL) configuration to stdout.
 * Prints nothing if DPDK reports no configuration. */
static inline void dump_configuration()
{
    struct rte_config * cfg;
    const char * type_name;
    long core_limit = sysconf(_SC_NPROCESSORS_ONLN);
    int core;

    /* Work out how many per-core role entries to list */
    if (core_limit <= 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
        core_limit = 1; /* fallback to just 1 core */
    }
    if (core_limit > RTE_MAX_LCORE)
        core_limit = RTE_MAX_LCORE;

    cfg = rte_eal_get_configuration();
    if (cfg == NULL)
        return;

    printf("Intel DPDK setup\n"
           "---Version      : %"PRIu32"\n"
           "---Magic        : %"PRIu32"\n"
           "---Master LCore : %"PRIu32"\n"
           "---LCore Count  : %"PRIu32"\n",
           cfg->version, cfg->magic,
           cfg->master_lcore, cfg->lcore_count);

    for (core = 0; core < core_limit; core++) {
        const char * state = "off";

        if (cfg->lcore_role[core] == ROLE_RTE)
            state = "on";
        printf("   ---Core %d : %s\n", core, state);
    }

    switch (cfg->process_type) {
        case RTE_PROC_AUTO:
            type_name = "auto";
            break;
        case RTE_PROC_PRIMARY:
            type_name = "primary";
            break;
        case RTE_PROC_SECONDARY:
            type_name = "secondary";
            break;
        case RTE_PROC_INVALID:
            type_name = "invalid";
            break;
        default:
            type_name = "something worse than invalid!!";
            break;
    }
    printf("---Process Type : %s\n", type_name);
}
#endif
373
374static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
375                                        char * err, int errlen) {
376    int ret; /* Returned error codes */
377    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
378    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
379    long nb_cpu; /* The number of CPUs in the system */
380    long my_cpu; /* The CPU number we want to bind to */
381   
382#if DEBUG
383    rte_set_log_level(RTE_LOG_DEBUG);
384#else
385    rte_set_log_level(RTE_LOG_WARNING);
386#endif
387    /* Using proc-type auto allows this to be either primary or secondary
388     * Secondary allows two intances of libtrace to be used on different
389     * ports. However current version of DPDK doesn't support this on the
390     * same card (My understanding is this should work with two seperate
391     * cards).
392     */
393    char* argv[] = {"libtrace", "-c", NULL, "-n", "1", "--proc-type", "auto", NULL};
394    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
395   
396    /* This initilises the Enviroment Abstraction Layer (EAL)
397     * If we had slave workers these are put into WAITING state
398     *
399     * Basically binds this thread to a fixed core, which we choose as
400     * the last core on the machine (assuming fewer interrupts mapped here).
401     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so om
402     * "-n" the number of memory channels into the CPU (hardware specific)
403     *      - Most likely to be half the number of ram slots in your machine.
404     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
405     * Controls where in memory packets are stored and should spread across
406     * the channels. We just use 1 to be safe.
407     */
408
409    /* Get the number of cpu cores in the system and use the last core */
410    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
411    if (nb_cpu <= 0) {
412        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
413        nb_cpu = 1; /* fallback to the first core */
414    }
415    if (nb_cpu > RTE_MAX_LCORE)
416        nb_cpu = RTE_MAX_LCORE;
417
418    my_cpu = nb_cpu;
419    /* This allows the user to specify the core - we would try to do this
420     * automatically but it's hard to tell that this is secondary
421     * before running rte_eal_init(...). Currently we are limited to 1
422     * instance per core due to the way memory is allocated. */
423    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
424        snprintf(err, errlen, "Failed to parse URI");
425        return -1;
426    }
427
428    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
429                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
430
431    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
432        snprintf(err, errlen, 
433          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
434          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
435        return -1;
436    }
437
438    /* Make our mask */
439    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
440    argv[2] = cpu_number;
441
442    /* rte_eal_init it makes a call to getopt so we need to reset the
443     * global optind variable of getopt otherwise this fails */
444    optind = 1;
445    if ((ret = rte_eal_init(argc, argv)) < 0) {
446        snprintf(err, errlen, 
447          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
448        return -1;
449    }
450#if DEBUG
451    dump_configuration();
452#endif
453    /* This registers all available NICs with Intel DPDK
454     * These are not loaded until rte_eal_pci_probe() is called.
455     */
456    if ((ret = rte_pmd_init_all()) < 0) {
457        snprintf(err, errlen, 
458          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
459        return -1;
460    }
461
462    /* Black list all ports besides the one that we want to use */
463    blacklist_devices(format_data, &use_addr);
464
465    /* This loads DPDK drivers against all ports that are not blacklisted */
466        if ((ret = rte_eal_pci_probe()) < 0) {
467        snprintf(err, errlen, 
468            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
469        return -1;
470    }
471
472    format_data->nb_ports = rte_eth_dev_count();
473
474    if (format_data->nb_ports != 1) {
475        snprintf(err, errlen, 
476            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
477            format_data->nb_ports);
478        return -1;
479    }
480
481    return 0;
482}
483
484static int dpdk_init_input (libtrace_t *libtrace) {
485    char err[500];
486    err[0] = 0;
487   
488    libtrace->format_data = (struct dpdk_format_data_t *)
489                            malloc(sizeof(struct dpdk_format_data_t));
490    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
491    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
492    FORMAT(libtrace)->nb_ports = 0;
493    FORMAT(libtrace)->snaplen = 0; /* Use default */
494    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
495    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
496    FORMAT(libtrace)->promisc = -1;
497    FORMAT(libtrace)->pktmbuf_pool = NULL;
498    FORMAT(libtrace)->nb_blacklist = 0;
499    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
500    FORMAT(libtrace)->mempool_name[0] = 0;
501#if HAS_HW_TIMESTAMPS_82580
502    FORMAT(libtrace)->ts_first_sys = 0;
503    FORMAT(libtrace)->ts_last_sys = 0;
504    FORMAT(libtrace)->wrap_count = 0;
505#endif
506
507    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
508        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
509        free(libtrace->format_data);
510        libtrace->format_data = NULL;
511        return -1;
512    }
513    return 0;
514};
515
516static int dpdk_init_output(libtrace_out_t *libtrace)
517{
518    char err[500];
519    err[0] = 0;
520   
521    libtrace->format_data = (struct dpdk_format_data_t *)
522                            malloc(sizeof(struct dpdk_format_data_t));
523    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
524    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
525    FORMAT(libtrace)->nb_ports = 0;
526    FORMAT(libtrace)->snaplen = 0; /* Use default */
527    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
528    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
529    FORMAT(libtrace)->promisc = -1;
530    FORMAT(libtrace)->pktmbuf_pool = NULL;
531    FORMAT(libtrace)->nb_blacklist = 0;
532    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
533    FORMAT(libtrace)->mempool_name[0] = 0;
534#if HAS_HW_TIMESTAMPS_82580
535    FORMAT(libtrace)->ts_first_sys = 0;
536    FORMAT(libtrace)->ts_last_sys = 0;
537    FORMAT(libtrace)->wrap_count = 0;
538#endif
539
540    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
541        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
542        free(libtrace->format_data);
543        libtrace->format_data = NULL;
544        return -1;
545    }
546    return 0;
547};
548
549/**
550 * Note here snaplen excludes the MAC checksum. Packets over
551 * the requested snaplen will be dropped. (Excluding MAC checksum)
552 *
553 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
554 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
555 * is set the maximum size of the returned packet would be 1518 otherwise
556 * 1514 would be the largest size possibly returned.
557 *
558 */
559static int dpdk_config_input (libtrace_t *libtrace,
560                                        trace_option_t option,
561                                        void *data) {
562    switch (option) {
563        case TRACE_OPTION_SNAPLEN:
564            /* Only support changing snaplen before a call to start is
565             * made */
566            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
567                FORMAT(libtrace)->snaplen=*(int*)data;
568            else
569                return -1;
570            return 0;
571                case TRACE_OPTION_PROMISC:
572                        FORMAT(libtrace)->promisc=*(int*)data;
573            return 0;
574        case TRACE_OPTION_FILTER:
575            /* TODO filtering */
576            break;
577        case TRACE_OPTION_META_FREQ:
578            break;
579        case TRACE_OPTION_EVENT_REALTIME:
580            break;
581        /* Avoid default: so that future options will cause a warning
582         * here to remind us to implement it, or flag it as
583         * unimplementable
584         */
585    }
586
587        /* Don't set an error - trace_config will try to deal with the
588         * option and will set an error if it fails */
589    return -1;
590}
591
/* Port configuration handed to rte_eth_dev_configure().
 *
 * Jumbo frames can be enabled, or the size of a frame limited, by setting
 * both max_rx_pkt_len and jumbo_frame. Those two fields are overwritten
 * from the configured snap length the first time the port is started, which
 * is why this structure is not const.
 * (The original note ended mid-sentence with "This can be limited to less
 * than"; the intended limit is unknown.)
 */
static struct rte_eth_conf port_conf = {
        .rxmode = {
                .split_hdr_size = 0,
                .header_split   = 0, /**< Header Split disabled */
                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
#if GET_MAC_CRC_CHECKSUM
/* It appears that even if hw_strip_crc is turned off the driver will still
 * take the CRC off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
 * So with .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
 * reported data (presumably the packet; the original sentence is truncated),
 * so we just add it back on when we receive the packet.
 */
                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
#else
/* By default strip the MAC checksum because it's a bit of a hack to
 * actually read it, and we don't want to rely on disabling this always
 * cutting off the checksum in the future.
 */
        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
#endif
        },
        .txmode = {
                .mq_mode = ETH_DCB_NONE,
        },
};
623
/* Receive queue configuration handed to rte_eth_rx_queue_setup() */
static const struct rte_eth_rxconf rx_conf = {
        .rx_thresh = {
                .pthresh = 8,/* RX_PTHRESH prefetch */
                .hthresh = 8,/* RX_HTHRESH host */
                .wthresh = 4,/* RX_WTHRESH writeback */
        },
    .rx_free_thresh = 0,
    .rx_drop_en = 0, /* Whether to drop packets when out of space - disabled */
};
633
/* Transmit queue configuration handed to rte_eth_tx_queue_setup() */
static const struct rte_eth_txconf tx_conf = {
        .tx_thresh = {
                .pthresh = 36,/* TX_PTHRESH prefetch */
                .hthresh = 0,/* TX_HTHRESH host */
                .wthresh = 4,/* TX_WTHRESH writeback */
        },
        .tx_free_thresh = 0, /* Use PMD default values */
        .tx_rs_thresh = 0, /* Use PMD default values */
};
643
644/* Attach memory to the port and start the port or restart the port.
645 */
646static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
647    int ret; /* Check return values for errors */
648    struct rte_eth_link link_info; /* Wait for link */
649   
650    /* Already started */
651    if (format_data->paused == DPDK_RUNNING)
652        return 0;
653
654    /* First time started we need to alloc our memory, doing this here
655     * rather than in enviroment setup because we don't have snaplen then */
656    if (format_data->paused == DPDK_NEVER_STARTED) {
657        if (format_data->snaplen == 0) {
658            format_data->snaplen = RX_MBUF_SIZE;
659            port_conf.rxmode.jumbo_frame = 0;
660            port_conf.rxmode.max_rx_pkt_len = 0;
661        } else {
662            /* Use jumbo frames */
663            port_conf.rxmode.jumbo_frame = 1;
664            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
665        }
666
667        /* This is additional overhead so make sure we allow space for this */
668#if GET_MAC_CRC_CHECKSUM
669        format_data->snaplen += ETHER_CRC_LEN;
670#endif
671#if HAS_HW_TIMESTAMPS_82580
672        format_data->snaplen += sizeof(struct hw_timestamp_82580);
673#endif
674
675        /* Create the mbuf pool, which is the place our packets are allocated
676         * from - TODO figure out if there is is a free function (I cannot see one)
677         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
678         * allocate however that extra 1 packet is not used.
679         * (I assume <= vs < error some where in DPDK code)
680         * TX requires nb_tx_buffers + 1 in the case the queue is full
681         * so that will fill the new buffer and wait until slots in the
682         * ring become available.
683         */
684#if DEBUG
685    printf("Creating mempool named %s\n", format_data->mempool_name);
686#endif
687        format_data->pktmbuf_pool =
688            rte_mempool_create(format_data->mempool_name,
689                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
690                       format_data->snaplen + sizeof(struct rte_mbuf) 
691                                        + RTE_PKTMBUF_HEADROOM,
692                       8, sizeof(struct rte_pktmbuf_pool_private),
693                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
694                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
695
696        if (format_data->pktmbuf_pool == NULL) {
697            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
698                        "pool failed: %s", strerror(rte_errno));
699            return -1;
700        }
701    }
702   
703    /* ----------- Now do the setup for the port mapping ------------ */
704    /* Order of calls must be
705     * rte_eth_dev_configure()
706     * rte_eth_tx_queue_setup()
707     * rte_eth_rx_queue_setup()
708     * rte_eth_dev_start()
709     * other rte_eth calls
710     */
711   
712    /* This must be called first before another *eth* function
713     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
714    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
715    if (ret < 0) {
716        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
717                            " %"PRIu8" : %s", format_data->port,
718                            strerror(-ret));
719        return -1;
720    }
721    /* Initilise the TX queue a minimum value if using this port for
722     * receiving. Otherwise a larger size if writing packets.
723     */
724    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
725                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
726    if (ret < 0) {
727        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
728                            " %"PRIu8" : %s", format_data->port,
729                            strerror(-ret));
730        return -1;
731    }
732    /* Initilise the RX queue with some packets from memory */
733    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
734                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
735                            &rx_conf, format_data->pktmbuf_pool);
736    if (ret < 0) {
737        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
738                    " %"PRIu8" : %s", format_data->port,
739                    strerror(-ret));
740        return -1;
741    }
742   
743    /* Start device */
744    ret = rte_eth_dev_start(format_data->port);
745    if (ret < 0) {
746        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
747                    strerror(-ret));
748        return -1;
749    }
750
751    /* Default promiscuous to on */
752    if (format_data->promisc == -1)
753        format_data->promisc = 1;
754   
755    if (format_data->promisc == 1)
756        rte_eth_promiscuous_enable(format_data->port);
757    else
758        rte_eth_promiscuous_disable(format_data->port);
759   
760    /* Wait for the link to come up */
761    rte_eth_link_get(format_data->port, &link_info);
762#if DEBUG
763    printf("Link status is %d %d %d\n", (int) link_info.link_status,
764            (int) link_info.link_duplex, (int) link_info.link_speed);
765#endif
766
767    /* We have now successfully started/unpased */
768    format_data->paused = DPDK_RUNNING;
769   
770    return 0;
771}
772
773static int dpdk_start_input (libtrace_t *libtrace) {
774    char err[500];
775    err[0] = 0;
776
777    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
778        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
779        free(libtrace->format_data);
780        libtrace->format_data = NULL;
781        return -1;
782    }
783    return 0;
784}
785
786static int dpdk_start_output(libtrace_out_t *libtrace)
787{
788    char err[500];
789    err[0] = 0;
790   
791    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
792        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
793        free(libtrace->format_data);
794        libtrace->format_data = NULL;
795        return -1;
796    }
797    return 0;
798}
799
800static int dpdk_pause_input(libtrace_t * libtrace){
801    /* This stops the device, but can be restarted using rte_eth_dev_start() */
802    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
803#if DEBUG     
804        printf("Pausing port\n");
805#endif
806        rte_eth_dev_stop(FORMAT(libtrace)->port);
807        FORMAT(libtrace)->paused = DPDK_PAUSED;
808        /* If we pause it the driver will be reset and likely our counter */
809#if HAS_HW_TIMESTAMPS_82580
810        FORMAT(libtrace)->ts_first_sys = 0;
811        FORMAT(libtrace)->ts_last_sys = 0;
812#endif
813    }
814    return 0;
815}
816
817static int dpdk_write_packet(libtrace_out_t *trace, 
818                libtrace_packet_t *packet){
819    struct rte_mbuf* m_buff[1];
820   
821    int wirelen = trace_get_wire_length(packet);
822    int caplen = trace_get_capture_length(packet);
823   
824    /* Check for a checksum and remove it */
825    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
826                                            wirelen == caplen)
827        caplen -= ETHER_CRC_LEN;
828
829    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
830    if (m_buff[0] == NULL) {
831        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
832        return -1;
833    } else {
834        int ret;
835        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
836        do {
837            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
838        } while (ret != 1);
839    }
840
841    return 0;
842}
843
844static int dpdk_fin_input(libtrace_t * libtrace) {
845    /* Free our memory structures */
846    if (libtrace->format_data != NULL) {
847        /* Close the device completely, device cannot be restarted */
848        if (FORMAT(libtrace)->port != 0xFF)
849            rte_eth_dev_close(FORMAT(libtrace)->port);
850        /* filter here if we used it */
851                free(libtrace->format_data);
852        }
853
854    /* Revert to the original PCI drivers */
855    /* No longer in DPDK
856    rte_eal_pci_exit(); */
857    return 0;
858}
859
860
861static int dpdk_fin_output(libtrace_out_t * libtrace) {
862    /* Free our memory structures */
863    if (libtrace->format_data != NULL) {
864        /* Close the device completely, device cannot be restarted */
865        if (FORMAT(libtrace)->port != 0xFF)
866            rte_eth_dev_close(FORMAT(libtrace)->port);
867        /* filter here if we used it */
868                free(libtrace->format_data);
869        }
870
871    /* Revert to the original PCI drivers */
872    /* No longer in DPDK
873    rte_eal_pci_exit(); */
874    return 0;
875}
876
877/**
878 * Get the start of additional header that we added to a packet.
879 */
880static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
881    uint8_t *hdrsize;
882    assert(packet);
883    assert(packet->buffer);
884    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
885    /* The byte before the original packet data denotes the size in bytes
886     * of our additional header that we added sits before the 'size byte' */
887    hdrsize--;
888    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
889}
890
891static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
892    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
893    return hdr->cap_len;
894}
895
896static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
897    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
898    if (size > hdr->cap_len) {
899        /* Cannot make a packet bigger */
900                return trace_get_capture_length(packet);
901        }
902
903    /* Reset the cached capture length first*/
904    packet->capture_length = -1;
905    hdr->cap_len = (uint32_t) size;
906        return trace_get_capture_length(packet);
907}
908
909static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
910    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
911    int org_cap_size; /* The original capture size */
912    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
913        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
914                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
915                            sizeof(struct hw_timestamp_82580);
916    } else {
917        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
918                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
919    }
920    if (hdr->flags & INCLUDES_CHECKSUM) {
921        return org_cap_size;
922    } else {
923        /* DPDK packets are always TRACE_TYPE_ETH packets */
924        return org_cap_size + ETHER_CRC_LEN;
925    }
926}
927static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
928    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
929    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
930        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
931                sizeof(struct hw_timestamp_82580);
932    else
933        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
934}
935
936static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
937                libtrace_packet_t *packet, void *buffer,
938                libtrace_rt_types_t rt_type, uint32_t flags) {
939    assert(packet);
940    if (packet->buffer != buffer &&
941        packet->buf_control == TRACE_CTRL_PACKET) {
942        free(packet->buffer);
943    }
944
945    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
946        packet->buf_control = TRACE_CTRL_PACKET;
947    } else
948        packet->buf_control = TRACE_CTRL_EXTERNAL;
949
950    packet->buffer = buffer;
951    packet->header = buffer;
952
953    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
954    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
955    packet->type = rt_type;
956    return 0;
957}
958
959/*
960 * Does any extra preperation to a captured packet.
961 * This includes adding our extra header to it with the timestamp
962 */
963static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
964                                                        struct rte_mbuf* pkt){
965    uint8_t * hdr_size;
966    struct dpdk_addt_hdr *hdr;
967#if HAS_HW_TIMESTAMPS_82580
968    struct hw_timestamp_82580 *hw_ts;
969    struct timeval cur_sys_time;
970    uint64_t cur_sys_time_ns;
971    uint64_t estimated_wraps;
972   
973    /* Using gettimeofday because it's most likely to be a vsyscall
974     * We don't want to slow down anything with systemcalls we dont need
975     * accauracy */
976    gettimeofday(&cur_sys_time, NULL);
977#else
978# if USE_CLOCK_GETTIME
979    struct timespec cur_sys_time;
980   
981    /* This looks terrible and I feel bad doing it. But it's OK
982     * on new kernels, because this is a vsyscall */
983    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
984# else
985    struct timeval cur_sys_time;
986    /* Should be a vsyscall */
987    gettimeofday(&cur_sys_time, NULL);
988# endif
989#endif
990
991    /* Record the size of our header */
992    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
993    *hdr_size = sizeof(struct dpdk_addt_hdr);
994    /* Now put our header in front of that size */
995    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
996    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
997   
998#if GET_MAC_CRC_CHECKSUM
999    /* Add back in the CRC sum */
1000    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1001    pkt->pkt.data_len += ETHER_CRC_LEN;
1002    hdr->flags |= INCLUDES_CHECKSUM;
1003#endif
1004
1005#if HAS_HW_TIMESTAMPS_82580
1006    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1007     *
1008     *        +----------+---+   +--------------+
1009     *  82580 |    24    | 8 |   |      32      |
1010     *        +----------+---+   +--------------+
1011     *          reserved  \______ 40 bits _____/
1012     *
1013     * The 40 bit 82580 SYSTIM overflows every
1014     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1015     *
1016     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1017     * Endian (for the full 64 bits) i.e. picture is mirrored
1018     */
1019   
1020    /* The timestamp is sitting before our packet and is included in pkt_len */
1021    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1022    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1023   
1024    /* Despite what the documentation says this is in Little
1025     * Endian byteorder. Mask the reserved section out.
1026     */
1027    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1028                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1029               
1030    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1031    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1032        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1033        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1034    }
1035   
1036    /* This will have serious problems if packets aren't read quickly
1037     * that is within a couple of seconds because our clock cycles every
1038     * 18 seconds */
1039    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1040                            / (1ull<<TS_NBITS_82580);
1041   
1042    /* Estimated_wraps gives the number of times the counter should have
1043     * wrapped (however depending on value last time it could have wrapped
1044     * twice more (if hw clock is close to its max value) or once less (allowing
1045     * for a bit of variance between hw and sys clock). But if the clock
1046     * shouldn't have wrapped once then don't allow it to go backwards in time */
1047    if (unlikely(estimated_wraps >= 2)) {
1048        /* 2 or more wrap arounds add all but the very last wrap */
1049        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1050    }
1051   
1052    /* Set the timestamp to the lowest possible value we're considering */
1053    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1054                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1055   
1056    /* In most runs only the first if() will need evaluating - i.e our
1057     * estimate is correct. */
1058    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1059                                hdr->timestamp, MAXSKEW_82580))) {
1060        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1061        FORMAT(libtrace)->wrap_count++;
1062        hdr->timestamp += (1ull<<TS_NBITS_82580);
1063        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1064                                hdr->timestamp, MAXSKEW_82580)) {
1065            /* Failed to match estimated_wraps */
1066            FORMAT(libtrace)->wrap_count++;
1067            hdr->timestamp += (1ull<<TS_NBITS_82580);
1068            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1069                                hdr->timestamp, MAXSKEW_82580)) {
1070                if (estimated_wraps == 0) {
1071                    /* 0 case Failed to match estimated_wraps+2 */
1072                    printf("WARNING - Hardware Timestamp failed to"
1073                                            " match using systemtime!\n");
1074                    hdr->timestamp = cur_sys_time_ns;
1075                } else {
1076                    /* Failed to match estimated_wraps+1 */
1077                    FORMAT(libtrace)->wrap_count++;
1078                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1079                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1080                                hdr->timestamp, MAXSKEW_82580)) {
1081                        /* Failed to match estimated_wraps+2 */
1082                        printf("WARNING - Hardware Timestamp failed to"
1083                                            " match using systemtime!!\n");
1084                    }
1085                }
1086            }
1087        }
1088    }
1089
1090    /* Log our previous for the next loop */
1091    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1092
1093#else
1094# if USE_CLOCK_GETTIME
1095    hdr->timestamp = TS_TO_NS(cur_sys_time);
1096# else
1097    hdr->timestamp = TV_TO_NS(cur_sys_time);
1098# endif
1099#endif
1100
1101    /* Intels samples prefetch into level 0 cache lets assume it is a good
1102     * idea and do the same */
1103    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1104    packet->buffer = pkt;
1105    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1106
1107    /* Set our capture length for the first time */
1108    hdr->cap_len = dpdk_get_wire_length(packet);
1109    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1110        hdr->cap_len -= ETHER_CRC_LEN;
1111    }
1112   
1113
1114    return dpdk_get_framing_length(packet) +
1115                        dpdk_get_capture_length(packet);
1116}
1117
1118static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1119    int nb_rx; /* Number of rx packets we've recevied */
1120    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1121
1122    /* Free the last packet buffer */
1123    if (packet->buffer != NULL) {
1124        /* Buffer is owned by DPDK */
1125        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1126            rte_pktmbuf_free(packet->buffer);
1127            packet->buffer = NULL;
1128        } else
1129        /* Buffer is owned by packet i.e. has been malloc'd */
1130        if (packet->buf_control == TRACE_CTRL_PACKET) {
1131            free(packet->buffer);
1132            packet->buffer = NULL;
1133        }
1134    }
1135   
1136    packet->buf_control = TRACE_CTRL_EXTERNAL;
1137    packet->type = TRACE_RT_DATA_DPDK;
1138   
1139    /* Wait for a packet */
1140    while (1) {
1141        /* Poll for a single packet */
1142        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1143                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1144        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1145            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1146        }
1147    }
1148   
1149    /* We'll never get here - but if we did it would be bad */
1150    return -1;
1151}
1152
1153static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1154    struct timeval tv;
1155    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1156   
1157    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1158    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1159    return tv;
1160}
1161
1162static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1163    struct timespec ts;
1164    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1165   
1166    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1167    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1168    return ts;
1169}
1170
1171static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1172    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1173}
1174
1175static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1176    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1177    return (libtrace_direction_t) hdr->direction;
1178}
1179
1180static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1181    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1182    hdr->direction = (uint8_t) direction;
1183    return (libtrace_direction_t) hdr->direction;
1184}
1185
1186/*
1187 * NOTE: Drops could occur for other reasons than running out of buffer
1188 * space. Such as failed MAC checksums and oversized packets.
1189 */
1190static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1191    struct rte_eth_stats stats = {0};
1192   
1193    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1194        return UINT64_MAX;
1195    /* Grab the current stats */
1196    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1197   
1198    /* Get the drop counter */
1199    return (uint64_t) stats.ierrors;
1200}
1201
1202static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1203    struct rte_eth_stats stats = {0};
1204   
1205    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1206        return UINT64_MAX;
1207    /* Grab the current stats */
1208    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1209   
1210    /* Get the drop counter */
1211    return (uint64_t) stats.ipackets;
1212}
1213
1214/*
1215 * This is the number of packets filtered by the NIC
1216 * and maybe ahead of number read using libtrace.
1217 *
1218 * XXX we are yet to implement any filtering, but if it was this should
1219 * get the result. So this will just return 0 for now.
1220 */
1221static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1222    struct rte_eth_stats stats = {0};
1223   
1224    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1225        return UINT64_MAX;
1226    /* Grab the current stats */
1227    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1228   
1229    /* Get the drop counter */
1230    return (uint64_t) stats.fdirmiss;
1231}
1232
1233/* Attempts to read a packet in a non-blocking fashion. If one is not
1234 * available a SLEEP event is returned. We do not have the ability to
1235 * create a select()able file descriptor in DPDK.
1236 */
1237static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1238                                        libtrace_packet_t *packet) {
1239    libtrace_eventobj_t event = {0,0,0.0,0};
1240    int nb_rx; /* Number of receive packets we've read */
1241    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1242   
1243    do {
1244   
1245        /* See if we already have a packet waiting */
1246        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1247                        FORMAT(trace)->queue_id, pkts_burst, 1);
1248       
1249        if (nb_rx > 0) {
1250            /* Free the last packet buffer */
1251            if (packet->buffer != NULL) {
1252                /* Buffer is owned by DPDK */
1253                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1254                    rte_pktmbuf_free(packet->buffer);
1255                    packet->buffer = NULL;
1256                } else
1257                /* Buffer is owned by packet i.e. has been malloc'd */
1258                if (packet->buf_control == TRACE_CTRL_PACKET) {
1259                    free(packet->buffer);
1260                    packet->buffer = NULL;
1261                }
1262            }
1263           
1264            packet->buf_control = TRACE_CTRL_EXTERNAL;
1265            packet->type = TRACE_RT_DATA_DPDK;
1266            event.type = TRACE_EVENT_PACKET;
1267            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1268           
1269            /* XXX - Check this passes the filter trace_read_packet normally
1270             * does this for us but this wont */
1271            if (trace->filter) {
1272                if (!trace_apply_filter(trace->filter, packet)) {
1273                    /* Failed the filter so we loop for another packet */
1274                    continue;
1275                }
1276            }
1277        } else {
1278            /* We only want to sleep for a very short time - we are non-blocking */
1279            event.type = TRACE_EVENT_SLEEP;
1280            event.seconds = 0.0001;
1281            event.size = 0;
1282        }
1283       
1284        /* If we get here we have our event */
1285        break;
1286    } while (1);
1287
1288    return event;
1289}
1290
1291
/* Print the usage text for the dpdk format module to standard output. */
static void dpdk_help(void) {
    /* Emitted as one block; adjacent literals are joined by the compiler */
    fputs("dpdk format module: $Revision: 1752 $\n"
          "Supported input URIs:\n"
          "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
          "\tThe -<coreid> is optional \n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
          "\t By default the last CPU core is used if not otherwise specified.\n"
          "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
          "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
          "\n"
          "Supported output URIs:\n"
          "\tSame format as the input URI.\n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
          "\n",
          stdout);
}
1309
1310 static struct libtrace_format_t dpdk = {
1311        "dpdk",
1312        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1313        TRACE_FORMAT_DPDK,
1314        NULL,                   /* probe filename */
1315        NULL,                               /* probe magic */
1316        dpdk_init_input,            /* init_input */
1317        dpdk_config_input,          /* config_input */
1318        dpdk_start_input,           /* start_input */
1319        dpdk_pause_input,           /* pause_input */
1320        dpdk_init_output,           /* init_output */
1321        NULL,                               /* config_output */
1322        dpdk_start_output,          /* start_ouput */
1323        dpdk_fin_input,             /* fin_input */
1324        dpdk_fin_output,        /* fin_output */
1325        dpdk_read_packet,           /* read_packet */
1326        dpdk_prepare_packet,    /* prepare_packet */
1327        NULL,                               /* fin_packet */
1328        dpdk_write_packet,          /* write_packet */
1329        dpdk_get_link_type,         /* get_link_type */
1330        dpdk_get_direction,         /* get_direction */
1331        dpdk_set_direction,         /* set_direction */
1332        NULL,                               /* get_erf_timestamp */
1333        dpdk_get_timeval,           /* get_timeval */
1334        dpdk_get_timespec,          /* get_timespec */
1335        NULL,                               /* get_seconds */
1336        NULL,                               /* seek_erf */
1337        NULL,                               /* seek_timeval */
1338        NULL,                               /* seek_seconds */
1339        dpdk_get_capture_length,/* get_capture_length */
1340        dpdk_get_wire_length,   /* get_wire_length */
1341        dpdk_get_framing_length,/* get_framing_length */
1342        dpdk_set_capture_length,/* set_capture_length */
1343        NULL,                               /* get_received_packets */
1344        dpdk_get_filtered_packets,/* get_filtered_packets */
1345        dpdk_get_dropped_packets,/* get_dropped_packets */
1346    dpdk_get_captured_packets,/* get_captured_packets */
1347        NULL,                       /* get_fd */
1348        dpdk_trace_event,               /* trace_event */
1349    dpdk_help,              /* help */
1350        NULL
1351};
1352
1353void dpdk_constructor(void) {
1354        register_format(&dpdk);
1355}
Note: See TracBrowser for help on using the repository browser.