source: lib/format_dpdk.c @ 8af0d01

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 8af0d01 was 8af0d01, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Limit threads to number of queues detected on the card we are connected to rather than hardcoded at 8

  • Property mode set to 100644
File size: 60.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
43#include "config.h"
44#include "libtrace.h"
45#include "libtrace_int.h"
46#include "format_helper.h"
47#include "libtrace_arphrd.h"
48#include "hash_toeplitz.h"
49
50#ifdef HAVE_INTTYPES_H
51#  include <inttypes.h>
52#else
53# error "Can't find inttypes.h"
54#endif
55
56#include <stdlib.h>
57#include <assert.h>
58#include <unistd.h>
59#include <endian.h>
60#include <rte_eal.h>
61#include <rte_per_lcore.h>
62#include <rte_debug.h>
63#include <rte_errno.h>
64#include <rte_common.h>
65#include <rte_log.h>
66#include <rte_memcpy.h>
67#include <rte_prefetch.h>
68#include <rte_branch_prediction.h>
69#include <rte_pci.h>
70#include <rte_ether.h>
71#include <rte_ethdev.h>
72#include <rte_ring.h>
73#include <rte_mempool.h>
74#include <rte_mbuf.h>
75#include <rte_launch.h>
76#include <rte_lcore.h>
77#include <rte_per_lcore.h>
78
/* The default size of memory buffers to use - this is the maximum size of a
 * standard Ethernet packet less the size of the MAC checksum */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue, tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple of 128.
 * A modification can be made to the driver to remove this limit.
 * This can be increased in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The maximum number of characters the mempool name can be */
#define MEMPOOL_NAME_LEN 20

/* NOTE: every macro argument below is parenthesised so that expressions such
 * as `*ptr` or `base + 1` can be passed safely. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)

#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
123
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose messages (this file's DEBUG blocks write to stderr and raise
 * the DPDK log level to RTE_LOG_DEBUG) */
#define DEBUG 1

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again gettimeofday() should be a
 * vsyscall also; if it's not you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_LIBRT
/* You can turn this on (set to 1) to prefer clock_gettime */
#define USE_CLOCK_GETTIME 0
#else
/* DON'T CHANGE THIS !!! (this is the branch taken when HAVE_LIBRT is not defined) */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out to a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
/* Number of valid bits in the NIC timestamp */
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
/* True when v2 lies strictly inside the open interval (v1 - var, v1 + var) */
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
170
/*
 * Hardware timestamp record laid out as per the Intel 82580 specification.
 *
 * Note the mismatch with the 82580 datasheet: it states the timestamp is
 * stored big endian, however it is actually little endian. Only the lower
 * 40 bits of `timestamp` are valid.
 */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp;
};
177
/* Lifecycle of the DPDK port as tracked in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Port has not been started yet */
    DPDK_RUNNING       = 1,
    DPDK_PAUSED        = 2
};
183
/* Per-lcore state; one entry per possible lcore lives in dpdk_format_data_t.
 * TODO move time stamp stuff here */
struct per_lcore_t {
    uint16_t queue_id;
    uint8_t  port;
    uint8_t  enabled; /* Zero means this lcore slot is disabled */
};
191
/* Per-trace state for the DPDK format. Used by both input and output,
 * however some fields are not used for output. */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode - RX only (initialised to -1) */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    uint8_t paused; /* See paused_state */ 
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    int snaplen; /* The snap length for the capture - RX only (0 = use default) */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    unsigned int nb_blacklist; /* Number of blacklist entries that are valid */
    uint8_t rss_key[40]; /* The RSS key, filled in when a hasher is configured */
