source: lib/format_dpdk.c @ 95364aa

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 95364aa was 95364aa, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Change thresholds to work with e1000 series cards

  • Fix some typos
  • Property mode set to 100644
File size: 49.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include <stdlib.h>
56#include <assert.h>
57#include <unistd.h>
58#include <endian.h>
59#include <rte_eal.h>
60#include <rte_per_lcore.h>
61#include <rte_debug.h>
62#include <rte_errno.h>
63#include <rte_common.h>
64#include <rte_log.h>
65#include <rte_memcpy.h>
66#include <rte_prefetch.h>
67#include <rte_branch_prediction.h>
68#include <rte_pci.h>
69#include <rte_ether.h>
70#include <rte_ethdev.h>
71#include <rte_ring.h>
72#include <rte_mempool.h>
73#include <rte_mbuf.h>
74
/* The default size of the memory buffers to use - this is the maximum size
 * of a standard ethernet packet less the size of the MAC checksum.
 * Used as the snap length when the user has not configured one. */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue, tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple
 * of 128. The driver can be modified to remove this limit, in which case
 * the value can be increased both in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist; it needs to be big enough to contain
 * every PCI device address (every bus:device.function tuple listed by lspci).
 */
#define BLACK_LIST_SIZE 50

/* The size of the buffer holding the mempool name, i.e. the maximum number
 * of characters in the name including the terminating NUL */
#define MEMPOOL_NAME_LEN 20
104
/* Every macro argument is parenthesised so that arbitrary expressions
 * (e.g. *ptr, a ? b : c, p + 1) can be passed safely. */

/* View a generic pointer as a DPDK packet buffer */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Get the DPDK format data attached to a libtrace input or output */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a struct timeval to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
113
/* The packet layout used by this module depends on the mbuf headroom, so
 * warn at build time when DPDK was built with a non-default value. */
#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
119
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose debugging messages to stderr */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. gettimeofday() should be a
 * vsyscall as well; if it's not you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_LIBRT
/* You can turn this on (set to 1) to prefer clock_gettime */
#define USE_CLOCK_GETTIME 0
#else
/* DONT CHANGE THIS !!! */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out to a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
/* Number of valid bits in the 82580 hardware timestamp */
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
/* True when v2 lies strictly between v1 - var and v1 + var */
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
166
/* Hardware timestamp record of the Intel 82580 (16 bytes).
 * NOTE: there is a mismatch in the 82580 datasheet - it states that the
 * timestamp is stored in Big Endian, however it is actually Little Endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp; /* Little Endian, only the lower 40 bits are valid */
};
173
/* Life-cycle state of the DPDK port, stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED, /* Initial state, the port has not been started yet */
    DPDK_RUNNING,       /* The port has been started successfully */
    DPDK_PAUSED,        /* The port was stopped by a pause */
};
179
/* Per-trace state of the DPDK format module.
 * Used by both input and output, however some fields are not used
 * for output */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode (-1 = use the default) - RX only */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    uint8_t paused; /* See paused_state */ 
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    int snaplen; /* The snap length for the capture (0 = use the default) - RX only */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    unsigned int nb_blacklist; /* Number of blacklist entries that are valid */
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping is only relevant to RX */
    uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
};
203
/* Bit flags describing a captured packet - presumably the values stored in
 * dpdk_addt_hdr.flags (confirm against the read path) */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 0x1,
    INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
};
208
/**
 * A structure placed in front of the packet where we can store
 * additional information about the given packet.
 *
 * Layout of a packet buffer:
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t flags; /* presumably a combination of dpdk_addt_hdr_flags */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
234
235/**
236 * We want to blacklist all devices except those on the whitelist
237 * (I say list, but yes it is only the one).
238 *
239 * The default behaviour of rte_pci_probe() will map every possible device
240 * to its DPDK driver. The DPDK driver will take the ethernet device
241 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
242 *
243 * So blacklist all devices except the one that we wish to use so that
244 * the others can still be used as standard ethernet ports.
245 */
246static void blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
247{
248        struct rte_pci_device *dev = NULL;
249        format_data->nb_blacklist = 0;
250
251        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
252
253        TAILQ_FOREACH(dev, &device_list, next) {
254        if (whitelist != NULL && whitelist->domain == dev->addr.domain
255            && whitelist->bus == dev->addr.bus
256            && whitelist->devid == dev->addr.devid
257            && whitelist->function == dev->addr.function)
258            continue;
259                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
260                                / sizeof (format_data->blacklist[0])) {
261                        printf("Warning: too many devices to blacklist consider"
262                                        " increasing BLACK_LIST_SIZE");
263                        break;
264                }
265                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
266                ++format_data->nb_blacklist;
267        }
268
269        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
270}
271
272/**
273 * Parse the URI format as a pci address
274 * Fills in addr, note core is optional and is unchanged if
275 * a value for it is not provided.
276 *
277 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
278 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
279 */
280static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
281    char * wrkstr;
282    char * pch;
283    assert(str);
284    wrkstr = strdup(str);
285   
286    pch = strtok(wrkstr,":");
287    if (pch == NULL || pch[0] == 0) {
288        free(wrkstr); return -1;
289    }
290    addr->domain = (uint16_t) atoi(pch);
291
292    pch = strtok(NULL,":");
293    if (pch == NULL || pch[0] == 0) {
294        free(wrkstr); return -1;
295    }
296    addr->bus = (uint8_t) atoi(pch);
297
298    pch = strtok(NULL,".");
299    if (pch == NULL || pch[0] == 0) {
300        free(wrkstr); return -1;
301    }
302    addr->devid = (uint8_t) atoi(pch);
303
304    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
305    if (pch == NULL || pch[0] == 0) {
306        free(wrkstr); return -1;
307    }
308    addr->function = (uint8_t) atoi(pch);
309
310    pch = strtok(NULL, ""); /* Find end of string */
311   
312    if (pch != NULL && pch[0] != 0) {
313        *core = (long) atoi(pch);
314    }
315
316    free(wrkstr);
317    return 0;
318}
319
#if DEBUG
/* Debugging aid: prints the global DPDK (EAL) configuration to stderr -
 * the version, magic, master lcore and lcore count, whether each online
 * core (capped at RTE_MAX_LCORE) has the ROLE_RTE role, and the process
 * type. Prints nothing if DPDK has no configuration to report. */
