source: lib/format_dpdk.c @ 50ce607

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 50ce607 was 50ce607, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Adds per-thread storage for the format to use with libtrace_threads.
Passes threads as arguments to reads to save the overhead of looking them up.
Various changes to the DPDK system, including registering a thread so that our threads can be started with different DPDK thread numbers for thread-local memory caches.

  • Property mode set to 100644
File size: 66.9 KB
RevLine 
[c04929c]1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Richard Sanger
8 *         
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
29 *
30 */
31
32/* This format module deals with using the Intel Data Plane Development
33 * Kit capture format.
34 *
35 * Intel Data Plane Development Kit is a LIVE capture format.
36 *
37 * This format also supports writing which will write packets out to the
38 * network as a form of packet replay. This should not be confused with the
39 * RT protocol which is intended to transfer captured packet records between
40 * RT-speaking programs.
41 */
42
[50ce607]43#define _GNU_SOURCE
44
[c04929c]45#include "config.h"
46#include "libtrace.h"
47#include "libtrace_int.h"
48#include "format_helper.h"
49#include "libtrace_arphrd.h"
[29bbef0]50#include "hash_toeplitz.h"
[c04929c]51
52#ifdef HAVE_INTTYPES_H
53#  include <inttypes.h>
54#else
55# error "Can't find inttypes.h"
56#endif
57
58#include <stdlib.h>
59#include <assert.h>
60#include <unistd.h>
61#include <endian.h>
62#include <rte_eal.h>
63#include <rte_per_lcore.h>
64#include <rte_debug.h>
65#include <rte_errno.h>
66#include <rte_common.h>
67#include <rte_log.h>
68#include <rte_memcpy.h>
69#include <rte_prefetch.h>
70#include <rte_branch_prediction.h>
71#include <rte_pci.h>
72#include <rte_ether.h>
73#include <rte_ethdev.h>
74#include <rte_ring.h>
75#include <rte_mempool.h>
76#include <rte_mbuf.h>
[29bbef0]77#include <rte_launch.h>
78#include <rte_lcore.h>
79#include <rte_per_lcore.h>
[50ce607]80#include <pthread.h>
[c04929c]81
/* The default size of memory buffers to use - this is the max size of a
 * standard ethernet packet less the size of the MAC CHECKSUM */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use
 * By default this is limited by driver to 4k and must be a multiple of 128.
 * A modification can be made to the driver to remove this limit.
 * This can be increased in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The maximum number of characters the mempool name can be */
#define MEMPOOL_NAME_LEN 20

/* Every macro argument below is parenthesised so that expressions such as
 * MBUF(p + 1), FORMAT(&trace) or TV_TO_NS(*tvp) expand correctly. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval / struct timespec value to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)

#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
128
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what each setting does before enabling it;
 * some of them can make traces incompatible with other builds.
 *
 * They also serve as examples of how to do things that are not obvious
 * from the DPDK documentation.
 */

/* Verbose debugging: selects the RTE_LOG_DEBUG log level and dumps the EAL
 * configuration during initialisation. */
#define DEBUG 1

/* Timestamp source. clock_gettime() gives nanosecond resolution rather than
 * the microseconds of gettimeofday(). Only prefer it if clock_gettime() is a
 * vsyscall on your system, otherwise it could be a large overhead (likewise
 * gettimeofday() should be a vsyscall - if it is not, seriously consider
 * updating your kernel). clock_gettime() needs librt. */
#ifdef HAVE_LIBRT
#  define USE_CLOCK_GETTIME 0   /* May be set to 1 to prefer clock_gettime() */
#else
#  define USE_CLOCK_GETTIME 0   /* No librt: this MUST stay 0 */
#endif

/* Keep the MAC checksum on received packets. Fairly safe to turn on.
 * DPDK currently appears to remove the checksum by reporting each packet
 * 4 bytes shorter than it really is. Note that most formats don't include
 * the checksum, so writing such packets out to a port such as int:, ring:
 * or dpdk: (which assume there is no checksum) will send the checksum as
 * part of the packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* Intel 82580 hardware timestamps - requires a modification of the PMD
 * drivers inside Intel DPDK. */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
/* Width of the 82580 timestamp clock in bits */
#  define TS_NBITS_82580 40
/* Largest skew allowed on either the +ve or -ve side: half the clock range */
#  define MAXSKEW_82580 ((uint64_t) 1 << (TS_NBITS_82580 - 1))
/* True when v2 lies strictly between v1 - var and v1 + var */
#  define WITHIN_VARIANCE(v1, v2, var) \
        (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
175
/*
 * Layout of the optional Intel 82580 hardware timestamp.
 *
 * The 82580 datasheet states the timestamp is stored big endian, but it is
 * actually little endian; only its lower 40 bits are valid.
 */
struct hw_timestamp_82580 {
    uint64_t reserved;   /* Unused */
    uint64_t timestamp;  /* Little endian, lower 40 bits valid */
};
182
/* Lifecycle of a DPDK port, stored in dpdk_format_data_t::paused */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Port has not been started yet */
    DPDK_RUNNING       = 1, /* Port is started */
    DPDK_PAUSED        = 2, /* Port was started and is now paused */
};
188
/* Per-lcore state for the format (one entry per DPDK lcore slot in
 * dpdk_format_data_t::per_lcore). */
struct dpdk_per_lcore_t
{
    /* TODO: move the timestamp state in here */
    uint16_t queue_id;  /* Queue id for this lcore (presumably RX) - confirm */
    uint8_t port;       /* DPDK port number */
};
195
[c04929c]196/* Used by both input and output however some fields are not used
197 * for output */
198struct dpdk_format_data_t {
199    int8_t promisc; /* promiscuous mode - RX only */
200    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
201    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
202    uint8_t paused; /* See paused_state */ 
203    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
204    int snaplen; /* The snap length for the capture - RX only */
205    /* We always have to setup both rx and tx queues even if we don't want them */
206    int nb_rx_buf; /* The number of packet buffers in the rx ring */
207    int nb_tx_buf; /* The number of packet buffers in the tx ring */
208    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
209    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
210    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
211    unsigned int nb_blacklist; /* Number of blacklist items in are valid */
[29bbef0]212    uint8_t rss_key[40]; // This is the RSS KEY
[c04929c]213#if HAS_HW_TIMESTAMPS_82580
214    /* Timestamping only relevent to RX */
215    uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
216    uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
217    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
218#endif
[50ce607]219        // DPDK normally seems to have a limit of 8 queues for a given card
220        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
[c04929c]221};
222
/* Bit flags describing what accompanies a packet (see dpdk_addt_hdr) */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM     = 1 << 0, /* Packet keeps its MAC checksum */
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with the 82580 driver */
};
227
/**
 * Additional per-packet information, stored in front of the packet data
 * within the mbuf headroom:
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |   sizeof(dpdk_addt_hdr)  | 1 byte
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;             /* Packet timestamp */
    uint8_t flags;                  /* Presumably dpdk_addt_hdr_flags bits */
    uint8_t direction;
    uint8_t reserved1, reserved2;   /* Unused */
    uint32_t cap_len;               /* The size to report as the capture length */
};
253
254/**
255 * We want to blacklist all devices except those on the whitelist
256 * (I say list, but yes it is only the one).
257 *
258 * The default behaviour of rte_pci_probe() will map every possible device
259 * to its DPDK driver. The DPDK driver will take the ethernet device
260 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
261 *
262 * So blacklist all devices except the one that we wish to use so that
263 * the others can still be used as standard ethernet ports.
264 */
265static void blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
266{
267        struct rte_pci_device *dev = NULL;
268        format_data->nb_blacklist = 0;
269
270        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
271
272        TAILQ_FOREACH(dev, &device_list, next) {
273        if (whitelist != NULL && whitelist->domain == dev->addr.domain
274            && whitelist->bus == dev->addr.bus
275            && whitelist->devid == dev->addr.devid
276            && whitelist->function == dev->addr.function)
277            continue;
278                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
279                                / sizeof (format_data->blacklist[0])) {
280                        printf("Warning: too many devices to blacklist consider"
281                                        " increasing BLACK_LIST_SIZE");
282                        break;
283                }
284                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
285                ++format_data->nb_blacklist;
286        }
287
288        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
289}
290
291/**
292 * Parse the URI format as a pci address
293 * Fills in addr, note core is optional and is unchanged if
294 * a value for it is not provided.
295 *
296 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
297 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
298 */
299static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
300    char * wrkstr;
301    char * pch;
302    assert(str);
303    wrkstr = strdup(str);
304   
305    pch = strtok(wrkstr,":");
306    if (pch == NULL || pch[0] == 0) {
307        free(wrkstr); return -1;
308    }
309    addr->domain = (uint16_t) atoi(pch);
310
311    pch = strtok(NULL,":");
312    if (pch == NULL || pch[0] == 0) {
313        free(wrkstr); return -1;
314    }
315    addr->bus = (uint8_t) atoi(pch);
316
317    pch = strtok(NULL,".");
318    if (pch == NULL || pch[0] == 0) {
319        free(wrkstr); return -1;
320    }
321    addr->devid = (uint8_t) atoi(pch);
322
323    pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
324    if (pch == NULL || pch[0] == 0) {
325        free(wrkstr); return -1;
326    }
327    addr->function = (uint8_t) atoi(pch);
328
329    pch = strtok(NULL, ""); /* Find end of string */
330   
331    if (pch != NULL && pch[0] != 0) {
332        *core = (long) atoi(pch);
333    }
334
335    free(wrkstr);
336    return 0;
337}
338
#if DEBUG
/**
 * Debugging aid: print the DPDK EAL configuration to stderr. This covers
 * version, magic, master lcore, lcore count, the role of each online core
 * (capped at RTE_MAX_LCORE) and the process type.
 */