#if HAS_HW_TIMESTAMPS_82580
    /* Timestamping only relevant to RX */
    uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
        /* One slot per possible DPDK lcore (RTE_MAX_LCORE entries).
         * NOTE(review): the original comment here was cut off mid-sentence
         * ("DPDK normally seems to have a limit of"); the intended limit is
         * not recoverable from this file. */
        struct per_lcore_t per_lcore[RTE_MAX_LCORE];
};
218
/* Bit flags stored in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0,
    INCLUDES_HW_TIMESTAMP = 1 << 1  /* Used with 82580 driver */
};
223
/**
 * Additional per-packet information that is stored in front of the packet
 * data, inside the mbuf headroom:
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;
    uint8_t  flags;     /* See dpdk_addt_hdr_flags */
    uint8_t  direction;
    uint8_t  reserved1;
    uint8_t  reserved2;
    uint32_t cap_len;   /* The size to say the capture is */
};
249
250/**
251 * We want to blacklist all devices except those on the whitelist
252 * (I say list, but yes it is only the one).
253 *
254 * The default behaviour of rte_pci_probe() will map every possible device
255 * to its DPDK driver. The DPDK driver will take the ethernet device
256 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
257 *
258 * So blacklist all devices except the one that we wish to use so that
259 * the others can still be used as standard ethernet ports.
260 */
261static void blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
262{
263        struct rte_pci_device *dev = NULL;
264        format_data->nb_blacklist = 0;
265
266        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
267
268        TAILQ_FOREACH(dev, &device_list, next) {
269        if (whitelist != NULL && whitelist->domain == dev->addr.domain
270            && whitelist->bus == dev->addr.bus
271            && whitelist->devid == dev->addr.devid
272            && whitelist->function == dev->addr.function)
273            continue;
274                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
275                                / sizeof (format_data->blacklist[0])) {
276                        printf("Warning: too many devices to blacklist consider"
277                                        " increasing BLACK_LIST_SIZE");
278                        break;
279                }
280                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
281                ++format_data->nb_blacklist;
282        }
283
284        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
285}
286
287/**
288 * Parse the URI format as a pci address
289 * Fills in addr, note core is optional and is unchanged if
290 * a value for it is not provided.
291 *
292 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
293 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
294 */
295static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
296    char * wrkstr;
297    char * pch;
298    assert(str);
299    wrkstr = strdup(str);
300   
301    pch = strtok(wrkstr,":");
302    if (pch == NULL || pch[0] == 0) {
303        free(wrkstr); return -1;
304    }
305    addr->domain = (uint16_t) atoi(pch);
306
307    pch = strtok(NULL,":");
308    if (pch == NULL || pch[0] == 0) {
309        free(wrkstr); return -1;
310    }
311    addr->bus = (uint8_t) atoi(pch);
312
313    pch = strtok(NULL,".");
314    if (pch == NULL || pch[0] == 0) {
315        free(wrkstr); return -1;
316    }
317    addr->devid = (uint8_t) atoi(pch);
318
319    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
320    if (pch == NULL || pch[0] == 0) {
321        free(wrkstr); return -1;
322    }
323    addr->function = (uint8_t) atoi(pch);
324
325    pch = strtok(NULL, ""); /* Find end of string */
326   
327    if (pch != NULL && pch[0] != 0) {
328        *core = (long) atoi(pch);
329    }
330
331    free(wrkstr);
332    return 0;
333}
334
335#if DEBUG
336/* For debugging */
337static inline void dump_configuration()
338{
339    struct rte_config * global_config;
340    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
341   
342    if (nb_cpu <= 0) {
343        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
344        nb_cpu = 1; /* fallback to just 1 core */
345    }
346    if (nb_cpu > RTE_MAX_LCORE)
347        nb_cpu = RTE_MAX_LCORE;
348   
349    global_config = rte_eal_get_configuration();
350   
351    if (global_config != NULL) {
352        int i;
353        fprintf(stderr, "Intel DPDK setup\n"
354               "---Version      : %"PRIu32"\n"
355               "---Magic        : %"PRIu32"\n"
356               "---Master LCore : %"PRIu32"\n"
357               "---LCore Count  : %"PRIu32"\n",
358               global_config->version, global_config->magic, 
359               global_config->master_lcore, global_config->lcore_count);
360       
361        for (i = 0 ; i < nb_cpu; i++) {
362            fprintf(stderr, "   ---Core %d : %s\n", i, 
363                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
364        }
365       
366        const char * proc_type;
367        switch (global_config->process_type) {
368            case RTE_PROC_AUTO:
369                proc_type = "auto";
370                break;
371            case RTE_PROC_PRIMARY:
372                proc_type = "primary";
373                break;
374            case RTE_PROC_SECONDARY:
375                proc_type = "secondary";
376                break;
377            case RTE_PROC_INVALID:
378                proc_type = "invalid";
379                break;
380            default:
381                proc_type = "something worse than invalid!!";
382        }
383        fprintf(stderr, "---Process Type : %s\n", proc_type);
384    }
385   
386}
387#endif
388
389static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
390                                        char * err, int errlen) {
391    int ret; /* Returned error codes */
392    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
393    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
394    char mem_map[20] = {0}; /* The memory name */
395    long nb_cpu; /* The number of CPUs in the system */
396    long my_cpu; /* The CPU number we want to bind to */
397   
398#if DEBUG
399    rte_set_log_level(RTE_LOG_DEBUG);
400#else
401    rte_set_log_level(RTE_LOG_WARNING);
402#endif
403    /* Using proc-type auto allows this to be either primary or secondary
404     * Secondary allows two intances of libtrace to be used on different
405     * ports. However current version of DPDK doesn't support this on the
406     * same card (My understanding is this should work with two seperate
407     * cards).
408     *
409     * Using unique file prefixes mean separate memory is used, unlinking
410     * the two processes. However be careful we still cannot access a
411     * port that already in use.
412     */
413    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
414                "--file-prefix", mem_map, "-m", "256", NULL};
415    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
416   
417    /* This initilises the Enviroment Abstraction Layer (EAL)
418     * If we had slave workers these are put into WAITING state
419     *
420     * Basically binds this thread to a fixed core, which we choose as
421     * the last core on the machine (assuming fewer interrupts mapped here).
422     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
423     * "-n" the number of memory channels into the CPU (hardware specific)
424     *      - Most likely to be half the number of ram slots in your machine.
425     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
426     * Controls where in memory packets are stored and should spread across
427     * the channels. We just use 1 to be safe.
428     */
429
430    /* Get the number of cpu cores in the system and use the last core */
431    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
432    if (nb_cpu <= 0) {
433        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
434        nb_cpu = 1; /* fallback to the first core */
435    }
436    if (nb_cpu > RTE_MAX_LCORE)
437        nb_cpu = RTE_MAX_LCORE;
438
439    my_cpu = nb_cpu;
440    /* This allows the user to specify the core - we would try to do this
441     * automatically but it's hard to tell that this is secondary
442     * before running rte_eal_init(...). Currently we are limited to 1
443     * instance per core due to the way memory is allocated. */
444    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
445        snprintf(err, errlen, "Failed to parse URI");
446        return -1;
447    }
448
449    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
450                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
451
452    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
453        snprintf(err, errlen, 
454          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
455          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
456        return -1;
457    }
458
459    /* Make our mask */ //  0x1 << (my_cpu - 1)
460    snprintf(cpu_number, sizeof(cpu_number), "%x", 0x3);
461
462        /* Give this a name */
463        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
464    /* rte_eal_init it makes a call to getopt so we need to reset the
465     * global optind variable of getopt otherwise this fails */
466    optind = 1;
467    if ((ret = rte_eal_init(argc, argv)) < 0) {
468        snprintf(err, errlen, 
469          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
470        return -1;
471    }
472#if DEBUG
473    dump_configuration();
474#endif
475    /* This registers all available NICs with Intel DPDK
476     * These are not loaded until rte_eal_pci_probe() is called.
477     */
478    if ((ret = rte_pmd_init_all()) < 0) {
479        snprintf(err, errlen, 
480          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
481        return -1;
482    }
483
484    /* Black list all ports besides the one that we want to use */
485    blacklist_devices(format_data, &use_addr);
486
487    /* This loads DPDK drivers against all ports that are not blacklisted */
488        if ((ret = rte_eal_pci_probe()) < 0) {
489        snprintf(err, errlen, 
490            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
491        return -1;
492    }
493
494    format_data->nb_ports = rte_eth_dev_count();
495
496    if (format_data->nb_ports != 1) {
497        snprintf(err, errlen, 
498            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
499            format_data->nb_ports);
500        return -1;
501    }
502   
503    struct rte_eth_dev_info dev_info;
504    rte_eth_dev_info_get(0, &dev_info);
505    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 
506                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
507
508    return 0;
509}
510
511static int dpdk_init_input (libtrace_t *libtrace) {
512    char err[500];
513    err[0] = 0;
514    int i;
515   
516    libtrace->format_data = (struct dpdk_format_data_t *)
517                            malloc(sizeof(struct dpdk_format_data_t));
518    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
519    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
520    FORMAT(libtrace)->nb_ports = 0;
521    FORMAT(libtrace)->snaplen = 0; /* Use default */
522    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
523    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
524    FORMAT(libtrace)->promisc = -1;
525    FORMAT(libtrace)->pktmbuf_pool = NULL;
526    FORMAT(libtrace)->nb_blacklist = 0;
527    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
528    FORMAT(libtrace)->mempool_name[0] = 0;
529#if HAS_HW_TIMESTAMPS_82580
530    FORMAT(libtrace)->ts_first_sys = 0;
531    FORMAT(libtrace)->ts_last_sys = 0;
532    FORMAT(libtrace)->wrap_count = 0;
533#endif
534        for (i = 0;i < RTE_MAX_LCORE; i++) {
535                // Disabled by default
536                FORMAT(libtrace)->per_lcore[i].enabled = 0;
537        }
538       
539    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
540        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
541        free(libtrace->format_data);
542        libtrace->format_data = NULL;
543        return -1;
544    }
545    return 0;
546};
547
548static int dpdk_init_output(libtrace_out_t *libtrace)
549{
550    char err[500];
551    err[0] = 0;
552   
553    libtrace->format_data = (struct dpdk_format_data_t *)
554                            malloc(sizeof(struct dpdk_format_data_t));
555    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
556    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
557    FORMAT(libtrace)->nb_ports = 0;
558    FORMAT(libtrace)->snaplen = 0; /* Use default */
559    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
560    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
561    FORMAT(libtrace)->promisc = -1;
562    FORMAT(libtrace)->pktmbuf_pool = NULL;
563    FORMAT(libtrace)->nb_blacklist = 0;
564    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
565    FORMAT(libtrace)->mempool_name[0] = 0;
566#if HAS_HW_TIMESTAMPS_82580
567    FORMAT(libtrace)->ts_first_sys = 0;
568    FORMAT(libtrace)->ts_last_sys = 0;
569    FORMAT(libtrace)->wrap_count = 0;
570#endif
571
572    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
573        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
574        free(libtrace->format_data);
575        libtrace->format_data = NULL;
576        return -1;
577    }
578    return 0;
579};
580
581static int dpdk_pconfig_input (libtrace_t *libtrace,
582                                trace_parallel_option_t option,
583                                void *data) {
584        switch (option) {
585                case TRACE_OPTION_SET_HASHER:
586                        switch (*((enum hasher_types *) data))
587                        {
588                                case HASHER_BALANCE:
589                                case HASHER_UNIDIRECTIONAL:
590                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
591                                        return 0;
592                                case HASHER_BIDIRECTIONAL:
593                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
594                                        return 0;
595                                case HASHER_HARDWARE:
596                                case HASHER_CUSTOM:
597                                        // We don't support these
598                                        return -1;
599                        }
600        }
601        return -1;
602}
603/**
604 * Note here snaplen excludes the MAC checksum. Packets over
605 * the requested snaplen will be dropped. (Excluding MAC checksum)
606 *
607 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
608 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
609 * is set the maximum size of the returned packet would be 1518 otherwise
610 * 1514 would be the largest size possibly returned.
611 *
612 */
613static int dpdk_config_input (libtrace_t *libtrace,
614                                        trace_option_t option,
615                                        void *data) {
616    switch (option) {
617        case TRACE_OPTION_SNAPLEN:
618            /* Only support changing snaplen before a call to start is
619             * made */
620            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
621                FORMAT(libtrace)->snaplen=*(int*)data;
622            else
623                return -1;
624            return 0;
625                case TRACE_OPTION_PROMISC:
626                        FORMAT(libtrace)->promisc=*(int*)data;
627            return 0;
628        case TRACE_OPTION_FILTER:
629            /* TODO filtering */
630            break;
631        case TRACE_OPTION_META_FREQ:
632            break;
633        case TRACE_OPTION_EVENT_REALTIME:
634            break;
635        /* Avoid default: so that future options will cause a warning
636         * here to remind us to implement it, or flag it as
637         * unimplementable
638         */
639    }
640
641        /* Don't set an error - trace_config will try to deal with the
642         * option and will set an error if it fails */
643    return -1;
644}
645
646/* Can set jumbo frames/ or limit the size of a frame by setting both
647 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
648 *
649 */
650static struct rte_eth_conf port_conf = {
651        .rxmode = {
652                .mq_mode = ETH_RSS,
653                .split_hdr_size = 0,
654                .header_split   = 0, /**< Header Split disabled */
655                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
656                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
657                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
658        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
659#if GET_MAC_CRC_CHECKSUM
660/* So it appears that if hw_strip_crc is turned off the driver will still
661 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
662 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
663 * So lets just add it back on when we receive the packet.
664 */
665                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
666#else
667/* By default strip the MAC checksum because it's a bit of a hack to
668 * actually read these. And don't want to rely on disabling this to actualy
669 * always cut off the checksum in the future
670 */
671        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
672#endif
673        },
674        .txmode = {
675                .mq_mode = ETH_DCB_NONE,
676        },
677        .rx_adv_conf = {
678                .rss_conf = {
679                        // .rss_key = &rss_key, // We set this per format
680                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
681                },
682        },
683};
684
685static const struct rte_eth_rxconf rx_conf = {
686        .rx_thresh = {
687                .pthresh = 8,/* RX_PTHRESH prefetch */
688                .hthresh = 8,/* RX_HTHRESH host */
689                .wthresh = 4,/* RX_WTHRESH writeback */
690        },
691    .rx_free_thresh = 0,
692    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
693};
694
695static const struct rte_eth_txconf tx_conf = {
696        .tx_thresh = {
697        /**
698         * TX_PTHRESH prefetch
699         * Set on the NIC, if the number of unprocessed descriptors to queued on
700         * the card fall below this try grab at least hthresh more unprocessed
701         * descriptors.
702         */
703                .pthresh = 36,
704
705        /* TX_HTHRESH host
706         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
707         */
708                .hthresh = 0,
709       
710        /* TX_WTHRESH writeback
711         * Set on the NIC, the number of sent descriptors before writing back
712         * status to confirm the transmission. This is done more efficiently as
713         * a bulk DMA-transfer rather than writing one at a time.
714         * Similar to tx_free_thresh however this is applied to the NIC, where
715         * as tx_free_thresh is when DPDK will check these. This is extended
716         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
717         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
718         */
719                .wthresh = 4,
720        },
721
722    /* Used internally by DPDK rather than passed to the NIC. The number of
723     * packet descriptors to send before checking for any responses written
724     * back (to confirm the transmission). Default = 32 if set to 0)
725     */
726        .tx_free_thresh = 0,
727
728    /* This is the Report Status threshold, used by 10Gbit cards,
729     * This signals the card to only write back status (such as
730     * transmission successful) after this minimum number of transmit
731     * descriptors are seen. The default is 32 (if set to 0) however if set
732     * to greater than 1 TX wthresh must be set to zero, because this is kindof
733     * a replacement. See the dpdk programmers guide for more restrictions.
734     */
735        .tx_rs_thresh = 1,
736};
737
738/* Attach memory to the port and start the port or restart the port.
739 */
740static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
741    int ret; /* Check return values for errors */
742    struct rte_eth_link link_info; /* Wait for link */
743   
744    /* Already started */
745    if (format_data->paused == DPDK_RUNNING)
746        return 0;
747
748    /* First time started we need to alloc our memory, doing this here
749     * rather than in enviroment setup because we don't have snaplen then */
750    if (format_data->paused == DPDK_NEVER_STARTED) {
751        if (format_data->snaplen == 0) {
752            format_data->snaplen = RX_MBUF_SIZE;
753            port_conf.rxmode.jumbo_frame = 0;
754            port_conf.rxmode.max_rx_pkt_len = 0;
755        } else {
756            /* Use jumbo frames */
757            port_conf.rxmode.jumbo_frame = 1;
758            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
759        }
760
761        /* This is additional overhead so make sure we allow space for this */
762#if GET_MAC_CRC_CHECKSUM
763        format_data->snaplen += ETHER_CRC_LEN;
764#endif
765#if HAS_HW_TIMESTAMPS_82580
766        format_data->snaplen += sizeof(struct hw_timestamp_82580);
767#endif
768
769        /* Create the mbuf pool, which is the place our packets are allocated
770         * from - TODO figure out if there is is a free function (I cannot see one)
771         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
772         * allocate however that extra 1 packet is not used.
773         * (I assume <= vs < error some where in DPDK code)
774         * TX requires nb_tx_buffers + 1 in the case the queue is full
775         * so that will fill the new buffer and wait until slots in the
776         * ring become available.
777         */
778#if DEBUG
779    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
780#endif
781        format_data->pktmbuf_pool =
782            rte_mempool_create(format_data->mempool_name,
783                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
784                       format_data->snaplen + sizeof(struct rte_mbuf) 
785                                        + RTE_PKTMBUF_HEADROOM,
786                       8, sizeof(struct rte_pktmbuf_pool_private),
787                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
788                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
789
790        if (format_data->pktmbuf_pool == NULL) {
791            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
792                        "pool failed: %s", strerror(rte_errno));
793            return -1;
794        }
795    }
796   
797    /* ----------- Now do the setup for the port mapping ------------ */
798    /* Order of calls must be
799     * rte_eth_dev_configure()
800     * rte_eth_tx_queue_setup()
801     * rte_eth_rx_queue_setup()
802     * rte_eth_dev_start()
803     * other rte_eth calls
804     */
805   
806   
807    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
808   
809    /* This must be called first before another *eth* function
810     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
811    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
812    if (ret < 0) {
813        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
814                            " %"PRIu8" : %s", format_data->port,
815                            strerror(-ret));
816        return -1;
817    }
818    /* Initialise the TX queue a minimum value if using this port for
819     * receiving. Otherwise a larger size if writing packets.
820     */
821    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
822                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
823    if (ret < 0) {
824        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
825                            " %"PRIu8" : %s", format_data->port,
826                            strerror(-ret));
827        return -1;
828    }
829    /* Initialise the RX queue with some packets from memory */
830    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
831                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
832                            &rx_conf, format_data->pktmbuf_pool);
833    if (ret < 0) {
834        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
835                    " %"PRIu8" : %s", format_data->port,
836                    strerror(-ret));
837        return -1;
838    }
839   
840    /* Start device */
841    ret = rte_eth_dev_start(format_data->port);
842    if (ret < 0) {
843        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
844                    strerror(-ret));
845        return -1;
846    }
847
848    /* Default promiscuous to on */
849    if (format_data->promisc == -1)
850        format_data->promisc = 1;
851   
852    if (format_data->promisc == 1)
853        rte_eth_promiscuous_enable(format_data->port);
854    else
855        rte_eth_promiscuous_disable(format_data->port);
856   
857    /* Wait for the link to come up */
858    rte_eth_link_get(format_data->port, &link_info);
859#if DEBUG
860    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
861            (int) link_info.link_duplex, (int) link_info.link_speed);
862#endif
863
864    /* We have now successfully started/unpaused */
865    format_data->paused = DPDK_RUNNING;
866   
867    return 0;
868}
869
870/* Attach memory to the port and start the port or restart the ports.
871 */
872static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t * format_data, char *err, int errlen, uint16_t rx_queues){
873    int ret, i; /* Check return values for errors */
874    struct rte_eth_link link_info; /* Wait for link */
875   
876    /* Already started */
877    if (format_data->paused == DPDK_RUNNING)
878        return 0;
879
880    /* First time started we need to alloc our memory, doing this here
881     * rather than in environment setup because we don't have snaplen then */
882    if (format_data->paused == DPDK_NEVER_STARTED) {
883        if (format_data->snaplen == 0) {
884            format_data->snaplen = RX_MBUF_SIZE;
885            port_conf.rxmode.jumbo_frame = 0;
886            port_conf.rxmode.max_rx_pkt_len = 0;
887        } else {
888            /* Use jumbo frames */
889            port_conf.rxmode.jumbo_frame = 1;
890            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
891        }
892
893        /* This is additional overhead so make sure we allow space for this */
894#if GET_MAC_CRC_CHECKSUM
895        format_data->snaplen += ETHER_CRC_LEN;
896#endif
897#if HAS_HW_TIMESTAMPS_82580
898        format_data->snaplen += sizeof(struct hw_timestamp_82580);
899#endif
900
901        /* Create the mbuf pool, which is the place our packets are allocated
902         * from - TODO figure out if there is is a free function (I cannot see one)
903         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
904         * allocate however that extra 1 packet is not used.
905         * (I assume <= vs < error some where in DPDK code)
906         * TX requires nb_tx_buffers + 1 in the case the queue is full
907         * so that will fill the new buffer and wait until slots in the
908         * ring become available.
909         */
910#if DEBUG
911    printf("Creating mempool named %s\n", format_data->mempool_name);
912#endif
913        format_data->pktmbuf_pool =
914            rte_mempool_create(format_data->mempool_name,
915                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
916                       format_data->snaplen + sizeof(struct rte_mbuf) 
917                                        + RTE_PKTMBUF_HEADROOM,
918                       8, sizeof(struct rte_pktmbuf_pool_private),
919                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
920                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
921
922        if (format_data->pktmbuf_pool == NULL) {
923            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
924                        "pool failed: %s", strerror(rte_errno));
925            return -1;
926        }
927    }
928   
929    /* ----------- Now do the setup for the port mapping ------------ */
930    /* Order of calls must be
931     * rte_eth_dev_configure()
932     * rte_eth_tx_queue_setup()
933     * rte_eth_rx_queue_setup()
934     * rte_eth_dev_start()
935     * other rte_eth calls
936     */
937   
938    /* This must be called first before another *eth* function
939     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
940    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
941    if (ret < 0) {
942        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
943                            " %"PRIu8" : %s", format_data->port,
944                            strerror(-ret));
945        return -1;
946    }
947#if DEBUG
948    printf("Doing dev configure\n");
949#endif
950    /* Initialise the TX queue a minimum value if using this port for
951     * receiving. Otherwise a larger size if writing packets.
952     */
953    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
954                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
955    if (ret < 0) {
956        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
957                            " %"PRIu8" : %s", format_data->port,
958                            strerror(-ret));
959        return -1;
960    }
961   
962    for (i=0; i < rx_queues; i++) {
963#if DEBUG
964    printf("Doing queue configure\n");
965#endif 
966                /* Initialise the RX queue with some packets from memory */
967                ret = rte_eth_rx_queue_setup(format_data->port, i,
968                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
969                                                                &rx_conf, format_data->pktmbuf_pool);
970                if (ret < 0) {
971                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
972                                                " %"PRIu8" : %s", format_data->port,
973                                                strerror(-ret));
974                        return -1;
975                }
976        }
977   
978#if DEBUG
979    printf("Doing start device\n");
980#endif 
981    /* Start device */
982    ret = rte_eth_dev_start(format_data->port);
983#if DEBUG
984    fprintf(stderr, "Done start device\n");
985#endif 
986    if (ret < 0) {
987        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
988                    strerror(-ret));
989        return -1;
990    }
991
992
993    /* Default promiscuous to on */
994    if (format_data->promisc == -1)
995        format_data->promisc = 1;
996   
997    if (format_data->promisc == 1)
998        rte_eth_promiscuous_enable(format_data->port);
999    else
1000        rte_eth_promiscuous_disable(format_data->port);
1001   
1002   
1003    /* We have now successfully started/unpased */
1004    format_data->paused = DPDK_RUNNING;
1005   
1006    // Can use remote launch for all
1007    /*RTE_LCORE_FOREACH_SLAVE(i) {
1008                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1009        }*/
1010   
1011    /* Wait for the link to come up */
1012    rte_eth_link_get(format_data->port, &link_info);
1013#if DEBUG
1014    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1015            (int) link_info.link_duplex, (int) link_info.link_speed);
1016#endif
1017
1018    return 0;
1019}
1020
1021static int dpdk_start_input (libtrace_t *libtrace) {
1022    char err[500];
1023    err[0] = 0;
1024
1025    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1026        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1027        free(libtrace->format_data);
1028        libtrace->format_data = NULL;
1029        return -1;
1030    }
1031    return 0;
1032}
1033
1034static inline size_t dpdk_get_max_rx_queues(uint8_t port_id) {
1035    struct rte_eth_dev_info dev_info;
1036    rte_eth_dev_info_get(0, &dev_info);
1037    return dev_info.max_rx_queues;
1038}
1039
1040static int dpdk_pstart_input (libtrace_t *libtrace) {
1041    char err[500];
1042    int enabled_lcore_count = 0, i=0;
1043    int tot = libtrace->perpkt_thread_count;
1044    err[0] = 0;
1045       
1046        libtrace->perpkt_thread_count;
1047       
1048        for (i = 0; i < RTE_MAX_LCORE; i++)
1049        {
1050                if (rte_lcore_is_enabled(i))
1051                        enabled_lcore_count++;
1052        }
1053       
1054        tot = MIN(libtrace->perpkt_thread_count, enabled_lcore_count);
1055        tot = MIN(tot, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1056        fprintf(stderr, "Running pstart DPDK %d %d %d %d\n", tot, libtrace->perpkt_thread_count, enabled_lcore_count, rte_lcore_count());
1057       
1058    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1059        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1060        free(libtrace->format_data);
1061        libtrace->format_data = NULL;
1062        return -1;
1063    }
1064   
1065    return 0;
1066    return tot;
1067}
1068
1069static int dpdk_start_output(libtrace_out_t *libtrace)
1070{
1071    char err[500];
1072    err[0] = 0;
1073   
1074    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1075        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1076        free(libtrace->format_data);
1077        libtrace->format_data = NULL;
1078        return -1;
1079    }
1080    return 0;
1081}
1082
1083static int dpdk_pause_input(libtrace_t * libtrace){
1084    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1085    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1086#if DEBUG     
1087        fprintf(stderr, "Pausing port\n");
1088#endif
1089        rte_eth_dev_stop(FORMAT(libtrace)->port);
1090        FORMAT(libtrace)->paused = DPDK_PAUSED;
1091        /* If we pause it the driver will be reset and likely our counter */
1092#if HAS_HW_TIMESTAMPS_82580
1093        FORMAT(libtrace)->ts_first_sys = 0;
1094        FORMAT(libtrace)->ts_last_sys = 0;
1095#endif
1096    }
1097    return 0;
1098}
1099
1100static int dpdk_write_packet(libtrace_out_t *trace, 
1101                libtrace_packet_t *packet){
1102    struct rte_mbuf* m_buff[1];
1103   
1104    int wirelen = trace_get_wire_length(packet);
1105    int caplen = trace_get_capture_length(packet);
1106   
1107    /* Check for a checksum and remove it */
1108    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1109                                            wirelen == caplen)
1110        caplen -= ETHER_CRC_LEN;
1111
1112    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1113    if (m_buff[0] == NULL) {
1114        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1115        return -1;
1116    } else {
1117        int ret;
1118        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1119        do {
1120            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1121        } while (ret != 1);
1122    }
1123
1124    return 0;
1125}
1126
1127static int dpdk_fin_input(libtrace_t * libtrace) {
1128    /* Free our memory structures */
1129    if (libtrace->format_data != NULL) {
1130        /* Close the device completely, device cannot be restarted */
1131        if (FORMAT(libtrace)->port != 0xFF)
1132            rte_eth_dev_close(FORMAT(libtrace)->port);
1133        /* filter here if we used it */
1134                free(libtrace->format_data);
1135        }
1136
1137    /* Revert to the original PCI drivers */
1138    /* No longer in DPDK
1139    rte_eal_pci_exit(); */
1140    return 0;
1141}
1142
1143
1144static int dpdk_fin_output(libtrace_out_t * libtrace) {
1145    /* Free our memory structures */
1146    if (libtrace->format_data != NULL) {
1147        /* Close the device completely, device cannot be restarted */
1148        if (FORMAT(libtrace)->port != 0xFF)
1149            rte_eth_dev_close(FORMAT(libtrace)->port);
1150        /* filter here if we used it */
1151                free(libtrace->format_data);
1152        }
1153
1154    /* Revert to the original PCI drivers */
1155    /* No longer in DPDK
1156    rte_eal_pci_exit(); */
1157    return 0;
1158}
1159
1160/**
1161 * Get the start of additional header that we added to a packet.
1162 */
1163static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1164    uint8_t *hdrsize;
1165    assert(packet);
1166    assert(packet->buffer);
1167    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1168    /* The byte before the original packet data denotes the size in bytes
1169     * of our additional header that we added sits before the 'size byte' */
1170    hdrsize--;
1171    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1172}
1173
1174static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1175    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1176    return hdr->cap_len;
1177}
1178
1179static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1180    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1181    if (size > hdr->cap_len) {
1182        /* Cannot make a packet bigger */
1183                return trace_get_capture_length(packet);
1184        }
1185
1186    /* Reset the cached capture length first*/
1187    packet->capture_length = -1;
1188    hdr->cap_len = (uint32_t) size;
1189        return trace_get_capture_length(packet);
1190}
1191
1192static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1193    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1194    int org_cap_size; /* The original capture size */
1195    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1196        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1197                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1198                            sizeof(struct hw_timestamp_82580);
1199    } else {
1200        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1201                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1202    }
1203    if (hdr->flags & INCLUDES_CHECKSUM) {
1204        return org_cap_size;
1205    } else {
1206        /* DPDK packets are always TRACE_TYPE_ETH packets */
1207        return org_cap_size + ETHER_CRC_LEN;
1208    }
1209}
1210static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1211    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1212    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1213        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1214                sizeof(struct hw_timestamp_82580);
1215    else
1216        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1217}
1218
1219static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1220                libtrace_packet_t *packet, void *buffer,
1221                libtrace_rt_types_t rt_type, uint32_t flags) {
1222    assert(packet);
1223    if (packet->buffer != buffer &&
1224        packet->buf_control == TRACE_CTRL_PACKET) {
1225        free(packet->buffer);
1226    }
1227
1228    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1229        packet->buf_control = TRACE_CTRL_PACKET;
1230    } else
1231        packet->buf_control = TRACE_CTRL_EXTERNAL;
1232
1233    packet->buffer = buffer;
1234    packet->header = buffer;
1235
1236    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1237    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1238    packet->type = rt_type;
1239    return 0;
1240}
1241
/*
 * Does any extra preparation needed for a freshly captured packet.
 *
 * A one byte size marker and our additional header are prepended to the
 * mbuf data, the header is given a timestamp in nanoseconds (taken from the
 * 82580 hardware timestamp when HAS_HW_TIMESTAMPS_82580 is set, otherwise
 * from the system clock), the libtrace packet is pointed at the mbuf and
 * the capture length is recorded.
 *
 * Returns the framing length plus the capture length of the packet.
 *
 * NOTE(review): the results of rte_pktmbuf_prepend() are used without being
 * checked - confirm the mbuf always has enough headroom.
 */