static inline void dump_configuration()
{
    struct rte_config * cfg;
    const char * proc_type;
    int core;
    long cores = sysconf(_SC_NPROCESSORS_ONLN);

    if (cores <= 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
        cores = 1; /* fallback to just 1 core */
    }
    if (cores > RTE_MAX_LCORE)
        cores = RTE_MAX_LCORE;

    cfg = rte_eal_get_configuration();
    if (cfg == NULL)
        return;

    fprintf(stderr, "Intel DPDK setup\n"
           "---Version      : %"PRIu32"\n"
           "---Magic        : %"PRIu32"\n"
           "---Master LCore : %"PRIu32"\n"
           "---LCore Count  : %"PRIu32"\n",
           cfg->version, cfg->magic,
           cfg->master_lcore, cfg->lcore_count);

    for (core = 0; core < cores; core++) {
        const char * state = "off";

        if (cfg->lcore_role[core] == ROLE_RTE)
            state = "on";
        fprintf(stderr, "   ---Core %d : %s\n", core, state);
    }

    switch (cfg->process_type) {
        case RTE_PROC_AUTO:      proc_type = "auto";      break;
        case RTE_PROC_PRIMARY:   proc_type = "primary";   break;
        case RTE_PROC_SECONDARY: proc_type = "secondary"; break;
        case RTE_PROC_INVALID:   proc_type = "invalid";   break;
        default:
            proc_type = "something worse than invalid!!";
    }
    fprintf(stderr, "---Process Type : %s\n", proc_type);
}
#endif
373
374static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
375                                        char * err, int errlen) {
376    int ret; /* Returned error codes */
377    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
378    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
379    char mem_map[20] = {0}; /* The memory name */
380    long nb_cpu; /* The number of CPUs in the system */
381    long my_cpu; /* The CPU number we want to bind to */
382   
383#if DEBUG
384    rte_set_log_level(RTE_LOG_DEBUG);
385#else
386    rte_set_log_level(RTE_LOG_WARNING);
387#endif
388    /*
389     * Using unique file prefixes mean separate memory is used, unlinking
390     * the two processes. However be careful we still cannot access a
391     * port that already in use.
392     */
393    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
394                "--file-prefix", mem_map, "-m", "256", NULL};
395    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
396   
397    /* This initialises the Environment Abstraction Layer (EAL)
398    /* This initialises the Environment Abstraction Layer (EAL)
399     * If we had slave workers these are put into WAITING state
400     *
401     * Basically binds this thread to a fixed core, which we choose as
402     * the last core on the machine (assuming fewer interrupts mapped here).
403     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
404     * "-n" the number of memory channels into the CPU (hardware specific)
405     *      - Most likely to be half the number of ram slots in your machine.
406     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
407     * Controls where in memory packets are stored and should spread across
408     * the channels. We just use 1 to be safe.
409     */
410
411    /* Get the number of cpu cores in the system and use the last core */
412    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
413    if (nb_cpu <= 0) {
414        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
415        nb_cpu = 1; /* fallback to the first core */
416    }
417    if (nb_cpu > RTE_MAX_LCORE)
418        nb_cpu = RTE_MAX_LCORE;
419
420    my_cpu = nb_cpu;
421    /* This allows the user to specify the core - we would try to do this
422     * automatically but it's hard to tell that this is secondary
423     * before running rte_eal_init(...). Currently we are limited to 1
424     * instance per core due to the way memory is allocated. */
425    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
426        snprintf(err, errlen, "Failed to parse URI");
427        return -1;
428    }
429
430    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
431                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
432
433    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
434        snprintf(err, errlen, 
435          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
436          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
437        return -1;
438    }
439
440    /* Make our mask */
441    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
442
443
444        /* Give the memory map a unique name */
445        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
446    /* rte_eal_init it makes a call to getopt so we need to reset the
447     * global optind variable of getopt otherwise this fails */
448    optind = 1;
449    if ((ret = rte_eal_init(argc, argv)) < 0) {
450        snprintf(err, errlen, 
451          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
452        return -1;
453    }
454#if DEBUG
455    dump_configuration();
456#endif
457    /* This registers all available NICs with Intel DPDK
458     * These are not loaded until rte_eal_pci_probe() is called.
459     */
460    if ((ret = rte_pmd_init_all()) < 0) {
461        snprintf(err, errlen, 
462          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
463        return -1;
464    }
465
466    /* Black list all ports besides the one that we want to use */
467    blacklist_devices(format_data, &use_addr);
468
469    /* This loads DPDK drivers against all ports that are not blacklisted */
470        if ((ret = rte_eal_pci_probe()) < 0) {
471        snprintf(err, errlen, 
472            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
473        return -1;
474    }
475
476    format_data->nb_ports = rte_eth_dev_count();
477
478    if (format_data->nb_ports != 1) {
479        snprintf(err, errlen, 
480            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
481            format_data->nb_ports);
482        return -1;
483    }
484
485    return 0;
486}
487
488static int dpdk_init_input (libtrace_t *libtrace) {
489    char err[500];
490    err[0] = 0;
491   
492    libtrace->format_data = (struct dpdk_format_data_t *)
493                            malloc(sizeof(struct dpdk_format_data_t));
494    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
495    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
496    FORMAT(libtrace)->nb_ports = 0;
497    FORMAT(libtrace)->snaplen = 0; /* Use default */
498    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
499    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
500    FORMAT(libtrace)->promisc = -1;
501    FORMAT(libtrace)->pktmbuf_pool = NULL;
502    FORMAT(libtrace)->nb_blacklist = 0;
503    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
504    FORMAT(libtrace)->mempool_name[0] = 0;
505#if HAS_HW_TIMESTAMPS_82580
506    FORMAT(libtrace)->ts_first_sys = 0;
507    FORMAT(libtrace)->ts_last_sys = 0;
508    FORMAT(libtrace)->wrap_count = 0;
509#endif
510
511    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
512        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
513        free(libtrace->format_data);
514        libtrace->format_data = NULL;
515        return -1;
516    }
517    return 0;
518};
519
520static int dpdk_init_output(libtrace_out_t *libtrace)
521{
522    char err[500];
523    err[0] = 0;
524   
525    libtrace->format_data = (struct dpdk_format_data_t *)
526                            malloc(sizeof(struct dpdk_format_data_t));
527    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
528    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
529    FORMAT(libtrace)->nb_ports = 0;
530    FORMAT(libtrace)->snaplen = 0; /* Use default */
531    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
532    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
533    FORMAT(libtrace)->promisc = -1;
534    FORMAT(libtrace)->pktmbuf_pool = NULL;
535    FORMAT(libtrace)->nb_blacklist = 0;
536    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
537    FORMAT(libtrace)->mempool_name[0] = 0;
538#if HAS_HW_TIMESTAMPS_82580
539    FORMAT(libtrace)->ts_first_sys = 0;
540    FORMAT(libtrace)->ts_last_sys = 0;
541    FORMAT(libtrace)->wrap_count = 0;
542#endif
543
544    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
545        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
546        free(libtrace->format_data);
547        libtrace->format_data = NULL;
548        return -1;
549    }
550    return 0;
551};
552
553/**
554 * Note here snaplen excludes the MAC checksum. Packets over
555 * the requested snaplen will be dropped. (Excluding MAC checksum)
556 *
557 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
558 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
559 * is set the maximum size of the returned packet would be 1518 otherwise
560 * 1514 would be the largest size possibly returned.
561 *
562 */
563static int dpdk_config_input (libtrace_t *libtrace,
564                                        trace_option_t option,
565                                        void *data) {
566    switch (option) {
567        case TRACE_OPTION_SNAPLEN:
568            /* Only support changing snaplen before a call to start is
569             * made */
570            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
571                FORMAT(libtrace)->snaplen=*(int*)data;
572            else
573                return -1;
574            return 0;
575                case TRACE_OPTION_PROMISC:
576                        FORMAT(libtrace)->promisc=*(int*)data;
577            return 0;
578        case TRACE_OPTION_FILTER:
579            /* TODO filtering */
580            break;
581        case TRACE_OPTION_META_FREQ:
582            break;
583        case TRACE_OPTION_EVENT_REALTIME:
584            break;
585        /* Avoid default: so that future options will cause a warning
586         * here to remind us to implement it, or flag it as
587         * unimplementable
588         */
589    }
590
591        /* Don't set an error - trace_config will try to deal with the
592         * option and will set an error if it fails */
593    return -1;
594}
595
/* The port (device) configuration passed to rte_eth_dev_configure().
 *
 * Jumbo frames can be enabled, or the size of a frame limited, by setting
 * both max_rx_pkt_len and jumbo_frame. This is not const because
 * dpdk_start_port() fills both in from the snap length before the port
 * is first configured.
 */