static inline void dump_configuration(void)
{
    struct rte_config * global_config;
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    if (nb_cpu <= 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
        nb_cpu = 1; /* fallback to just 1 core */
    }
    if (nb_cpu > RTE_MAX_LCORE)
        nb_cpu = RTE_MAX_LCORE;

    global_config = rte_eal_get_configuration();

    if (global_config != NULL) {
        int i;
        const char * proc_type;

        fprintf(stderr, "Intel DPDK setup\n"
               "---Version      : %"PRIu32"\n"
               "---Magic        : %"PRIu32"\n"
               "---Master LCore : %"PRIu32"\n"
               "---LCore Count  : %"PRIu32"\n",
               global_config->version, global_config->magic,
               global_config->master_lcore, global_config->lcore_count);

        for (i = 0 ; i < nb_cpu; i++) {
            fprintf(stderr, "   ---Core %d : %s\n", i,
                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
        }

        switch (global_config->process_type) {
            case RTE_PROC_AUTO:
                proc_type = "auto";
                break;
            case RTE_PROC_PRIMARY:
                proc_type = "primary";
                break;
            case RTE_PROC_SECONDARY:
                proc_type = "secondary";
                break;
            case RTE_PROC_INVALID:
                proc_type = "invalid";
                break;
            default:
                proc_type = "something worse than invalid!!";
        }
        fprintf(stderr, "---Process Type : %s\n", proc_type);
    }
}
#endif
392
[50ce607]393/**
394 * Expects to be called from the master lcore and moves it to the given dpdk id
395 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
396 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
397 *               and not already in use.
398 * @return 0 is successful otherwise -1 on error.
399 */
400static inline int dpdk_move_master_lcore(size_t core) {
401    struct rte_config *cfg = rte_eal_get_configuration();
402    cpu_set_t cpuset;
403    int i;
404
405    assert (core < RTE_MAX_LCORE);
406    assert (rte_get_master_lcore() == rte_lcore_id());
407
408    if (core == rte_lcore_id())
409        return 0;
410
411    // Make sure we are not overwriting someone else
412    assert(!rte_lcore_is_enabled(core));
413
414    // Move the core
415    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
416    cfg->lcore_role[core] = ROLE_RTE;
417    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
418    rte_eal_get_configuration()->master_lcore = core;
419    RTE_PER_LCORE(_lcore_id) = core;
420
421    // Now change the affinity
422    CPU_ZERO(&cpuset);
423
424    if (lcore_config[core].detected) {
425        CPU_SET(core, &cpuset);
426    } else {
427        for (i = 0; i < RTE_MAX_LCORE; ++i) {
428            if (lcore_config[i].detected)
429                CPU_SET(i, &cpuset);
430        }
431    }
432
433    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
434    if (i != 0) {
435        // TODO proper libtrace style error here!!
436        fprintf(stderr, "pthread_setaffinity_np failed\n");
437        return -1;
438    }
439    return 0;
440}
441
442
[c04929c]443static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
444                                        char * err, int errlen) {
445    int ret; /* Returned error codes */
446    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
447    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
[9b69b49]448    char mem_map[20] = {0}; /* The memory name */
[c04929c]449    long nb_cpu; /* The number of CPUs in the system */
450    long my_cpu; /* The CPU number we want to bind to */
[50ce607]451    int i;
452    struct rte_config *cfg = rte_eal_get_configuration();
[c04929c]453   
454#if DEBUG
455    rte_set_log_level(RTE_LOG_DEBUG);
456#else
457    rte_set_log_level(RTE_LOG_WARNING);
458#endif
459    /* Using proc-type auto allows this to be either primary or secondary
[50ce607]460     * Secondary allows two instances of libtrace to be used on different
[c04929c]461     * ports. However current version of DPDK doesn't support this on the
[50ce607]462     * same card (My understanding is this should work with two separate
[c04929c]463     * cards).
[9b69b49]464     *
465     * Using unique file prefixes mean separate memory is used, unlinking
466     * the two processes. However be careful we still cannot access a
467     * port that already in use.
[c04929c]468     */
[9b69b49]469    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
470                "--file-prefix", mem_map, "-m", "256", NULL};
[c04929c]471    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
472   
[50ce607]473    /* This initialises the Environment Abstraction Layer (EAL)
[c04929c]474     * If we had slave workers these are put into WAITING state
475     *
476     * Basically binds this thread to a fixed core, which we choose as
477     * the last core on the machine (assuming fewer interrupts mapped here).
[29bbef0]478     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
[c04929c]479     * "-n" the number of memory channels into the CPU (hardware specific)
480     *      - Most likely to be half the number of ram slots in your machine.
481     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
482     * Controls where in memory packets are stored and should spread across
483     * the channels. We just use 1 to be safe.
484     */
485
486    /* Get the number of cpu cores in the system and use the last core */
487    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
488    if (nb_cpu <= 0) {
489        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
490        nb_cpu = 1; /* fallback to the first core */
491    }
492    if (nb_cpu > RTE_MAX_LCORE)
493        nb_cpu = RTE_MAX_LCORE;
494
495    my_cpu = nb_cpu;
496    /* This allows the user to specify the core - we would try to do this
497     * automatically but it's hard to tell that this is secondary
498     * before running rte_eal_init(...). Currently we are limited to 1
499     * instance per core due to the way memory is allocated. */
500    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
501        snprintf(err, errlen, "Failed to parse URI");
502        return -1;
503    }
504
505    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
506                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
507
508    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
509        snprintf(err, errlen, 
510          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
511          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
512        return -1;
513    }
514
[50ce607]515    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
516       info older versions */
517    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
518    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
[c04929c]519
[9b69b49]520        /* Give this a name */
521        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
[c04929c]522    /* rte_eal_init it makes a call to getopt so we need to reset the
523     * global optind variable of getopt otherwise this fails */
524    optind = 1;
525    if ((ret = rte_eal_init(argc, argv)) < 0) {
526        snprintf(err, errlen, 
527          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
528        return -1;
529    }
[50ce607]530
531    // These are still running but will never do anything with DPDK v1.7 we
532    // should remove this XXX in the future
533    for(i = 0; i < RTE_MAX_LCORE; ++i) {
534        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
535            cfg->lcore_role[i] = ROLE_OFF;
536            cfg->lcore_count--;
537        }
538    }
539    // Only the master should be running
540    assert(cfg->lcore_count == 1);
541
542    dpdk_move_master_lcore(my_cpu-1);
543
[c04929c]544#if DEBUG
545    dump_configuration();
546#endif
547    /* This registers all available NICs with Intel DPDK
548     * These are not loaded until rte_eal_pci_probe() is called.
549     */
550    if ((ret = rte_pmd_init_all()) < 0) {
551        snprintf(err, errlen, 
552          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
553        return -1;
554    }
555
556    /* Black list all ports besides the one that we want to use */
557    blacklist_devices(format_data, &use_addr);
558
559    /* This loads DPDK drivers against all ports that are not blacklisted */
560        if ((ret = rte_eal_pci_probe()) < 0) {
561        snprintf(err, errlen, 
562            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
563        return -1;
564    }
565
566    format_data->nb_ports = rte_eth_dev_count();
567
568    if (format_data->nb_ports != 1) {
569        snprintf(err, errlen, 
570            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
571            format_data->nb_ports);
572        return -1;
573    }
[29bbef0]574   
575    struct rte_eth_dev_info dev_info;
576    rte_eth_dev_info_get(0, &dev_info);
577    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 
578                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
[c04929c]579
580    return 0;
581}
582
583static int dpdk_init_input (libtrace_t *libtrace) {
584    char err[500];
585    err[0] = 0;
586   
587    libtrace->format_data = (struct dpdk_format_data_t *)
588                            malloc(sizeof(struct dpdk_format_data_t));
589    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
590    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
591    FORMAT(libtrace)->nb_ports = 0;
592    FORMAT(libtrace)->snaplen = 0; /* Use default */
593    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
594    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
595    FORMAT(libtrace)->promisc = -1;
596    FORMAT(libtrace)->pktmbuf_pool = NULL;
597    FORMAT(libtrace)->nb_blacklist = 0;
598    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
599    FORMAT(libtrace)->mempool_name[0] = 0;
600#if HAS_HW_TIMESTAMPS_82580
601    FORMAT(libtrace)->ts_first_sys = 0;
602    FORMAT(libtrace)->ts_last_sys = 0;
603    FORMAT(libtrace)->wrap_count = 0;
604#endif
[29bbef0]605       
[c04929c]606    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
607        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
608        free(libtrace->format_data);
609        libtrace->format_data = NULL;
610        return -1;
611    }
612    return 0;
613};
614
615static int dpdk_init_output(libtrace_out_t *libtrace)
616{
617    char err[500];
618    err[0] = 0;
619   
620    libtrace->format_data = (struct dpdk_format_data_t *)
621                            malloc(sizeof(struct dpdk_format_data_t));
622    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
623    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
624    FORMAT(libtrace)->nb_ports = 0;
625    FORMAT(libtrace)->snaplen = 0; /* Use default */
626    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
627    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
628    FORMAT(libtrace)->promisc = -1;
629    FORMAT(libtrace)->pktmbuf_pool = NULL;
630    FORMAT(libtrace)->nb_blacklist = 0;
631    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
632    FORMAT(libtrace)->mempool_name[0] = 0;
633#if HAS_HW_TIMESTAMPS_82580
634    FORMAT(libtrace)->ts_first_sys = 0;
635    FORMAT(libtrace)->ts_last_sys = 0;
636    FORMAT(libtrace)->wrap_count = 0;
637#endif
638
639    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
640        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
641        free(libtrace->format_data);
642        libtrace->format_data = NULL;
643        return -1;
644    }
645    return 0;
646};
647
[29bbef0]648static int dpdk_pconfig_input (libtrace_t *libtrace,
649                                trace_parallel_option_t option,
650                                void *data) {
651        switch (option) {
652                case TRACE_OPTION_SET_HASHER:
653                        switch (*((enum hasher_types *) data))
654                        {
655                                case HASHER_BALANCE:
656                                case HASHER_UNIDIRECTIONAL:
657                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
658                                        return 0;
659                                case HASHER_BIDIRECTIONAL:
660                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
661                                        return 0;
662                                case HASHER_HARDWARE:
663                                case HASHER_CUSTOM:
664                                        // We don't support these
665                                        return -1;
666                        }
[50ce607]667        break;
[29bbef0]668        }
669        return -1;
670}
[c04929c]671/**
672 * Note here snaplen excludes the MAC checksum. Packets over
673 * the requested snaplen will be dropped. (Excluding MAC checksum)
674 *
675 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
676 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
677 * is set the maximum size of the returned packet would be 1518 otherwise
678 * 1514 would be the largest size possibly returned.
679 *
680 */
681static int dpdk_config_input (libtrace_t *libtrace,
682                                        trace_option_t option,
683                                        void *data) {
684    switch (option) {
685        case TRACE_OPTION_SNAPLEN:
686            /* Only support changing snaplen before a call to start is
687             * made */
688            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
689                FORMAT(libtrace)->snaplen=*(int*)data;
690            else
691                return -1;
692            return 0;
693                case TRACE_OPTION_PROMISC:
694                        FORMAT(libtrace)->promisc=*(int*)data;
695            return 0;
696        case TRACE_OPTION_FILTER:
697            /* TODO filtering */
698            break;
699        case TRACE_OPTION_META_FREQ:
700            break;
701        case TRACE_OPTION_EVENT_REALTIME:
702            break;
703        /* Avoid default: so that future options will cause a warning
704         * here to remind us to implement it, or flag it as
705         * unimplementable
706         */
707    }
708
709        /* Don't set an error - trace_config will try to deal with the
710         * option and will set an error if it fails */
711    return -1;
712}
713
714/* Can set jumbo frames/ or limit the size of a frame by setting both
715 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
716 *
717 */
718static struct rte_eth_conf port_conf = {
719        .rxmode = {
[29bbef0]720                .mq_mode = ETH_RSS,
[c04929c]721                .split_hdr_size = 0,
722                .header_split   = 0, /**< Header Split disabled */
723                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
724                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
725                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
726        .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
727#if GET_MAC_CRC_CHECKSUM
728/* So it appears that if hw_strip_crc is turned off the driver will still
729 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
730 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
731 * So lets just add it back on when we receive the packet.
732 */
733                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
734#else
735/* By default strip the MAC checksum because it's a bit of a hack to
736 * actually read these. And don't want to rely on disabling this to actualy
737 * always cut off the checksum in the future
738 */
739        .hw_strip_crc   = 1, /**< CRC stripped by hardware */
740#endif
741        },
742        .txmode = {
743                .mq_mode = ETH_DCB_NONE,
744        },
[29bbef0]745        .rx_adv_conf = {
746                .rss_conf = {
747                        // .rss_key = &rss_key, // We set this per format
748                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
749                },
750        },
[c04929c]751};
752
753static const struct rte_eth_rxconf rx_conf = {
754        .rx_thresh = {
755                .pthresh = 8,/* RX_PTHRESH prefetch */
756                .hthresh = 8,/* RX_HTHRESH host */
757                .wthresh = 4,/* RX_WTHRESH writeback */
758        },
759    .rx_free_thresh = 0,
760    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
761};
762
763static const struct rte_eth_txconf tx_conf = {
764        .tx_thresh = {
[0c866b0]765        /**
766         * TX_PTHRESH prefetch
767         * Set on the NIC, if the number of unprocessed descriptors to queued on
768         * the card fall below this try grab at least hthresh more unprocessed
769         * descriptors.
770         */
771                .pthresh = 36,
772
773        /* TX_HTHRESH host
774         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
775         */
776                .hthresh = 0,
777       
778        /* TX_WTHRESH writeback
779         * Set on the NIC, the number of sent descriptors before writing back
780         * status to confirm the transmission. This is done more efficiently as
781         * a bulk DMA-transfer rather than writing one at a time.
782         * Similar to tx_free_thresh however this is applied to the NIC, where
783         * as tx_free_thresh is when DPDK will check these. This is extended
784         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
785         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
786         */
787                .wthresh = 4,
[c04929c]788        },
[0c866b0]789
790    /* Used internally by DPDK rather than passed to the NIC. The number of
791     * packet descriptors to send before checking for any responses written
792     * back (to confirm the transmission). Default = 32 if set to 0)
793     */
794        .tx_free_thresh = 0,
795
796    /* This is the Report Status threshold, used by 10Gbit cards,
797     * This signals the card to only write back status (such as
798     * transmission successful) after this minimum number of transmit
799     * descriptors are seen. The default is 32 (if set to 0) however if set
800     * to greater than 1 TX wthresh must be set to zero, because this is kindof
801     * a replacement. See the dpdk programmers guide for more restrictions.
802     */
803        .tx_rs_thresh = 1,
[c04929c]804};
805
806/* Attach memory to the port and start the port or restart the port.
807 */
808static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
809    int ret; /* Check return values for errors */
810    struct rte_eth_link link_info; /* Wait for link */
811   
812    /* Already started */
813    if (format_data->paused == DPDK_RUNNING)
814        return 0;
815
816    /* First time started we need to alloc our memory, doing this here
817     * rather than in enviroment setup because we don't have snaplen then */
818    if (format_data->paused == DPDK_NEVER_STARTED) {
819        if (format_data->snaplen == 0) {
820            format_data->snaplen = RX_MBUF_SIZE;
821            port_conf.rxmode.jumbo_frame = 0;
822            port_conf.rxmode.max_rx_pkt_len = 0;
823        } else {
824            /* Use jumbo frames */
825            port_conf.rxmode.jumbo_frame = 1;
826            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
827        }
828
829        /* This is additional overhead so make sure we allow space for this */
830#if GET_MAC_CRC_CHECKSUM
831        format_data->snaplen += ETHER_CRC_LEN;
832#endif
833#if HAS_HW_TIMESTAMPS_82580
834        format_data->snaplen += sizeof(struct hw_timestamp_82580);
835#endif
836
837        /* Create the mbuf pool, which is the place our packets are allocated
838         * from - TODO figure out if there is is a free function (I cannot see one)
839         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
840         * allocate however that extra 1 packet is not used.
841         * (I assume <= vs < error some where in DPDK code)
842         * TX requires nb_tx_buffers + 1 in the case the queue is full
843         * so that will fill the new buffer and wait until slots in the
844         * ring become available.
845         */
846#if DEBUG
[09dc6dc]847    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
[c04929c]848#endif
849        format_data->pktmbuf_pool =
850            rte_mempool_create(format_data->mempool_name,
851                       format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
852                       format_data->snaplen + sizeof(struct rte_mbuf) 
853                                        + RTE_PKTMBUF_HEADROOM,
854                       8, sizeof(struct rte_pktmbuf_pool_private),
855                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
856                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
857
858        if (format_data->pktmbuf_pool == NULL) {
859            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
860                        "pool failed: %s", strerror(rte_errno));
861            return -1;
862        }
863    }
864   
865    /* ----------- Now do the setup for the port mapping ------------ */
866    /* Order of calls must be
867     * rte_eth_dev_configure()
868     * rte_eth_tx_queue_setup()
869     * rte_eth_rx_queue_setup()
870     * rte_eth_dev_start()
871     * other rte_eth calls
872     */
873   
[29bbef0]874   
875    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
876   
[c04929c]877    /* This must be called first before another *eth* function
878     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
879    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
880    if (ret < 0) {
881        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
882                            " %"PRIu8" : %s", format_data->port,
883                            strerror(-ret));
884        return -1;
885    }
[0c866b0]886    /* Initialise the TX queue a minimum value if using this port for
[c04929c]887     * receiving. Otherwise a larger size if writing packets.
888     */
889    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
890                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
891    if (ret < 0) {
892        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
893                            " %"PRIu8" : %s", format_data->port,
894                            strerror(-ret));
895        return -1;
896    }
[0c866b0]897    /* Initialise the RX queue with some packets from memory */
[c04929c]898    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
899                            format_data->nb_rx_buf, SOCKET_ID_ANY, 
900                            &rx_conf, format_data->pktmbuf_pool);
901    if (ret < 0) {
902        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
903                    " %"PRIu8" : %s", format_data->port,
904                    strerror(-ret));
905        return -1;
906    }
907   
908    /* Start device */
909    ret = rte_eth_dev_start(format_data->port);
910    if (ret < 0) {
911        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
912                    strerror(-ret));
913        return -1;
914    }
915
916    /* Default promiscuous to on */
917    if (format_data->promisc == -1)
918        format_data->promisc = 1;
919   
920    if (format_data->promisc == 1)
921        rte_eth_promiscuous_enable(format_data->port);
922    else
923        rte_eth_promiscuous_disable(format_data->port);
924   
925    /* Wait for the link to come up */
926    rte_eth_link_get(format_data->port, &link_info);
927#if DEBUG
[09dc6dc]928    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
[c04929c]929            (int) link_info.link_duplex, (int) link_info.link_speed);
930#endif
931
[0c866b0]932    /* We have now successfully started/unpaused */
[c04929c]933    format_data->paused = DPDK_RUNNING;
934   
935    return 0;
936}
[29bbef0]937
[50ce607]938/* Attach memory to the port and start (or restart) the port/s.
[29bbef0]939 */
[50ce607]940static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
[29bbef0]941    int ret, i; /* Check return values for errors */
942    struct rte_eth_link link_info; /* Wait for link */
943   
944    /* Already started */
945    if (format_data->paused == DPDK_RUNNING)
946        return 0;
947
948    /* First time started we need to alloc our memory, doing this here
[0c866b0]949     * rather than in environment setup because we don't have snaplen then */
[29bbef0]950    if (format_data->paused == DPDK_NEVER_STARTED) {
951        if (format_data->snaplen == 0) {
952            format_data->snaplen = RX_MBUF_SIZE;
953            port_conf.rxmode.jumbo_frame = 0;
954            port_conf.rxmode.max_rx_pkt_len = 0;
955        } else {
956            /* Use jumbo frames */
957            port_conf.rxmode.jumbo_frame = 1;
958            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
959        }
960
961        /* This is additional overhead so make sure we allow space for this */
962#if GET_MAC_CRC_CHECKSUM
963        format_data->snaplen += ETHER_CRC_LEN;
964#endif
965#if HAS_HW_TIMESTAMPS_82580
966        format_data->snaplen += sizeof(struct hw_timestamp_82580);
967#endif
968
969        /* Create the mbuf pool, which is the place our packets are allocated
[50ce607]970         * from - TODO figure out if there is is a free function (I cannot see one)
[29bbef0]971         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
972         * allocate however that extra 1 packet is not used.
973         * (I assume <= vs < error some where in DPDK code)
974         * TX requires nb_tx_buffers + 1 in the case the queue is full
975         * so that will fill the new buffer and wait until slots in the
976         * ring become available.
977         */
978#if DEBUG
979    printf("Creating mempool named %s\n", format_data->mempool_name);
980#endif
981        format_data->pktmbuf_pool =
982            rte_mempool_create(format_data->mempool_name,
983                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
984                       format_data->snaplen + sizeof(struct rte_mbuf) 
985                                        + RTE_PKTMBUF_HEADROOM,
986                       8, sizeof(struct rte_pktmbuf_pool_private),
987                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
988                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
989
990        if (format_data->pktmbuf_pool == NULL) {
991            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
992                        "pool failed: %s", strerror(rte_errno));
993            return -1;
994        }
995    }
996   
997    /* ----------- Now do the setup for the port mapping ------------ */
998    /* Order of calls must be
999     * rte_eth_dev_configure()
1000     * rte_eth_tx_queue_setup()
1001     * rte_eth_rx_queue_setup()
1002     * rte_eth_dev_start()
1003     * other rte_eth calls
1004     */
1005   
1006    /* This must be called first before another *eth* function
1007     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1008    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1009    if (ret < 0) {
1010        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1011                            " %"PRIu8" : %s", format_data->port,
1012                            strerror(-ret));
1013        return -1;
1014    }
1015#if DEBUG
1016    printf("Doing dev configure\n");
1017#endif
[0c866b0]1018    /* Initialise the TX queue a minimum value if using this port for
[29bbef0]1019     * receiving. Otherwise a larger size if writing packets.
1020     */
1021    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1022                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1023    if (ret < 0) {
1024        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1025                            " %"PRIu8" : %s", format_data->port,
1026                            strerror(-ret));
1027        return -1;
1028    }
1029   
1030    for (i=0; i < rx_queues; i++) {
1031#if DEBUG
1032    printf("Doing queue configure\n");
1033#endif 
[0c866b0]1034                /* Initialise the RX queue with some packets from memory */
[29bbef0]1035                ret = rte_eth_rx_queue_setup(format_data->port, i,
1036                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
1037                                                                &rx_conf, format_data->pktmbuf_pool);
[50ce607]1038        /* Init per_thread data structures */
1039        format_data->per_lcore[i].port = format_data->port;
1040        format_data->per_lcore[i].queue_id = i;
1041
[29bbef0]1042                if (ret < 0) {
1043                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1044                                                " %"PRIu8" : %s", format_data->port,
1045                                                strerror(-ret));
1046                        return -1;
1047                }
1048        }
1049   
1050#if DEBUG
[50ce607]1051    fprintf(stderr, "Doing start device\n");
[29bbef0]1052#endif 
1053    /* Start device */
1054    ret = rte_eth_dev_start(format_data->port);
1055#if DEBUG
[09dc6dc]1056    fprintf(stderr, "Done start device\n");
[29bbef0]1057#endif 
1058    if (ret < 0) {
1059        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1060                    strerror(-ret));
1061        return -1;
1062    }
1063
1064
1065    /* Default promiscuous to on */
1066    if (format_data->promisc == -1)
1067        format_data->promisc = 1;
1068   
1069    if (format_data->promisc == 1)
1070        rte_eth_promiscuous_enable(format_data->port);
1071    else
1072        rte_eth_promiscuous_disable(format_data->port);
1073   
1074   
1075    /* We have now successfully started/unpased */
1076    format_data->paused = DPDK_RUNNING;
1077   
1078    // Can use remote launch for all
1079    /*RTE_LCORE_FOREACH_SLAVE(i) {
[17a3dff]1080                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
[29bbef0]1081        }*/
1082   
1083    /* Wait for the link to come up */
1084    rte_eth_link_get(format_data->port, &link_info);
1085#if DEBUG
[09dc6dc]1086    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
[29bbef0]1087            (int) link_info.link_duplex, (int) link_info.link_speed);
1088#endif
1089
1090    return 0;
1091}
[c04929c]1092
1093static int dpdk_start_input (libtrace_t *libtrace) {
1094    char err[500];
1095    err[0] = 0;
1096
1097    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1098        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1099        free(libtrace->format_data);
1100        libtrace->format_data = NULL;
1101        return -1;
1102    }
1103    return 0;
1104}
1105
[50ce607]1106static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
[8af0d01]1107    struct rte_eth_dev_info dev_info;
[50ce607]1108    rte_eth_dev_info_get(port_id, &dev_info);
[8af0d01]1109    return dev_info.max_rx_queues;
1110}
1111
[50ce607]1112static inline size_t dpdk_processor_count () {
1113    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1114    if (nb_cpu <= 0)
1115        return 1;
1116    else
1117        return (size_t) nb_cpu;
1118}
1119
[29bbef0]1120static int dpdk_pstart_input (libtrace_t *libtrace) {
1121    char err[500];
[50ce607]1122    int i=0, phys_cores=0;
[17a3dff]1123    int tot = libtrace->perpkt_thread_count;
[29bbef0]1124    err[0] = 0;
[50ce607]1125
1126    if (rte_lcore_id() != rte_get_master_lcore())
1127        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1128
1129    // If the master is not on the last thread we move it there
1130    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1131        // Consider error handling here
1132        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
1133    }
1134
1135    // Don't exceed the number of cores in the system/detected by dpdk
1136    // We don't have to force this but performance wont be good if we don't
1137    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1138        if (lcore_config[i].detected) {
1139            if (rte_lcore_is_enabled(i))
1140                fprintf(stderr, "Found core %d already in use!\n", i);
1141            else
1142                phys_cores++;
1143        }
1144    }
1145
1146        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1147    tot = MIN(tot, phys_cores);
1148
1149        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
[29bbef0]1150       
1151    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1152        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1153        free(libtrace->format_data);
1154        libtrace->format_data = NULL;
1155        return -1;
1156    }
[50ce607]1157
1158    // Make sure we only start the number that we should
1159    libtrace->perpkt_thread_count = tot;
1160    return 0;
1161}
1162
1163
1164/**
1165 * Register a thread with the DPDK system,
1166 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1167 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1168 * gives it.
1169 *
1170 * We then allow a mapper thread to be started on every real core as DPDK would
1171 * we also bind these to the corresponding CPU cores.
1172 *
1173 * @param libtrace A pointer to the trace
1174 * @param reading True if the thread will be used to read packets, i.e. will
1175 *                call pread_packet(), false if thread used to process packet
1176 *                in any other manner including statistics functions.
1177 */
1178static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1179{
1180    struct rte_config *cfg = rte_eal_get_configuration();
1181    int i;
1182    int new_id = -1;
1183
1184    // If 'reading packets' fill in cores from 0 up and bind affinity
1185    // otherwise start from the MAX core (which is also the master) and work backwards
1186    // in this case physical cores on the system will not exist so we don't bind
1187    // these to any particular physical core
1188    if (reading) {
1189        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1190            if (!rte_lcore_is_enabled(i)) {
1191                new_id = i;
1192                if (!lcore_config[i].detected)
1193                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1194                break;
1195            }
1196        }
1197    } else {
1198        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1199            if (!rte_lcore_is_enabled(i)) {
1200                new_id = i;
1201                break;
1202            }
1203        }
1204    }
1205
1206    if (new_id == -1) {
1207        assert(cfg->lcore_count == RTE_MAX_LCORE);
1208        // TODO proper libtrace style error here!!
1209        fprintf(stderr, "Too many threads for DPDK!!\n");
1210        return -1;
1211    }
1212
1213    // Enable the core in global DPDK structs
1214    cfg->lcore_role[new_id] = ROLE_RTE;
1215    cfg->lcore_count++;
1216    // Set TLS to reflect our new number
1217    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1218    fprintf(stderr, "original id%d", rte_lcore_id());
1219    RTE_PER_LCORE(_lcore_id) = new_id;
1220    fprintf(stderr, " new id%d\n", rte_lcore_id());
1221
1222    if (reading) {
1223        // Set affinity bind to corresponding core
1224        cpu_set_t cpuset;
1225        CPU_ZERO(&cpuset);
1226        CPU_SET(rte_lcore_id(), &cpuset);
1227        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1228        if (i != 0) {
1229            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1230            return -1;
1231        }
1232    }
1233
1234    // Map our TLS to the thread data
1235    if (reading) {
1236        if(t->type == THREAD_PERPKT) {
1237            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1238        } else {
1239            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1240        }
1241    }
1242}
1243
1244
1245/**
1246 * Unregister a thread with the DPDK system.
1247 *
1248 * Only previously registered threads should be calling this just before
1249 * they are destroyed.
1250 */
1251static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
1252{
1253    struct rte_config *cfg = rte_eal_get_configuration();
1254
1255    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
1256
1257    // Skip if master!!
1258    if (rte_lcore_id() == rte_get_master_lcore()) {
1259        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1260        return 0;
1261    }
1262
1263    // Disable this core in global DPDK structs
1264    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1265    cfg->lcore_count--;
1266    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1267    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
[29bbef0]1268    return 0;
1269}
1270
[c04929c]1271static int dpdk_start_output(libtrace_out_t *libtrace)
1272{
1273    char err[500];
1274    err[0] = 0;
1275   
1276    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1277        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1278        free(libtrace->format_data);
1279        libtrace->format_data = NULL;
1280        return -1;
1281    }
1282    return 0;
1283}
1284
1285static int dpdk_pause_input(libtrace_t * libtrace){
1286    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1287    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1288#if DEBUG     
[09dc6dc]1289        fprintf(stderr, "Pausing port\n");
[c04929c]1290#endif
1291        rte_eth_dev_stop(FORMAT(libtrace)->port);
1292        FORMAT(libtrace)->paused = DPDK_PAUSED;
1293        /* If we pause it the driver will be reset and likely our counter */
1294#if HAS_HW_TIMESTAMPS_82580
1295        FORMAT(libtrace)->ts_first_sys = 0;
1296        FORMAT(libtrace)->ts_last_sys = 0;
1297#endif
1298    }
1299    return 0;
1300}
1301
1302static int dpdk_write_packet(libtrace_out_t *trace, 
1303                libtrace_packet_t *packet){
1304    struct rte_mbuf* m_buff[1];
1305   
1306    int wirelen = trace_get_wire_length(packet);
1307    int caplen = trace_get_capture_length(packet);
1308   
1309    /* Check for a checksum and remove it */
1310    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1311                                            wirelen == caplen)
1312        caplen -= ETHER_CRC_LEN;
1313
1314    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1315    if (m_buff[0] == NULL) {
1316        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1317        return -1;
1318    } else {
1319        int ret;
1320        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1321        do {
1322            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1323        } while (ret != 1);
1324    }
1325
1326    return 0;
1327}
1328
1329static int dpdk_fin_input(libtrace_t * libtrace) {
1330    /* Free our memory structures */
1331    if (libtrace->format_data != NULL) {
1332        /* Close the device completely, device cannot be restarted */
1333        if (FORMAT(libtrace)->port != 0xFF)
1334            rte_eth_dev_close(FORMAT(libtrace)->port);
1335        /* filter here if we used it */
1336                free(libtrace->format_data);
1337        }
1338
1339    /* Revert to the original PCI drivers */
[2138553]1340    /* No longer in DPDK
1341    rte_eal_pci_exit(); */
[c04929c]1342    return 0;
1343}
1344
1345
1346static int dpdk_fin_output(libtrace_out_t * libtrace) {
1347    /* Free our memory structures */
1348    if (libtrace->format_data != NULL) {
1349        /* Close the device completely, device cannot be restarted */
1350        if (FORMAT(libtrace)->port != 0xFF)
1351            rte_eth_dev_close(FORMAT(libtrace)->port);
1352        /* filter here if we used it */
1353                free(libtrace->format_data);
1354        }
1355
1356    /* Revert to the original PCI drivers */
[2138553]1357    /* No longer in DPDK
1358    rte_eal_pci_exit(); */
[c04929c]1359    return 0;
1360}
1361
1362/**
1363 * Get the start of additional header that we added to a packet.
1364 */
1365static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1366    uint8_t *hdrsize;
1367    assert(packet);
1368    assert(packet->buffer);
1369    hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
1370    /* The byte before the original packet data denotes the size in bytes
1371     * of our additional header that we added sits before the 'size byte' */
1372    hdrsize--;
1373    return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
1374}
1375
1376static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1377    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1378    return hdr->cap_len;
1379}
1380
1381static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1382    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1383    if (size > hdr->cap_len) {
1384        /* Cannot make a packet bigger */
1385                return trace_get_capture_length(packet);
1386        }
1387
1388    /* Reset the cached capture length first*/
1389    packet->capture_length = -1;
1390    hdr->cap_len = (uint32_t) size;
1391        return trace_get_capture_length(packet);
1392}
1393
1394static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1395    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1396    int org_cap_size; /* The original capture size */
1397    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1398        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1399                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
1400                            sizeof(struct hw_timestamp_82580);
1401    } else {
1402        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1403                            (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
1404    }
1405    if (hdr->flags & INCLUDES_CHECKSUM) {
1406        return org_cap_size;
1407    } else {
1408        /* DPDK packets are always TRACE_TYPE_ETH packets */
1409        return org_cap_size + ETHER_CRC_LEN;
1410    }
1411}
1412static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1413    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1414    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1415        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1416                sizeof(struct hw_timestamp_82580);
1417    else
1418        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1419}
1420
1421static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1422                libtrace_packet_t *packet, void *buffer,
1423                libtrace_rt_types_t rt_type, uint32_t flags) {
1424    assert(packet);
1425    if (packet->buffer != buffer &&
1426        packet->buf_control == TRACE_CTRL_PACKET) {
1427        free(packet->buffer);
1428    }
1429
1430    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1431        packet->buf_control = TRACE_CTRL_PACKET;
1432    } else
1433        packet->buf_control = TRACE_CTRL_EXTERNAL;
1434
1435    packet->buffer = buffer;
1436    packet->header = buffer;
1437
1438    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1439    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1440    packet->type = rt_type;
1441    return 0;
1442}
1443
1444/*
1445 * Does any extra preperation to a captured packet.
1446 * This includes adding our extra header to it with the timestamp
1447 */
1448static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
1449                                                        struct rte_mbuf* pkt){
1450    uint8_t * hdr_size;
1451    struct dpdk_addt_hdr *hdr;
1452#if HAS_HW_TIMESTAMPS_82580
1453    struct hw_timestamp_82580 *hw_ts;
1454    struct timeval cur_sys_time;
1455    uint64_t cur_sys_time_ns;
1456    uint64_t estimated_wraps;
1457   
1458    /* Using gettimeofday because it's most likely to be a vsyscall
1459     * We don't want to slow down anything with systemcalls we dont need
1460     * accauracy */
1461    gettimeofday(&cur_sys_time, NULL);
1462#else
1463# if USE_CLOCK_GETTIME
1464    struct timespec cur_sys_time;
1465   
1466    /* This looks terrible and I feel bad doing it. But it's OK
1467     * on new kernels, because this is a vsyscall */
1468    clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1469# else
1470    struct timeval cur_sys_time;
1471    /* Should be a vsyscall */
1472    gettimeofday(&cur_sys_time, NULL);
1473# endif
1474#endif
1475
1476    /* Record the size of our header */
1477    hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
1478    *hdr_size = sizeof(struct dpdk_addt_hdr);
1479    /* Now put our header in front of that size */
1480    hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
1481    memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1482   
1483#if GET_MAC_CRC_CHECKSUM
1484    /* Add back in the CRC sum */
1485    pkt->pkt.pkt_len += ETHER_CRC_LEN;
1486    pkt->pkt.data_len += ETHER_CRC_LEN;
1487    hdr->flags |= INCLUDES_CHECKSUM;
1488#endif
1489
1490#if HAS_HW_TIMESTAMPS_82580
1491    /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1492     *
1493     *        +----------+---+   +--------------+
1494     *  82580 |    24    | 8 |   |      32      |
1495     *        +----------+---+   +--------------+
1496     *          reserved  \______ 40 bits _____/
1497     *
1498     * The 40 bit 82580 SYSTIM overflows every
1499     *   2^40 * 10^-9 /  60  = 18.3 minutes.
1500     *
1501     * NOTE picture is in Big Endian order, in memory it's acutally in Little
1502     * Endian (for the full 64 bits) i.e. picture is mirrored
1503     */
1504   
1505    /* The timestamp is sitting before our packet and is included in pkt_len */
1506    hdr->flags |= INCLUDES_HW_TIMESTAMP;
1507    hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
1508   
1509    /* Despite what the documentation says this is in Little
1510     * Endian byteorder. Mask the reserved section out.
1511     */
1512    hdr->timestamp = le64toh(hw_ts->timestamp) & 
1513                ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1514               
1515    cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1516    if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
1517        FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1518        FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
1519    }
1520   
1521    /* This will have serious problems if packets aren't read quickly
1522     * that is within a couple of seconds because our clock cycles every
1523     * 18 seconds */
1524    estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
1525                            / (1ull<<TS_NBITS_82580);
1526   
1527    /* Estimated_wraps gives the number of times the counter should have
1528     * wrapped (however depending on value last time it could have wrapped
1529     * twice more (if hw clock is close to its max value) or once less (allowing
1530     * for a bit of variance between hw and sys clock). But if the clock
1531     * shouldn't have wrapped once then don't allow it to go backwards in time */
1532    if (unlikely(estimated_wraps >= 2)) {
1533        /* 2 or more wrap arounds add all but the very last wrap */
1534        FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
1535    }
1536   
1537    /* Set the timestamp to the lowest possible value we're considering */
1538    hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
1539                        FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
1540   
1541    /* In most runs only the first if() will need evaluating - i.e our
1542     * estimate is correct. */
1543    if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1544                                hdr->timestamp, MAXSKEW_82580))) {
1545        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1546        FORMAT(libtrace)->wrap_count++;
1547        hdr->timestamp += (1ull<<TS_NBITS_82580);
1548        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1549                                hdr->timestamp, MAXSKEW_82580)) {
1550            /* Failed to match estimated_wraps */
1551            FORMAT(libtrace)->wrap_count++;
1552            hdr->timestamp += (1ull<<TS_NBITS_82580);
1553            if (!WITHIN_VARIANCE(cur_sys_time_ns,
1554                                hdr->timestamp, MAXSKEW_82580)) {
1555                if (estimated_wraps == 0) {
1556                    /* 0 case Failed to match estimated_wraps+2 */
1557                    printf("WARNING - Hardware Timestamp failed to"
1558                                            " match using systemtime!\n");
1559                    hdr->timestamp = cur_sys_time_ns;
1560                } else {
1561                    /* Failed to match estimated_wraps+1 */
1562                    FORMAT(libtrace)->wrap_count++;
1563                    hdr->timestamp += (1ull<<TS_NBITS_82580);
1564                    if (!WITHIN_VARIANCE(cur_sys_time_ns,
1565                                hdr->timestamp, MAXSKEW_82580)) {
1566                        /* Failed to match estimated_wraps+2 */
1567                        printf("WARNING - Hardware Timestamp failed to"
1568                                            " match using systemtime!!\n");
1569                    }
1570                }
1571            }
1572        }
1573    }
1574
1575    /* Log our previous for the next loop */
1576    FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
1577
1578#else
1579# if USE_CLOCK_GETTIME
1580    hdr->timestamp = TS_TO_NS(cur_sys_time);
1581# else
1582    hdr->timestamp = TV_TO_NS(cur_sys_time);
1583# endif
1584#endif
1585
1586    /* Intels samples prefetch into level 0 cache lets assume it is a good
1587     * idea and do the same */
1588    rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
1589    packet->buffer = pkt;
1590    dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1591
1592    /* Set our capture length for the first time */
1593    hdr->cap_len = dpdk_get_wire_length(packet);
1594    if (!(hdr->flags & INCLUDES_CHECKSUM)) {
1595        hdr->cap_len -= ETHER_CRC_LEN;
1596    }
1597   
1598
1599    return dpdk_get_framing_length(packet) +
1600                        dpdk_get_capture_length(packet);
1601}
1602
[29bbef0]1603
1604static void dpdk_fin_packet(libtrace_packet_t *packet)
1605{
1606        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1607                rte_pktmbuf_free(packet->buffer);
1608                packet->buffer = NULL;
1609        }
1610}
1611
[c04929c]1612static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1613    int nb_rx; /* Number of rx packets we've recevied */
1614    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1615
1616    /* Free the last packet buffer */
1617    if (packet->buffer != NULL) {
1618        /* Buffer is owned by DPDK */
1619        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1620            rte_pktmbuf_free(packet->buffer);
1621            packet->buffer = NULL;
1622        } else
1623        /* Buffer is owned by packet i.e. has been malloc'd */
1624        if (packet->buf_control == TRACE_CTRL_PACKET) {
1625            free(packet->buffer);
1626            packet->buffer = NULL;
1627        }
1628    }
1629   
1630    packet->buf_control = TRACE_CTRL_EXTERNAL;
1631    packet->type = TRACE_RT_DATA_DPDK;
1632   
1633    /* Wait for a packet */
1634    while (1) {
1635        /* Poll for a single packet */
1636        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1637                            FORMAT(libtrace)->queue_id, pkts_burst, 1);       
1638        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
1639            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1640        }
1641    }
1642   
1643    /* We'll never get here - but if we did it would be bad */
1644    return -1;
1645}
[50ce607]1646
1647static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
[29bbef0]1648    int nb_rx; /* Number of rx packets we've recevied */
1649    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
1650
1651    /* Free the last packet buffer */
1652    if (packet->buffer != NULL) {
1653        /* Buffer is owned by DPDK */
[50ce607]1654        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
[29bbef0]1655            rte_pktmbuf_free(packet->buffer);
1656            packet->buffer = NULL;
1657        } else
1658        /* Buffer is owned by packet i.e. has been malloc'd */
1659        if (packet->buf_control == TRACE_CTRL_PACKET) {
1660            free(packet->buffer);
1661            packet->buffer = NULL;
1662        }
1663    }
1664   
1665    packet->buf_control = TRACE_CTRL_EXTERNAL;
1666    packet->type = TRACE_RT_DATA_DPDK;
1667   
1668    /* Wait for a packet */
1669    while (1) {
1670        /* Poll for a single packet */
[50ce607]1671        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
1672                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
[29bbef0]1673        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
[09dc6dc]1674                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
[29bbef0]1675            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
1676        }
1677        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
[50ce607]1678        if (libtrace_message_queue_count(&t->messages) > 0) {
[29bbef0]1679                        printf("Extra message yay");
1680                        return -2;
1681                }
1682    }
1683   
1684    /* We'll never get here - but if we did it would be bad */
1685    return -1;
1686}
[c04929c]1687
1688static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1689    struct timeval tv;
1690    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1691   
1692    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1693    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1694    return tv;
1695}
1696
1697static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1698    struct timespec ts;
1699    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1700   
1701    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1702    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1703    return ts;
1704}
1705
1706static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1707    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1708}
1709
1710static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1711    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1712    return (libtrace_direction_t) hdr->direction;
1713}
1714
1715static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1716    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1717    hdr->direction = (uint8_t) direction;
1718    return (libtrace_direction_t) hdr->direction;
1719}
1720
1721/*
1722 * NOTE: Drops could occur for other reasons than running out of buffer
1723 * space. Such as failed MAC checksums and oversized packets.
1724 */
1725static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
1726    struct rte_eth_stats stats = {0};
1727   
1728    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1729        return UINT64_MAX;
1730    /* Grab the current stats */
1731    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1732   
1733    /* Get the drop counter */
1734    return (uint64_t) stats.ierrors;
1735}
1736
1737static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
1738    struct rte_eth_stats stats = {0};
1739   
1740    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1741        return UINT64_MAX;
1742    /* Grab the current stats */
1743    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1744   
1745    /* Get the drop counter */
1746    return (uint64_t) stats.ipackets;
1747}
1748
1749/*
1750 * This is the number of packets filtered by the NIC
1751 * and maybe ahead of number read using libtrace.
1752 *
1753 * XXX we are yet to implement any filtering, but if it was this should
1754 * get the result. So this will just return 0 for now.
1755 */
1756static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
1757    struct rte_eth_stats stats = {0};
1758   
1759    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
1760        return UINT64_MAX;
1761    /* Grab the current stats */
1762    rte_eth_stats_get(FORMAT(trace)->port, &stats);
1763   
1764    /* Get the drop counter */
1765    return (uint64_t) stats.fdirmiss;
1766}
1767
1768/* Attempts to read a packet in a non-blocking fashion. If one is not
1769 * available a SLEEP event is returned. We do not have the ability to
1770 * create a select()able file descriptor in DPDK.
1771 */
1772static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
1773                                        libtrace_packet_t *packet) {
1774    libtrace_eventobj_t event = {0,0,0.0,0};
1775    int nb_rx; /* Number of receive packets we've read */
1776    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
1777   
1778    do {
1779   
1780        /* See if we already have a packet waiting */
1781        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
1782                        FORMAT(trace)->queue_id, pkts_burst, 1);
1783       
1784        if (nb_rx > 0) {
1785            /* Free the last packet buffer */
1786            if (packet->buffer != NULL) {
1787                /* Buffer is owned by DPDK */
1788                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1789                    rte_pktmbuf_free(packet->buffer);
1790                    packet->buffer = NULL;
1791                } else
1792                /* Buffer is owned by packet i.e. has been malloc'd */
1793                if (packet->buf_control == TRACE_CTRL_PACKET) {
1794                    free(packet->buffer);
1795                    packet->buffer = NULL;
1796                }
1797            }
1798           
1799            packet->buf_control = TRACE_CTRL_EXTERNAL;
1800            packet->type = TRACE_RT_DATA_DPDK;
1801            event.type = TRACE_EVENT_PACKET;
1802            event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
1803           
1804            /* XXX - Check this passes the filter trace_read_packet normally
1805             * does this for us but this wont */
1806            if (trace->filter) {
1807                if (!trace_apply_filter(trace->filter, packet)) {
1808                    /* Failed the filter so we loop for another packet */
1809                    continue;
1810                }
1811            }
1812        } else {
1813            /* We only want to sleep for a very short time - we are non-blocking */
1814            event.type = TRACE_EVENT_SLEEP;
1815            event.seconds = 0.0001;
1816            event.size = 0;
1817        }
1818       
1819        /* If we get here we have our event */
1820        break;
1821    } while (1);
1822
1823    return event;
1824}
1825
1826
/* Print usage information for the dpdk format module (accepted input and
 * output URI forms) to stdout. */