static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
                                                        struct rte_mbuf* pkt){
    uint8_t * hdr_size;
    struct dpdk_addt_hdr *hdr;
#if HAS_HW_TIMESTAMPS_82580
    struct hw_timestamp_82580 *hw_ts;
    struct timeval cur_sys_time;
    uint64_t cur_sys_time_ns;
    uint64_t estimated_wraps;

    /* Using gettimeofday because it's most likely to be a vsyscall.
     * We don't want to slow anything down with system calls, and we don't
     * need accuracy here */
    gettimeofday(&cur_sys_time, NULL);
#else
# if USE_CLOCK_GETTIME
    struct timespec cur_sys_time;

    /* This looks terrible and I feel bad doing it. But it's OK
     * on new kernels, because this is a vsyscall */
    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
# else
    struct timeval cur_sys_time;
    /* Should be a vsyscall */
    gettimeofday(&cur_sys_time, NULL);
# endif
#endif

    /* Record the size of our header in the byte directly before the data */
    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    *hdr_size = sizeof(struct dpdk_addt_hdr);
    /* Now put our header in front of that size byte */
    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));

#if GET_MAC_CRC_CHECKSUM
    /* Add the CRC checksum back into the lengths */
    pkt->pkt.pkt_len += ETHER_CRC_LEN;
    pkt->pkt.data_len += ETHER_CRC_LEN;
    hdr->flags |= INCLUDES_CHECKSUM;