static struct rte_eth_conf port_conf = {
        .rxmode = {
                .split_hdr_size = 0,
                .header_split   = 0, /**< Header Split disabled */
                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
#if GET_MAC_CRC_CHECKSUM
/* So it appears that if hw_strip_crc is turned off the driver will still
 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
 * packet data (presumably - the original sentence was cut short).
 * So lets just add it back on when we receive the packet.
 */
                .hw_strip_crc   = 0, /**< CRC not stripped by hardware */
#else
/* By default strip the MAC checksum because it's a bit of a hack to
 * actually read these. And we don't want to rely on disabling this to
 * actually always cut off the checksum in the future
 */
        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
#endif
        },
        .txmode = {
                .mq_mode = ETH_DCB_NONE,
        },
};
627
/* The receive queue configuration passed to rte_eth_rx_queue_setup() */
static const struct rte_eth_rxconf rx_conf = {
        .rx_thresh = {
                .pthresh = 8,/* RX_PTHRESH prefetch */
                .hthresh = 8,/* RX_HTHRESH host */
                .wthresh = 4,/* RX_WTHRESH writeback */
        },
    .rx_free_thresh = 0,
    .rx_drop_en = 0, /* Whether to drop packets if out of space - disabled */
};
637
/* The transmit queue configuration passed to rte_eth_tx_queue_setup() */
static const struct rte_eth_txconf tx_conf = {
        .tx_thresh = {
        /**
         * TX_PTHRESH prefetch
         * Set on the NIC, if the number of unprocessed descriptors queued on
         * the card falls below this, try to grab at least hthresh more
         * unprocessed descriptors.
         */
                .pthresh = 36,

        /* TX_HTHRESH host
         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
         */
                .hthresh = 0,
       
        /* TX_WTHRESH writeback
         * Set on the NIC, the number of sent descriptors before writing back
         * status to confirm the transmission. This is done more efficiently as
         * a bulk DMA-transfer rather than writing one at a time.
         * Similar to tx_free_thresh, however this is applied to the NIC,
         * whereas tx_free_thresh is when DPDK will check these. This is extended
         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
         */
                .wthresh = 4,
        },

    /* Used internally by DPDK rather than passed to the NIC. The number of
     * packet descriptors to send before checking for any responses written
     * back (to confirm the transmission). (Default = 32 if set to 0)
     */
        .tx_free_thresh = 0,

    /* This is the Report Status threshold, used by 10Gbit cards.
     * This signals the card to only write back status (such as
     * transmission successful) after this minimum number of transmit
     * descriptors are seen. The default is 32 (if set to 0), however if set
     * to greater than 1 TX wthresh must be set to zero, because this is kind of
     * a replacement. See the dpdk programmers guide for more restrictions.
     */
        .tx_rs_thresh = 1,
};
680
681/* Attach memory to the port and start the port or restart the port.
682 */
683static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
684    int ret; /* Check return values for errors */
685    struct rte_eth_link link_info; /* Wait for link */
686   
687    /* Already started */
688    if (format_data->paused == DPDK_RUNNING)
689        return 0;
690
691    /* First time started we need to alloc our memory, doing this here
692     * rather than in environment setup because we don't have snaplen then */
693    if (format_data->paused == DPDK_NEVER_STARTED) {
694        if (format_data->snaplen == 0) {
695            format_data->snaplen = RX_MBUF_SIZE;
696            port_conf.rxmode.jumbo_frame = 0;
697            port_conf.rxmode.max_rx_pkt_len = 0;
698        } else {
699            /* Use jumbo frames */
700            port_conf.rxmode.jumbo_frame = 1;
701            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
702        }
703
704        /* This is additional overhead so make sure we allow space for this */
705#if GET_MAC_CRC_CHECKSUM
706        format_data->snaplen += ETHER_CRC_LEN;
707#endif
708#if HAS_HW_TIMESTAMPS_82580
709        format_data->snaplen += sizeof(struct hw_timestamp_82580);
710#endif
711
712        /* Create the mbuf pool, which is the place our packets are allocated
713         * from - TODO figure out if there is is a free function (I cannot see one)
714         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
715         * allocate however that extra 1 packet is not used.
716         * (I assume <= vs < error some where in DPDK code)
717         * TX requires nb_tx_buffers + 1 in the case the queue is full
718         * so that will fill the new buffer and wait until slots in the
719         * ring become available.
720         */
721#if DEBUG
722    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
723#endif
724        format_data->pktmbuf_pool =
725            rte_mempool_create(format_data->mempool_name,
726                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
727                       format_data->snaplen + sizeof(struct rte_mbuf) 
728                                        + RTE_PKTMBUF_HEADROOM,
729                       8, sizeof(struct rte_pktmbuf_pool_private),
730                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
731                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
732
733        if (format_data->pktmbuf_pool == NULL) {
734            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
735                        "pool failed: %s", strerror(rte_errno));
736            return -1;
737        }
738    }
739   
740    /* ----------- Now do the setup for the port mapping ------------ */
741    /* Order of calls must be
742     * rte_eth_dev_configure()
743     * rte_eth_tx_queue_setup()
744     * rte_eth_rx_queue_setup()
745     * rte_eth_dev_start()
746     * other rte_eth calls
747     */
748   
749    /* This must be called first before another *eth* function
750     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
751    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
752    if (ret < 0) {
753        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
754                            " %"PRIu8" : %s", format_data->port,
755                            strerror(-ret));
756        return -1;
757    }
758    /* Initialise the TX queue a minimum value if using this port for
759     * receiving. Otherwise a larger size if writing packets.
760     */
761    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
762                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
763    if (ret < 0) {
764        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
765                            " %"PRIu8" : %s", format_data->port,
766                            strerror(-ret));
767        return -1;
768    }
769    /* Initialise the RX queue with some packets from memory */
770    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
771                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
772                            &rx_conf, format_data->pktmbuf_pool);
773    if (ret < 0) {
774        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
775                    " %"PRIu8" : %s", format_data->port,
776                    strerror(-ret));
777        return -1;
778    }
779   
780    /* Start device */
781    ret = rte_eth_dev_start(format_data->port);
782    if (ret < 0) {
783        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
784                    strerror(-ret));
785        return -1;
786    }
787
788    /* Default promiscuous to on */
789    if (format_data->promisc == -1)
790        format_data->promisc = 1;
791   
792    if (format_data->promisc == 1)
793        rte_eth_promiscuous_enable(format_data->port);
794    else
795        rte_eth_promiscuous_disable(format_data->port);
796   
797    /* Wait for the link to come up */
798    rte_eth_link_get(format_data->port, &link_info);
799#if DEBUG
800    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
801            (int) link_info.link_duplex, (int) link_info.link_speed);
802#endif
803
804    /* We have now successfully started/unpaused */
805    format_data->paused = DPDK_RUNNING;
806   
807    return 0;
808}
809
810static int dpdk_start_input (libtrace_t *libtrace) {
811    char err[500];
812    err[0] = 0;
813
814    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
815        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
816        free(libtrace->format_data);
817        libtrace->format_data = NULL;
818        return -1;
819    }
820    return 0;
821}
822
823static int dpdk_start_output(libtrace_out_t *libtrace)
824{
825    char err[500];
826    err[0] = 0;
827   
828    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
829        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
830        free(libtrace->format_data);
831        libtrace->format_data = NULL;
832        return -1;
833    }
834    return 0;
835}
836
837static int dpdk_pause_input(libtrace_t * libtrace){
838    /* This stops the device, but can be restarted using rte_eth_dev_start() */
839    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
840#if DEBUG     
841        fprintf(stderr, "Pausing port\n");
842#endif
843        rte_eth_dev_stop(FORMAT(libtrace)->port);
844        FORMAT(libtrace)->paused = DPDK_PAUSED;
845        /* If we pause it the driver will be reset and likely our counter */
846#if HAS_HW_TIMESTAMPS_82580
847        FORMAT(libtrace)->ts_first_sys = 0;
848        FORMAT(libtrace)->ts_last_sys = 0;
849#endif
850    }
851    return 0;
852}
853
854static int dpdk_write_packet(libtrace_out_t *trace, 
855                libtrace_packet_t *packet){
856    struct rte_mbuf* m_buff[1];
857   
858    int wirelen = trace_get_wire_length(packet);
859    int caplen = trace_get_capture_length(packet);
860   
861    /* Check for a checksum and remove it */
862    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
863                                            wirelen == caplen)
864        caplen -= ETHER_CRC_LEN;
865
866    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
867    if (m_buff[0] == NULL) {
868        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
869        return -1;
870    } else {
871        int ret;
872        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
873        do {
874            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
875        } while (ret != 1);
876    }
877
878    return 0;
879}
880
881static int dpdk_fin_input(libtrace_t * libtrace) {
882    /* Free our memory structures */
883    if (libtrace->format_data != NULL) {
884        /* Close the device completely, device cannot be restarted */
885        if (FORMAT(libtrace)->port != 0xFF)
886            rte_eth_dev_close(FORMAT(libtrace)->port);
887        /* filter here if we used it */
888                free(libtrace->format_data);
889        }
890
891    /* Revert to the original PCI drivers */
892    /* No longer in DPDK
893    rte_eal_pci_exit(); */
894    return 0;
895}
896
897
898static int dpdk_fin_output(libtrace_out_t * libtrace) {
899    /* Free our memory structures */
900    if (libtrace->format_data != NULL) {
901        /* Close the device completely, device cannot be restarted */
902        if (FORMAT(libtrace)->port != 0xFF)
903            rte_eth_dev_close(FORMAT(libtrace)->port);
904        /* filter here if we used it */
905                free(libtrace->format_data);
906        }
907
908    /* Revert to the original PCI drivers */
909    /* No longer in DPDK
910    rte_eal_pci_exit(); */
911    return 0;
912}
913
914/**
915 * Get the start of additional header that we added to a packet.
916 */
917static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
918    uint8_t *hdrsize;
919    assert(packet);
920    assert(packet->buffer);
921    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
922    /* The byte before the original packet data denotes the size in bytes
923     * of our additional header that we added sits before the 'size byte' */
924    hdrsize--;
925    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
926}
927
928static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
929    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
930    return hdr->cap_len;
931}
932
933static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
934    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
935    if (size > hdr->cap_len) {
936        /* Cannot make a packet bigger */
937                return trace_get_capture_length(packet);
938        }
939
940    /* Reset the cached capture length first*/
941    packet->capture_length = -1;
942    hdr->cap_len = (uint32_t) size;
943        return trace_get_capture_length(packet);
944}
945
946static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
947    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
948    int org_cap_size; /* The original capture size */
949    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
950        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
951                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
952                            sizeof(struct hw_timestamp_82580);
953    } else {
954        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
955                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
956    }
957    if (hdr->flags & INCLUDES_CHECKSUM) {
958        return org_cap_size;
959    } else {
960        /* DPDK packets are always TRACE_TYPE_ETH packets */
961        return org_cap_size + ETHER_CRC_LEN;
962    }
963}
964static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
965    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
966    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
967        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
968                sizeof(struct hw_timestamp_82580);
969    else
970        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
971}
972
973static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
974                libtrace_packet_t *packet, void *buffer,
975                libtrace_rt_types_t rt_type, uint32_t flags) {
976    assert(packet);
977    if (packet->buffer != buffer &&
978        packet->buf_control == TRACE_CTRL_PACKET) {
979        free(packet->buffer);
980    }
981
982    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
983        packet->buf_control = TRACE_CTRL_PACKET;
984    } else
985        packet->buf_control = TRACE_CTRL_EXTERNAL;
986
987    packet->buffer = buffer;
988    packet->header = buffer;
989
990    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
991    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
992    packet->type = rt_type;
993    return 0;
994}
995
996/*
997 * Does any extra preperation to a captured packet.
998 * This includes adding our extra header to it with the timestamp
999 */
1000static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1001                                                        struct rte_mbuf* pkt){
1002    uint8_t * hdr_size;
1003    struct dpdk_addt_hdr *hdr;
1004#if HAS_HW_TIMESTAMPS_82580
1005    struct hw_timestamp_82580 *hw_ts;
1006    struct timeval cur_sys_time;
1007    uint64_t cur_sys_time_ns;
1008    uint64_t estimated_wraps;
1009   
1010    /* Using gettimeofday because it's most likely to be a vsyscall
1011     * We don't want to slow down anything with systemcalls we dont need
1012     * accauracy */
1013    gettimeofday(&cur_sys_time, NULL);
1014#else
1015# if USE_CLOCK_GETTIME
1016    struct timespec cur_sys_time;
1017   
1018    /* This looks terrible and I feel bad doing it. But it's OK
1019     * on new kernels, because this is a vsyscall */
1020    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1021# else
1022    struct timeval cur_sys_time;
1023    /* Should be a vsyscall */
1024    gettimeofday(&cur_sys_time, NULL);
1025# endif
1026#endif
1027
1028    /* Record the size of our header */
1029    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1030    *hdr_size = sizeof(struct dpdk_addt_hdr);
1031    /* Now put our header in front of that size */
1032    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1033    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1034   
1035#if GET_MAC_CRC_CHECKSUM
1036    /* Add back in the CRC sum */
1037    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1038    pkt->pkt.data_len += ETHER_CRC_LEN;
1039    hdr->flags |= INCLUDES_CHECKSUM;
1040#endif
1041
1042#if HAS_HW_TIMESTAMPS_82580
1043    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1044     *
1045     *        +----------+---+   +--------------+
1046     *  82580 |    24    | 8 |   |      32      |
1047     *        +----------+---+   +--------------+
1048     *          reserved  \______ 40 bits _____/
1049     *
1050     * The 40 bit 82580 SYSTIM overflows every
1051     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1052     *
1053     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1054     * Endian (for the full 64 bits) i.e. picture is mirrored
1055     */
1056   
1057    /* The timestamp is sitting before our packet and is included in pkt_len */
1058    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1059    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1060   
1061    /* Despite what the documentation says this is in Little
1062     * Endian byteorder. Mask the reserved section out.
1063     */
1064    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1065                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1066               
1067    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1068    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1069        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1070        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1071    }
1072   
1073    /* This will have serious problems if packets aren't read quickly
1074     * that is within a couple of seconds because our clock cycles every
1075     * 18 seconds */
1076    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1077                            / (1ull<<TS_NBITS_82580);
1078   
1079    /* Estimated_wraps gives the number of times the counter should have
1080     * wrapped (however depending on value last time it could have wrapped
1081     * twice more (if hw clock is close to its max value) or once less (allowing
1082     * for a bit of variance between hw and sys clock). But if the clock
1083     * shouldn't have wrapped once then don't allow it to go backwards in time */
1084    if (unlikely(estimated_wraps >= 2)) {
1085        /* 2 or more wrap arounds add all but the very last wrap */
1086        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1087    }
1088   
1089    /* Set the timestamp to the lowest possible value we're considering */
1090    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1091                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1092   
1093    /* In most runs only the first if() will need evaluating - i.e our
1094     * estimate is correct. */
1095    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1096                                hdr->timestamp, MAXSKEW_82580))) {
1097        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1098        FORMAT(libtrace)->wrap_count++;
1099        hdr->timestamp += (1ull<<TS_NBITS_82580);
1100        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1101                                hdr->timestamp, MAXSKEW_82580)) {
1102            /* Failed to match estimated_wraps */
1103            FORMAT(libtrace)->wrap_count++;
1104            hdr->timestamp += (1ull<<TS_NBITS_82580);
1105            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1106                                hdr->timestamp, MAXSKEW_82580)) {
1107                if (estimated_wraps == 0) {
1108                    /* 0 case Failed to match estimated_wraps+2 */
1109                    printf("WARNING - Hardware Timestamp failed to"
1110                                            " match using systemtime!\n");
1111                    hdr->timestamp = cur_sys_time_ns;
1112                } else {
1113                    /* Failed to match estimated_wraps+1 */
1114                    FORMAT(libtrace)->wrap_count++;
1115                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1116                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1117                                hdr->timestamp, MAXSKEW_82580)) {
1118                        /* Failed to match estimated_wraps+2 */
1119                        printf("WARNING - Hardware Timestamp failed to"
1120                                            " match using systemtime!!\n");
1121                    }
1122                }
1123            }
1124        }
1125    }
1126
1127    /* Log our previous for the next loop */
1128    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1129
1130#else
1131# if USE_CLOCK_GETTIME
1132    hdr->timestamp = TS_TO_NS(cur_sys_time);
1133# else
1134    hdr->timestamp = TV_TO_NS(cur_sys_time);
1135# endif
1136#endif
1137
1138    /* Intels samples prefetch into level 0 cache lets assume it is a good
1139     * idea and do the same */
1140    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1141    packet->buffer = pkt;
1142    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1143
1144    /* Set our capture length for the first time */
1145    hdr->cap_len = dpdk_get_wire_length(packet);
1146    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1147        hdr->cap_len -= ETHER_CRC_LEN;
1148    }
1149   
1150
1151    return dpdk_get_framing_length(packet) +
1152                        dpdk_get_capture_length(packet);
1153}
1154
1155static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1156    int nb_rx; /* Number of rx packets we've recevied */
1157    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1158
1159    /* Free the last packet buffer */
1160    if (packet->buffer != NULL) {
1161        /* Buffer is owned by DPDK */
1162        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1163            rte_pktmbuf_free(packet->buffer);
1164            packet->buffer = NULL;
1165        } else
1166        /* Buffer is owned by packet i.e. has been malloc'd */
1167        if (packet->buf_control == TRACE_CTRL_PACKET) {
1168            free(packet->buffer);
1169            packet->buffer = NULL;
1170        }
1171    }
1172   
1173    packet->buf_control = TRACE_CTRL_EXTERNAL;
1174    packet->type = TRACE_RT_DATA_DPDK;
1175   
1176    /* Wait for a packet */
1177    while (1) {
1178        /* Poll for a single packet */
1179        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1180                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1181        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1182            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1183        }
1184    }
1185   
1186    /* We'll never get here - but if we did it would be bad */
1187    return -1;
1188}
1189
1190static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1191    struct timeval tv;
1192    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1193   
1194    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1195    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1196    return tv;
1197}
1198
1199static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1200    struct timespec ts;
1201    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1202   
1203    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1204    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1205    return ts;
1206}
1207
1208static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1209    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1210}
1211
1212static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1213    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1214    return (libtrace_direction_t) hdr->direction;
1215}
1216
1217static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1218    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1219    hdr->direction = (uint8_t) direction;
1220    return (libtrace_direction_t) hdr->direction;
1221}
1222
1223/*
1224 * NOTE: Drops could occur for other reasons than running out of buffer
1225 * space. Such as failed MAC checksums and oversized packets.
1226 */
1227static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1228    struct rte_eth_stats stats = {0};
1229   
1230    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1231        return UINT64_MAX;
1232    /* Grab the current stats */
1233    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1234   
1235    /* Get the drop counter */
1236    return (uint64_t) stats.ierrors;
1237}
1238
1239static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1240    struct rte_eth_stats stats = {0};
1241   
1242    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1243        return UINT64_MAX;
1244    /* Grab the current stats */
1245    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1246   
1247    /* Get the drop counter */
1248    return (uint64_t) stats.ipackets;
1249}
1250
1251/*
1252 * This is the number of packets filtered by the NIC
1253 * and maybe ahead of number read using libtrace.
1254 *
1255 * XXX we are yet to implement any filtering, but if it was this should
1256 * get the result. So this will just return 0 for now.
1257 */
1258static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1259    struct rte_eth_stats stats = {0};
1260   
1261    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1262        return UINT64_MAX;
1263    /* Grab the current stats */
1264    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1265   
1266    /* Get the drop counter */
1267    return (uint64_t) stats.fdirmiss;
1268}
1269
1270/* Attempts to read a packet in a non-blocking fashion. If one is not
1271 * available a SLEEP event is returned. We do not have the ability to
1272 * create a select()able file descriptor in DPDK.
1273 */
1274static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1275                                        libtrace_packet_t *packet) {
1276    libtrace_eventobj_t event = {0,0,0.0,0};
1277    int nb_rx; /* Number of receive packets we've read */
1278    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1279   
1280    do {
1281   
1282        /* See if we already have a packet waiting */
1283        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1284                        FORMAT(trace)->queue_id, pkts_burst, 1);
1285       
1286        if (nb_rx > 0) {
1287            /* Free the last packet buffer */
1288            if (packet->buffer != NULL) {
1289                /* Buffer is owned by DPDK */
1290                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1291                    rte_pktmbuf_free(packet->buffer);
1292                    packet->buffer = NULL;
1293                } else
1294                /* Buffer is owned by packet i.e. has been malloc'd */
1295                if (packet->buf_control == TRACE_CTRL_PACKET) {
1296                    free(packet->buffer);
1297                    packet->buffer = NULL;
1298                }
1299            }
1300           
1301            packet->buf_control = TRACE_CTRL_EXTERNAL;
1302            packet->type = TRACE_RT_DATA_DPDK;
1303            event.type = TRACE_EVENT_PACKET;
1304            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1305           
1306            /* XXX - Check this passes the filter trace_read_packet normally
1307             * does this for us but this wont */
1308            if (trace->filter) {
1309                if (!trace_apply_filter(trace->filter, packet)) {
1310                    /* Failed the filter so we loop for another packet */
1311                    trace->filtered_packets ++;
1312                    continue;
1313                }
1314            }
1315            trace->accepted_packets ++;
1316        } else {
1317            /* We only want to sleep for a very short time - we are non-blocking */
1318            event.type = TRACE_EVENT_SLEEP;
1319            event.seconds = 0.0001;
1320            event.size = 0;
1321        }
1322       
1323        /* If we get here we have our event */
1324        break;
1325    } while (1);
1326
1327    return event;
1328}
1329
1330
/* Prints the usage text for the dpdk format module (supported input and
 * output URIs) to stdout. */