static void dpdk_help(void) {
    fputs("dpdk format module: $Revision: 1752 $\n"
          "Supported input URIs:\n"
          "\tdpdk:<domain:bus:devid.func>-<coreid>\n"
          "\tThe -<coreid> is optional \n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n"
          "\t By default the last CPU core is used if not otherwise specified.\n"
          "\t Only a single libtrace instance of dpdk can use the same CPU core.\n"
          "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n"
          "\n"
          "Supported output URIs:\n"
          "\tSame format as the input URI.\n"
          "\t e.g. dpdk:0000:01:00.1\n"
          "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n"
          "\n",
          stdout);
}
1844
/* Format descriptor registered with libtrace by dpdk_constructor().
 *
 * This is a positional initializer: each entry must stay in the exact order
 * of the fields in struct libtrace_format_t. NULL entries mark callbacks this
 * format does not provide. The parallel (p*) callbacks at the end reuse the
 * serial pause and fin handlers.
 */
static struct libtrace_format_t dpdk = {
        "dpdk",                             /* format name (URI prefix) */
        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $", /* version */
        TRACE_FORMAT_DPDK,                  /* format type */
        NULL,                               /* probe filename */
        NULL,                               /* probe magic */
        dpdk_init_input,                    /* init_input */
        dpdk_config_input,                  /* config_input */
        dpdk_start_input,                   /* start_input */
        dpdk_pause_input,                   /* pause_input */
        dpdk_init_output,                   /* init_output */
        NULL,                               /* config_output */
        dpdk_start_output,                  /* start_output */
        dpdk_fin_input,                     /* fin_input */
        dpdk_fin_output,                    /* fin_output */
        dpdk_read_packet,                   /* read_packet */
        dpdk_prepare_packet,                /* prepare_packet */
        dpdk_fin_packet,                    /* fin_packet */
        dpdk_write_packet,                  /* write_packet */
        dpdk_get_link_type,                 /* get_link_type */
        dpdk_get_direction,                 /* get_direction */
        dpdk_set_direction,                 /* set_direction */
        NULL,                               /* get_erf_timestamp */
        dpdk_get_timeval,                   /* get_timeval */
        dpdk_get_timespec,                  /* get_timespec */
        NULL,                               /* get_seconds */
        NULL,                               /* seek_erf */
        NULL,                               /* seek_timeval */
        NULL,                               /* seek_seconds */
        dpdk_get_capture_length,            /* get_capture_length */
        dpdk_get_wire_length,               /* get_wire_length */
        dpdk_get_framing_length,            /* get_framing_length */
        dpdk_set_capture_length,            /* set_capture_length */
        NULL,                               /* get_received_packets */
        dpdk_get_filtered_packets,          /* get_filtered_packets */
        dpdk_get_dropped_packets,           /* get_dropped_packets */
        dpdk_get_captured_packets,          /* get_captured_packets */
        NULL,                               /* get_fd (DPDK has no select()able fd) */
        dpdk_trace_event,                   /* trace_event */
        dpdk_help,                          /* help */
        NULL,                               /* next pointer */
        {true, 8},                          /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,                  /* pstart_input */
        dpdk_pread_packet,                  /* pread_packet */
        dpdk_pause_input,                   /* ppause (same as serial pause) */
        dpdk_fin_input,                     /* p_fin (same as serial fin) */
        dpdk_pconfig_input,                 /* pconfig_input */
        dpdk_pregister_thread,              /* pregister_thread */
        dpdk_punregister_thread             /* punregister_thread */
};
1895
1896void dpdk_constructor(void) {
1897        register_format(&dpdk);
1898}
Note: See TracBrowser for help on using the repository browser.