#endif

#if HAS_HW_TIMESTAMPS_82580
    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     *
     *        +----------+---+   +--------------+
     *  82580 |    24    | 8 |   |      32      |
     *        +----------+---+   +--------------+
     *          reserved  \______ 40 bits _____/
     *
     * The 40 bit 82580 SYSTIM overflows every
     *   2^40 * 10^-9 /  60  = 18.3 minutes.
     *
     * NOTE picture is in Big Endian order, in memory it's actually in Little
     * Endian (for the full 64 bits) i.e. picture is mirrored
     */

    /* The timestamp is sitting before our packet and is included in pkt_len */
    hdr->flags |= INCLUDES_HW_TIMESTAMP;
    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);

    /* Despite what the documentation says this is in Little
     * Endian byteorder. Mask the reserved section out, keeping only the
     * low TS_NBITS_82580 bits.
     */
    hdr->timestamp = le64toh(hw_ts->timestamp) &
                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);

    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    /* First packet since (re)start: anchor the hardware counter against
     * the system clock */
    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    }

    /* This will have serious problems if packets aren't read quickly,
     * because the hardware counter wraps around.
     * NOTE(review): this comment used to say the clock cycles every 18
     * seconds, but a 40 bit nanosecond counter wraps about every 18.3
     * minutes as worked out above - confirm TS_NBITS_82580 is 40. */
    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
                            / (1ull<<TS_NBITS_82580);

    /* Estimated_wraps gives the number of times the counter should have
     * wrapped (however depending on value last time it could have wrapped
     * twice more (if hw clock is close to its max value) or once less (allowing
     * for a bit of variance between hw and sys clock). But if the clock
     * shouldn't have wrapped once then don't allow it to go backwards in time */
    if (unlikely(estimated_wraps >= 2)) {
        /* 2 or more wrap arounds add all but the very last wrap */
        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    }

    /* Set the timestamp to the lowest possible value we're considering */
    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);

    /* In most runs only the first if() will need evaluating - i.e our
     * estimate is correct. Each failed check below adds one more wrap
     * period to both wrap_count and the timestamp and tries again.
     * (WITHIN_VARIANCE and MAXSKEW_82580 are defined elsewhere; presumably
     * they test that the two times differ by at most the allowed skew.) */
    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
                                hdr->timestamp, MAXSKEW_82580))) {
        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
        FORMAT(libtrace)->wrap_count++;
        hdr->timestamp += (1ull<<TS_NBITS_82580);
        if (!WITHIN_VARIANCE(cur_sys_time_ns,
                                hdr->timestamp, MAXSKEW_82580)) {
            /* Failed to match estimated_wraps */
            FORMAT(libtrace)->wrap_count++;
            hdr->timestamp += (1ull<<TS_NBITS_82580);
            if (!WITHIN_VARIANCE(cur_sys_time_ns,
                                hdr->timestamp, MAXSKEW_82580)) {
                if (estimated_wraps == 0) {
                    /* 0 case Failed to match estimated_wraps+2, so give up
                     * and use the system time for this packet */
                    printf("WARNING - Hardware Timestamp failed to"
                                            " match using systemtime!\n");
                    hdr->timestamp = cur_sys_time_ns;
                } else {
                    /* Failed to match estimated_wraps+1 */
                    FORMAT(libtrace)->wrap_count++;
                    hdr->timestamp += (1ull<<TS_NBITS_82580);
                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
                                hdr->timestamp, MAXSKEW_82580)) {
                        /* Failed to match estimated_wraps+2; only a warning
                         * is printed, the timestamp is left as it is */
                        printf("WARNING - Hardware Timestamp failed to"
                                            " match using systemtime!!\n");
                    }
                }
            }
        }
    }

    /* Remember the system time of this packet for the next call */
    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);