static void dpdk_help(void) {
    /* One entry per output line; none of them contain format directives */
    static const char *const help_lines[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t idx;

    for (idx = 0; idx < sizeof(help_lines) / sizeof(help_lines[0]); idx++)
        fputs(help_lines[idx], stdout);
}
1348
1349 static struct libtrace_format_t dpdk = {
1350        "dpdk",
1351        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1352        TRACE_FORMAT_DPDK,
1353        NULL,                   /* probe filename */
1354        NULL,                               /* probe magic */
1355        dpdk_init_input,            /* init_input */
1356        dpdk_config_input,          /* config_input */
1357        dpdk_start_input,           /* start_input */
1358        dpdk_pause_input,           /* pause_input */
1359        dpdk_init_output,           /* init_output */
1360        NULL,                               /* config_output */
1361        dpdk_start_output,          /* start_ouput */
1362        dpdk_fin_input,             /* fin_input */
1363        dpdk_fin_output,        /* fin_output */
1364        dpdk_read_packet,           /* read_packet */
1365        dpdk_prepare_packet,    /* prepare_packet */
1366        NULL,                               /* fin_packet */
1367        dpdk_write_packet,          /* write_packet */
1368        dpdk_get_link_type,         /* get_link_type */
1369        dpdk_get_direction,         /* get_direction */
1370        dpdk_set_direction,         /* set_direction */
1371        NULL,                               /* get_erf_timestamp */
1372        dpdk_get_timeval,           /* get_timeval */
1373        dpdk_get_timespec,          /* get_timespec */
1374        NULL,                               /* get_seconds */
1375        NULL,                               /* seek_erf */
1376        NULL,                               /* seek_timeval */
1377        NULL,                               /* seek_seconds */
1378        dpdk_get_capture_length,/* get_capture_length */
1379        dpdk_get_wire_length,   /* get_wire_length */
1380        dpdk_get_framing_length,/* get_framing_length */
1381        dpdk_set_capture_length,/* set_capture_length */
1382        NULL,                               /* get_received_packets */
1383        dpdk_get_filtered_packets,/* get_filtered_packets */
1384        dpdk_get_dropped_packets,/* get_dropped_packets */
1385    dpdk_get_captured_packets,/* get_captured_packets */
1386        NULL,                       /* get_fd */
1387        dpdk_trace_event,               /* trace_event */
1388    dpdk_help,              /* help */
1389        NULL
1390};
1391
1392void dpdk_constructor(void) {
1393        register_format(&dpdk);
1394}
Note: See TracBrowser for help on using the repository browser.