#else
# if USE_CLOCK_GETTIME
    hdr->timestamp = TS_TO_NS(cur_sys_time);
# else
    hdr->timestamp = TV_TO_NS(cur_sys_time);
# endif
#endif

    /* Intel's samples prefetch into the level 0 cache; let's assume that is
     * a good idea and do the same */
    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    packet->buffer = pkt;
    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);

    /* Set our capture length for the first time: the wire length, less the
     * checksum when the checksum was not captured */
    hdr->cap_len = dpdk_get_wire_length(packet);
    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
        hdr->cap_len -= ETHER_CRC_LEN;
    }


    return dpdk_get_framing_length(packet) +
                        dpdk_get_capture_length(packet);
}
1400
1401
1402static void dpdk_fin_packet(libtrace_packet_t *packet)
1403{
1404        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1405                rte_pktmbuf_free(packet->buffer);
1406                packet->buffer = NULL;
1407        }
1408}
1409
1410static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1411    int nb_rx; /* Number of rx packets we've recevied */
1412    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1413
1414    /* Free the last packet buffer */
1415    if (packet->buffer != NULL) {
1416        /* Buffer is owned by DPDK */
1417        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1418            rte_pktmbuf_free(packet->buffer);
1419            packet->buffer = NULL;
1420        } else
1421        /* Buffer is owned by packet i.e. has been malloc'd */
1422        if (packet->buf_control == TRACE_CTRL_PACKET) {
1423            free(packet->buffer);
1424            packet->buffer = NULL;
1425        }
1426    }
1427   
1428    packet->buf_control = TRACE_CTRL_EXTERNAL;
1429    packet->type = TRACE_RT_DATA_DPDK;
1430   
1431    /* Wait for a packet */
1432    while (1) {
1433        /* Poll for a single packet */
1434        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1435                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1436        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1437            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1438        }
1439    }
1440   
1441    /* We'll never get here - but if we did it would be bad */
1442    return -1;
1443}
1444libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
1445static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1446    int nb_rx; /* Number of rx packets we've recevied */
1447    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1448
1449    /* Free the last packet buffer */
1450    if (packet->buffer != NULL) {
1451        /* Buffer is owned by DPDK */
1452        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1453            rte_pktmbuf_free(packet->buffer);
1454            packet->buffer = NULL;
1455        } else
1456        /* Buffer is owned by packet i.e. has been malloc'd */
1457        if (packet->buf_control == TRACE_CTRL_PACKET) {
1458            free(packet->buffer);
1459            packet->buffer = NULL;
1460        }
1461    }
1462   
1463    packet->buf_control = TRACE_CTRL_EXTERNAL;
1464    packet->type = TRACE_RT_DATA_DPDK;
1465   
1466    /* Wait for a packet */
1467    while (1) {
1468        /* Poll for a single packet */
1469        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1470                            get_thread_table_num(libtrace), pkts_burst, 1);
1471        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1472                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1473            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1474        }
1475        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
1476        if (libtrace_message_queue_count(&(get_thread_table(libtrace)->messages)) > 0) {
1477                        printf("Extra message yay");
1478                        return -2;
1479                }
1480    }
1481   
1482    /* We'll never get here - but if we did it would be bad */
1483    return -1;
1484}
1485
1486static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1487    struct timeval tv;
1488    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1489   
1490    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1491    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1492    return tv;
1493}
1494
1495static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1496    struct timespec ts;
1497    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1498   
1499    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1500    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1501    return ts;
1502}
1503
1504static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1505    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1506}
1507
1508static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1509    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1510    return (libtrace_direction_t) hdr->direction;
1511}
1512
1513static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1514    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1515    hdr->direction = (uint8_t) direction;
1516    return (libtrace_direction_t) hdr->direction;
1517}
1518
1519/*
1520 * NOTE: Drops could occur for other reasons than running out of buffer
1521 * space. Such as failed MAC checksums and oversized packets.
1522 */
1523static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1524    struct rte_eth_stats stats = {0};
1525   
1526    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1527        return UINT64_MAX;
1528    /* Grab the current stats */
1529    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1530   
1531    /* Get the drop counter */
1532    return (uint64_t) stats.ierrors;
1533}
1534
1535static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1536    struct rte_eth_stats stats = {0};
1537   
1538    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1539        return UINT64_MAX;
1540    /* Grab the current stats */
1541    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1542   
1543    /* Get the drop counter */
1544    return (uint64_t) stats.ipackets;
1545}
1546
1547/*
1548 * This is the number of packets filtered by the NIC
1549 * and maybe ahead of number read using libtrace.
1550 *
1551 * XXX we are yet to implement any filtering, but if it was this should
1552 * get the result. So this will just return 0 for now.
1553 */
1554static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1555    struct rte_eth_stats stats = {0};
1556   
1557    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1558        return UINT64_MAX;
1559    /* Grab the current stats */
1560    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1561   
1562    /* Get the drop counter */
1563    return (uint64_t) stats.fdirmiss;
1564}
1565
1566/* Attempts to read a packet in a non-blocking fashion. If one is not
1567 * available a SLEEP event is returned. We do not have the ability to
1568 * create a select()able file descriptor in DPDK.
1569 */
1570static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1571                                        libtrace_packet_t *packet) {
1572    libtrace_eventobj_t event = {0,0,0.0,0};
1573    int nb_rx; /* Number of receive packets we've read */
1574    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1575   
1576    do {
1577   
1578        /* See if we already have a packet waiting */
1579        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1580                        FORMAT(trace)->queue_id, pkts_burst, 1);
1581       
1582        if (nb_rx > 0) {
1583            /* Free the last packet buffer */
1584            if (packet->buffer != NULL) {
1585                /* Buffer is owned by DPDK */
1586                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1587                    rte_pktmbuf_free(packet->buffer);
1588                    packet->buffer = NULL;
1589                } else
1590                /* Buffer is owned by packet i.e. has been malloc'd */
1591                if (packet->buf_control == TRACE_CTRL_PACKET) {
1592                    free(packet->buffer);
1593                    packet->buffer = NULL;
1594                }
1595            }
1596           
1597            packet->buf_control = TRACE_CTRL_EXTERNAL;
1598            packet->type = TRACE_RT_DATA_DPDK;
1599            event.type = TRACE_EVENT_PACKET;
1600            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1601           
1602            /* XXX - Check this passes the filter trace_read_packet normally
1603             * does this for us but this wont */
1604            if (trace->filter) {
1605                if (!trace_apply_filter(trace->filter, packet)) {
1606                    /* Failed the filter so we loop for another packet */
1607                    continue;
1608                }
1609            }
1610        } else {
1611            /* We only want to sleep for a very short time - we are non-blocking */
1612            event.type = TRACE_EVENT_SLEEP;
1613            event.seconds = 0.0001;
1614            event.size = 0;
1615        }
1616       
1617        /* If we get here we have our event */
1618        break;
1619    } while (1);
1620
1621    return event;
1622}
1623
1624
/* Prints the usage text for the dpdk format module to standard output:
 * first the supported input URIs, then the supported output URIs.
 */
static void dpdk_help(void) {
    /* Module banner and input URI description */
    fputs("dpdk format module: $Revision: 1752 $\n"
          "Supported input URIs:\n"
          "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
          "\tThe -<coreid> is optional \n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
          "\t By default the last CPU core is used if not otherwise specified.\n"
          "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
          "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
          "\n",
          stdout);

    /* Output URI description */
    fputs("Supported output URIs:\n"
          "\tSame format as the input URI.\n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
          "\n",
          stdout);
}
1642
1643static struct libtrace_format_t dpdk = {
1644        "dpdk",
1645        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
1646        TRACE_FORMAT_DPDK,
1647        NULL,                   /* probe filename */
1648        NULL,                               /* probe magic */
1649        dpdk_init_input,            /* init_input */
1650        dpdk_config_input,          /* config_input */
1651        dpdk_start_input,           /* start_input */
1652        dpdk_pause_input,           /* pause_input */
1653        dpdk_init_output,           /* init_output */
1654        NULL,                               /* config_output */
1655        dpdk_start_output,          /* start_ouput */
1656        dpdk_fin_input,             /* fin_input */
1657        dpdk_fin_output,        /* fin_output */
1658        dpdk_read_packet,           /* read_packet */
1659        dpdk_prepare_packet,    /* prepare_packet */
1660        dpdk_fin_packet,                                    /* fin_packet */
1661        dpdk_write_packet,          /* write_packet */
1662        dpdk_get_link_type,         /* get_link_type */
1663        dpdk_get_direction,         /* get_direction */
1664        dpdk_set_direction,         /* set_direction */
1665        NULL,                               /* get_erf_timestamp */
1666        dpdk_get_timeval,           /* get_timeval */
1667        dpdk_get_timespec,          /* get_timespec */
1668        NULL,                               /* get_seconds */
1669        NULL,                               /* seek_erf */
1670        NULL,                               /* seek_timeval */
1671        NULL,                               /* seek_seconds */
1672        dpdk_get_capture_length,/* get_capture_length */
1673        dpdk_get_wire_length,   /* get_wire_length */
1674        dpdk_get_framing_length,/* get_framing_length */
1675        dpdk_set_capture_length,/* set_capture_length */
1676        NULL,                               /* get_received_packets */
1677        dpdk_get_filtered_packets,/* get_filtered_packets */
1678        dpdk_get_dropped_packets,/* get_dropped_packets */
1679    dpdk_get_captured_packets,/* get_captured_packets */
1680        NULL,                       /* get_fd */
1681        dpdk_trace_event,               /* trace_event */
1682    dpdk_help,              /* help */
1683    NULL,                   /* next pointer */
1684    {true, 8},              /* Live, NICs typically have 8 threads */
1685    dpdk_pstart_input, /* pstart_input */
1686        dpdk_pread_packet, /* pread_packet */
1687        dpdk_pause_input, /* ppause */
1688        dpdk_fin_input, /* p_fin */
1689        dpdk_pconfig_input /* pconfig_input */
1690};
1691
1692void dpdk_constructor(void) {
1693        register_format(&dpdk);
1694}
Note: See TracBrowser for help on using the repository browser.