source: lib/format_dpdk.c @ 4631115

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 4631115 was 4631115, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Merge branch 'master' into develop.

Update to include fixes/features etc from the 3.0.22 release.

Conflicts:

README
lib/format_dag25.c
lib/format_dpdk.c
lib/format_linux.c
lib/trace.c
test/Makefile

  • Property mode set to 100644
File size: 75.6 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
/* We can deal with any minor differences by checking the RTE version.
 * DPDK typically backports some fixes (usually for building against
 * newer kernels) to older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (and that we still attempt to support).
 *
 * DPDK v1.7.1 is recommended.
 * However 1.5 to 1.8 are likely supported.
 */
82#include <rte_eal.h>
83#include <rte_version.h>
84#ifndef RTE_VERSION_NUM
85#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
86#endif
87#ifndef RTE_VER_PATCH_RELEASE
88#       define RTE_VER_PATCH_RELEASE 0
89#endif
90#ifndef RTE_VERSION
91#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
92        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
93#endif
94
95/* 1.6.0r2 :
96 *      rte_eal_pci_set_blacklist() is removed
97 *      device_list is renamed to pci_device_list
98 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
99 *      as such we do apply the whitelist before rte_eal_init.
100 *      This also works correctly with DPDK 1.6.0r2.
101 *
102 * Replaced by:
103 *      rte_devargs (we can simply whitelist)
104 */
105#if RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1)
106#       define DPDK_USE_BLACKLIST 1
107#else
108#       define DPDK_USE_BLACKLIST 0
109#endif
110
111/*
112 * 1.7.0 :
113 *      rte_pmd_init_all is removed
114 *
115 * Replaced by:
116 *      Nothing, no longer needed
117 */
118#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0)
119#       define DPDK_USE_PMD_INIT 1
120#else
121#       define DPDK_USE_PMD_INIT 0
122#endif
123
124/* 1.7.0-rc3 :
125 *
126 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
127 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
128 * it twice.
129 */
130#if RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3)
131#       define DPDK_USE_PCI_PROBE 1
132#else
133#       define DPDK_USE_PCI_PROBE 0
134#endif
135
136/* 1.8.0-rc1 :
137 * LOG LEVEL is a command line option which overrides what
138 * we previously set it to.
139 */
140#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1)
141#       define DPDK_USE_LOG_LEVEL 1
142#else
143#       define DPDK_USE_LOG_LEVEL 0
144#endif
145
146#include <rte_per_lcore.h>
147#include <rte_debug.h>
148#include <rte_errno.h>
149#include <rte_common.h>
150#include <rte_log.h>
151#include <rte_memcpy.h>
152#include <rte_prefetch.h>
153#include <rte_branch_prediction.h>
154#include <rte_pci.h>
155#include <rte_ether.h>
156#include <rte_ethdev.h>
157#include <rte_ring.h>
158#include <rte_mempool.h>
159#include <rte_mbuf.h>
160#include <rte_launch.h>
161#include <rte_lcore.h>
162#include <rte_per_lcore.h>
163#include <rte_cycles.h>
164#include <pthread.h>
165
166/* The default size of memory buffers to use - This is the max size of standard
167 * ethernet packet less the size of the MAC CHECKSUM */
168#define RX_MBUF_SIZE 1514
169
170/* The minimum number of memory buffers per queue tx or rx. Search for
171 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
172 */
173#define MIN_NB_BUF 64
174
175/* Number of receive memory buffers to use
176 * By default this is limited by driver to 4k and must be a multiple of 128.
177 * A modification can be made to the driver to remove this limit.
178 * This can be increased in the driver and here.
179 * Should be at least MIN_NB_BUF.
180 */
181#define NB_RX_MBUF 4096
182
/* Number of send memory buffers to use.
 * The same limits apply as those for NB_RX_MBUF.
 */
186#define NB_TX_MBUF 1024
187
188/* The size of the PCI blacklist needs to be big enough to contain
189 * every PCI device address (listed by lspci every bus:device.function tuple).
190 */
191#define BLACK_LIST_SIZE 50
192
193/* The maximum number of characters the mempool name can be */
194#define MEMPOOL_NAME_LEN 20
195
196/* For single threaded libtrace we read packets as a batch/burst
197 * this is the maximum size of said burst */
198#define BURST_SIZE 50
199
/* Accessor and time-conversion helper macros.
 * Every macro argument is parenthesised so that an expression argument
 * (e.g. `*ptr` or `base + 1`) expands with the intended precedence. */

/* View an opaque buffer pointer as a DPDK mbuf */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
/* Fetch the format_data pointer of a trace as the DPDK format state */
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
/* Fetch a format_data pointer as per-lcore DPDK state */
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval (seconds + microseconds) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
/* Convert a struct timespec (seconds + nanoseconds) to nanoseconds */
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
210
211#if RTE_PKTMBUF_HEADROOM != 128
212#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
213         "any libtrace instance processing these packet must be have the" \
214         "same RTE_PKTMBUF_HEADROOM set"
215#endif
216
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */
226
227/* Print verbose messages to stderr */
228#define DEBUG 0
229
/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. gettimeofday() should also be a
 * vsyscall; if it is not, you should seriously consider updating your
 * kernel.
 */
236#ifdef HAVE_LIBRT
237/* You can turn this on (set to 1) to prefer clock_gettime */
238#define USE_CLOCK_GETTIME 1
239#else
240/* DONT CHANGE THIS !!! */
241#define USE_CLOCK_GETTIME 1
242#endif
243
244/* This is fairly safe to turn on - currently there appears to be a 'bug'
245 * in DPDK that will remove the checksum by making the packet appear 4bytes
246 * smaller than what it really is. Most formats don't include the checksum
247 * hence writing out a port such as int: ring: and dpdk: assumes there
248 * is no checksum and will attempt to write the checksum as part of the
249 * packet
250 */
251#define GET_MAC_CRC_CHECKSUM 0
252
253/* This requires a modification of the pmd drivers (inside Intel DPDK)
254 * TODO this requires updating (packet sizes are wrong TS most likely also)
255 */
256#define HAS_HW_TIMESTAMPS_82580 0
257
258#if HAS_HW_TIMESTAMPS_82580
259# define TS_NBITS_82580     40
260/* The maximum on the +ve or -ve side that we can be, make it half way */
261# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
262#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
263#endif
264
/* Hardware timestamp record as per the Intel 82580 specification.
 * Note there is a mismatch in the 82580 datasheet: it states the timestamp
 * is stored in Big Endian, however it is actually Little Endian.
 * The record is two 64-bit words (16 bytes); only `timestamp` is read. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
};
271
/* Run state of a DPDK trace; stored in dpdk_format_data_t.paused */
enum paused_state {
    DPDK_NEVER_STARTED, /* Initial value assigned by the init functions */
    DPDK_RUNNING,
    DPDK_PAUSED,
};
277
/* Per-lcore state: which port/queue the lcore services plus the
 * timestamp bookkeeping for that lcore. */
struct dpdk_per_lcore_t
{
        uint16_t queue_id;
        uint8_t port;
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
};
289
/* Format-specific state for a DPDK trace.
 * Used by both input and output, however some fields are not used
 * for output. */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode - RX only */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    uint8_t paused; /* See paused_state */
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    int snaplen; /* The snap length for the capture - RX only */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    int nic_numa_node; /* The NUMA node that the NIC is attached to */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of blacklist entries that are valid */
#endif
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    uint8_t rss_key[40]; /* The RSS key; filled in by the toeplitz_create_*key() helpers */
    /* To improve performance we always batch reading packets, in a burst */
    struct rte_mbuf* burst_pkts[BURST_SIZE];
    int burst_size; /* The total number read in the burst */
    int burst_offset; /* The offset we are into the burst */
        /* DPDK normally seems to have a limit of 8 queues for a given card */
        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
};
318
/* Bit values for the per-packet additional header; presumably OR-ed into
 * dpdk_addt_hdr.flags - TODO confirm against the packet read path. */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 0x1,
    INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
};
323
324/**
325 * A structure placed in front of the packet where we can store
326 * additional information about the given packet.
327 * +--------------------------+
328 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
329 * +--------------------------+
330 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
331 * +--------------------------+
332 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
333 * +--------------------------+
334 * *   hw_timestamp_82580     * 16 bytes Optional
335 * +--------------------------+
336 * |       Packet data        | Variable Size
337 * |                          |
338 */
struct dpdk_addt_hdr {
    uint64_t timestamp; /* Packet timestamp; presumably nanoseconds - TODO confirm */
    uint8_t flags; /* Presumably a combination of dpdk_addt_hdr_flags - TODO confirm */
    uint8_t direction;
    uint8_t reserved1;
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
347
348/**
349 * We want to blacklist all devices except those on the whitelist
350 * (I say list, but yes it is only the one).
351 *
352 * The default behaviour of rte_pci_probe() will map every possible device
353 * to its DPDK driver. The DPDK driver will take the ethernet device
354 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
355 *
356 * So blacklist all devices except the one that we wish to use so that
357 * the others can still be used as standard ethernet ports.
358 *
359 * @return 0 if successful, otherwise -1 on error.
360 */
361#if DPDK_USE_BLACKLIST
362static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
363{
364        struct rte_pci_device *dev = NULL;
365        format_data->nb_blacklist = 0;
366
367        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
368
369        TAILQ_FOREACH(dev, &device_list, next) {
370        if (whitelist != NULL && whitelist->domain == dev->addr.domain
371            && whitelist->bus == dev->addr.bus
372            && whitelist->devid == dev->addr.devid
373            && whitelist->function == dev->addr.function)
374            continue;
375                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
376                                / sizeof (format_data->blacklist[0])) {
377                        fprintf(stderr, "Warning: too many devices to blacklist consider"
378                                        " increasing BLACK_LIST_SIZE");
379                        break;
380                }
381                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
382                ++format_data->nb_blacklist;
383        }
384
385        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
386        return 0;
387}
388#else /* DPDK_USE_BLACKLIST */
389#include <rte_devargs.h>
390static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
391{
392        char pci_str[20] = {0};
393        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
394                 whitelist->domain,
395                 whitelist->bus,
396                 whitelist->devid,
397                 whitelist->function);
398        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
399                return -1;
400        }
401        return 0;
402}
403#endif
404
405/**
406 * Parse the URI format as a pci address
407 * Fills in addr, note core is optional and is unchanged if
408 * a value for it is not provided.
409 *
410 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
411 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
412 */
413static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
414    int matches;
415    assert(str);
416    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
417                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
418    if (matches >= 4) {
419        return 0;
420    } else {
421        return -1;
422    }
423}
424
425/**
426 * Convert a pci address to the numa node it is
427 * connected to.
428 *
429 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
430 * so we can call it before DPDK
431 *
432 * @return -1 if unknown otherwise a number 0 or higher of the numa node
433 */
434static int pci_to_numa(struct rte_pci_addr * dev_addr) {
435        char path[50] = {0};
436        FILE *file;
437
438        /* Read from the system */
439        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
440                 dev_addr->domain,
441                 dev_addr->bus,
442                 dev_addr->devid,
443                 dev_addr->function);
444
445        if((file = fopen(path, "r")) != NULL) {
446                int numa_node = -1;
447                fscanf(file, "%d", &numa_node);
448                fclose(file);
449                return numa_node;
450        }
451        return -1;
452}
453
454#if DEBUG
455/* For debugging */
456static inline void dump_configuration()
457{
458    struct rte_config * global_config;
459    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
460
461    if (nb_cpu <= 0) {
462        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
463        nb_cpu = 1; /* fallback to just 1 core */
464    }
465    if (nb_cpu > RTE_MAX_LCORE)
466        nb_cpu = RTE_MAX_LCORE;
467
468    global_config = rte_eal_get_configuration();
469
470    if (global_config != NULL) {
471        int i;
472        fprintf(stderr, "Intel DPDK setup\n"
473               "---Version      : %s\n"
474               "---Master LCore : %"PRIu32"\n"
475               "---LCore Count  : %"PRIu32"\n",
476               rte_version(),
477               global_config->master_lcore, global_config->lcore_count);
478
479        for (i = 0 ; i < nb_cpu; i++) {
480            fprintf(stderr, "   ---Core %d : %s\n", i,
481                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
482        }
483
484        const char * proc_type;
485        switch (global_config->process_type) {
486            case RTE_PROC_AUTO:
487                proc_type = "auto";
488                break;
489            case RTE_PROC_PRIMARY:
490                proc_type = "primary";
491                break;
492            case RTE_PROC_SECONDARY:
493                proc_type = "secondary";
494                break;
495            case RTE_PROC_INVALID:
496                proc_type = "invalid";
497                break;
498            default:
499                proc_type = "something worse than invalid!!";
500        }
501        fprintf(stderr, "---Process Type : %s\n", proc_type);
502    }
503
504}
505#endif
506
507/**
508 * Expects to be called from the master lcore and moves it to the given dpdk id
509 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
510 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
511 *               and not already in use.
512 * @return 0 is successful otherwise -1 on error.
513 */
514static inline int dpdk_move_master_lcore(size_t core) {
515    struct rte_config *cfg = rte_eal_get_configuration();
516    cpu_set_t cpuset;
517    int i;
518
519    assert (core < RTE_MAX_LCORE);
520    assert (rte_get_master_lcore() == rte_lcore_id());
521
522    if (core == rte_lcore_id())
523        return 0;
524
525    // Make sure we are not overwriting someone else
526    assert(!rte_lcore_is_enabled(core));
527
528    // Move the core
529    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
530    cfg->lcore_role[core] = ROLE_RTE;
531    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
532    rte_eal_get_configuration()->master_lcore = core;
533    RTE_PER_LCORE(_lcore_id) = core;
534
535    // Now change the affinity
536    CPU_ZERO(&cpuset);
537
538    if (lcore_config[core].detected) {
539        CPU_SET(core, &cpuset);
540    } else {
541        for (i = 0; i < RTE_MAX_LCORE; ++i) {
542            if (lcore_config[i].detected)
543                CPU_SET(i, &cpuset);
544        }
545    }
546
547    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
548    if (i != 0) {
549        // TODO proper libtrace style error here!!
550        fprintf(stderr, "pthread_setaffinity_np failed\n");
551        return -1;
552    }
553    return 0;
554}
555
556
/**
 * XXX This is very bad XXX
 * But we have to do something to allow nested use of getopt.
 * Luckily the format is normally initialised last so it doesn't matter.
 * DPDK only supports modern systems so hopefully this
 * will continue to work.
 *
 * Holds a snapshot of the four getopt global variables.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};
570
571static void save_getopts(struct saved_getopts *opts) {
572        opts->optarg = optarg;
573        opts->optind = optind;
574        opts->opterr = opterr;
575        opts->optopt = optopt;
576}
577
578static void restore_getopts(struct saved_getopts *opts) {
579        optarg = opts->optarg;
580        optind = opts->optind;
581        opterr = opts->opterr;
582        optopt = opts->optopt;
583}
584
585static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
586                                        char * err, int errlen) {
587    int ret; /* Returned error codes */
588    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
589    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
590    char mem_map[20] = {0}; /* The memory name */
591    long nb_cpu; /* The number of CPUs in the system */
592    long my_cpu; /* The CPU number we want to bind to */
593    int i;
594    struct rte_config *cfg = rte_eal_get_configuration();
595        struct saved_getopts save_opts;
596
597#if DEBUG
598    rte_set_log_level(RTE_LOG_DEBUG);
599#else
600    rte_set_log_level(RTE_LOG_WARNING);
601#endif
602    /*
603     * Using unique file prefixes mean separate memory is used, unlinking
604     * the two processes. However be careful we still cannot access a
605     * port that already in use.
606     *
607     * Using unique file prefixes mean separate memory is used, unlinking
608     * the two processes. However be careful we still cannot access a
609     * port that already in use.
610     */
611    char* argv[] = {"libtrace",
612                    "-c", cpu_number,
613                    "-n", "1",
614                    "--proc-type", "auto",
615                    "--file-prefix", mem_map,
616                    "-m", "256",
617#if DPDK_USE_LOG_LEVEL
618#       if DEBUG
619                    "--log-level", "8", /* RTE_LOG_DEBUG */
620#       else
621                    "--log-level", "5", /* RTE_LOG_WARNING */
622#       endif
623#endif
624                    NULL};
625    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
626
627    /* This initialises the Environment Abstraction Layer (EAL)
628     * If we had slave workers these are put into WAITING state
629     *
630     * Basically binds this thread to a fixed core, which we choose as
631     * the last core on the machine (assuming fewer interrupts mapped here).
632     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
633     * "-n" the number of memory channels into the CPU (hardware specific)
634     *      - Most likely to be half the number of ram slots in your machine.
635     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
636     * Controls where in memory packets are stored and should spread across
637     * the channels. We just use 1 to be safe.
638     */
639
640    /* Get the number of cpu cores in the system and use the last core
641     * on the correct numa node */
642    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
643    if (nb_cpu <= 0) {
644        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
645        nb_cpu = 1; /* fallback to the first core */
646    }
647    if (nb_cpu > RTE_MAX_LCORE)
648        nb_cpu = RTE_MAX_LCORE;
649
650    my_cpu = -1;
651    /* This allows the user to specify the core - we would try to do this
652     * automatically but it's hard to tell that this is secondary
653     * before running rte_eal_init(...). Currently we are limited to 1
654     * instance per core due to the way memory is allocated. */
655    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
656        snprintf(err, errlen, "Failed to parse URI");
657        return -1;
658    }
659
660#if HAVE_LIBNUMA
661        format_data->nic_numa_node = pci_to_numa(&use_addr);
662        if (my_cpu < 0) {
663                /* If we can assign to a core on the same numa node */
664                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
665                if(format_data->nic_numa_node >= 0) {
666                        int max_node_cpu = -1;
667                        struct bitmask *mask = numa_allocate_cpumask();
668                        assert(mask);
669                        numa_node_to_cpus(format_data->nic_numa_node, mask);
670                        for (i = 0 ; i < nb_cpu; ++i) {
671                                if (numa_bitmask_isbitset(mask,i))
672                                        max_node_cpu = i+1;
673                        }
674                        my_cpu = max_node_cpu;
675                }
676        }
677#endif
678        if (my_cpu < 0) {
679                my_cpu = nb_cpu;
680        }
681
682
683    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
684                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
685
686    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
687        snprintf(err, errlen,
688          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
689          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
690        return -1;
691    }
692
693    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
694       info older versions */
695    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
696    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
697
698#if !DPDK_USE_BLACKLIST
699    /* Black list all ports besides the one that we want to use */
700        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
701                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
702                         " are you sure the address is correct?: %s", strerror(-ret));
703                return -1;
704        }
705#endif
706
707        /* Give the memory map a unique name */
708        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
709    /* rte_eal_init it makes a call to getopt so we need to reset the
710     * global optind variable of getopt otherwise this fails */
711        save_getopts(&save_opts);
712    optind = 1;
713    if ((ret = rte_eal_init(argc, argv)) < 0) {
714        snprintf(err, errlen,
715          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
716        return -1;
717    }
718        restore_getopts(&save_opts);
719    // These are still running but will never do anything with DPDK v1.7 we
720    // should remove this XXX in the future
721    for(i = 0; i < RTE_MAX_LCORE; ++i) {
722            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
723            cfg->lcore_role[i] = ROLE_OFF;
724            cfg->lcore_count--;
725        }
726    }
727    // Only the master should be running
728    assert(cfg->lcore_count == 1);
729
730    dpdk_move_master_lcore(my_cpu-1);
731
732#if DEBUG
733    dump_configuration();
734#endif
735
736#if DPDK_USE_PMD_INIT
737    /* This registers all available NICs with Intel DPDK
738     * These are not loaded until rte_eal_pci_probe() is called.
739     */
740    if ((ret = rte_pmd_init_all()) < 0) {
741        snprintf(err, errlen,
742          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
743        return -1;
744    }
745#endif
746
747#if DPDK_USE_BLACKLIST
748    /* Blacklist all ports besides the one that we want to use */
749        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
750                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
751                         " are you sure the address is correct?: %s", strerror(-ret));
752                return -1;
753        }
754#endif
755
756#if DPDK_USE_PCI_PROBE
757    /* This loads DPDK drivers against all ports that are not blacklisted */
758        if ((ret = rte_eal_pci_probe()) < 0) {
759        snprintf(err, errlen,
760            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
761        return -1;
762    }
763#endif
764
765    format_data->nb_ports = rte_eth_dev_count();
766
767    if (format_data->nb_ports != 1) {
768        snprintf(err, errlen,
769            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
770            format_data->nb_ports);
771        return -1;
772    }
773
774    struct rte_eth_dev_info dev_info;
775    rte_eth_dev_info_get(0, &dev_info);
776    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
777                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
778
779    return 0;
780}
781
782static int dpdk_init_input (libtrace_t *libtrace) {
783    char err[500];
784    err[0] = 0;
785
786    libtrace->format_data = (struct dpdk_format_data_t *)
787                            malloc(sizeof(struct dpdk_format_data_t));
788    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
789    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
790    FORMAT(libtrace)->nb_ports = 0;
791    FORMAT(libtrace)->snaplen = 0; /* Use default */
792    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
793    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
794    FORMAT(libtrace)->nic_numa_node = -1;
795    FORMAT(libtrace)->promisc = -1;
796    FORMAT(libtrace)->pktmbuf_pool = NULL;
797#if DPDK_USE_BLACKLIST
798    FORMAT(libtrace)->nb_blacklist = 0;
799#endif
800    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
801    FORMAT(libtrace)->mempool_name[0] = 0;
802    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
803    FORMAT(libtrace)->burst_size = 0;
804    FORMAT(libtrace)->burst_offset = 0;
805
806    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
807        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
808        free(libtrace->format_data);
809        libtrace->format_data = NULL;
810        return -1;
811    }
812    return 0;
813};
814
815static int dpdk_init_output(libtrace_out_t *libtrace)
816{
817    char err[500];
818    err[0] = 0;
819
820    libtrace->format_data = (struct dpdk_format_data_t *)
821                            malloc(sizeof(struct dpdk_format_data_t));
822    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
823    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
824    FORMAT(libtrace)->nb_ports = 0;
825    FORMAT(libtrace)->snaplen = 0; /* Use default */
826    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
827    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
828    FORMAT(libtrace)->nic_numa_node = -1;
829    FORMAT(libtrace)->promisc = -1;
830    FORMAT(libtrace)->pktmbuf_pool = NULL;
831#if DPDK_USE_BLACKLIST
832    FORMAT(libtrace)->nb_blacklist = 0;
833#endif
834    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
835    FORMAT(libtrace)->mempool_name[0] = 0;
836    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
837    FORMAT(libtrace)->burst_size = 0;
838    FORMAT(libtrace)->burst_offset = 0;
839
840    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
841        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
842        free(libtrace->format_data);
843        libtrace->format_data = NULL;
844        return -1;
845    }
846    return 0;
847};
848
849static int dpdk_pconfig_input (libtrace_t *libtrace,
850                                trace_parallel_option_t option,
851                                void *data) {
852        switch (option) {
853                case TRACE_OPTION_SET_HASHER:
854                        switch (*((enum hasher_types *) data))
855                        {
856                                case HASHER_BALANCE:
857                                case HASHER_UNIDIRECTIONAL:
858                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
859                                        return 0;
860                                case HASHER_BIDIRECTIONAL:
861                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
862                                        return 0;
863                                case HASHER_HARDWARE:
864                                case HASHER_CUSTOM:
865                                        // We don't support these
866                                        return -1;
867                        }
868        break;
869        }
870        return -1;
871}
872/**
873 * Note here snaplen excludes the MAC checksum. Packets over
874 * the requested snaplen will be dropped. (Excluding MAC checksum)
875 *
876 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
877 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
878 * is set the maximum size of the returned packet would be 1518 otherwise
879 * 1514 would be the largest size possibly returned.
880 *
881 */
882static int dpdk_config_input (libtrace_t *libtrace,
883                                        trace_option_t option,
884                                        void *data) {
885    switch (option) {
886        case TRACE_OPTION_SNAPLEN:
887            /* Only support changing snaplen before a call to start is
888             * made */
889            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
890                FORMAT(libtrace)->snaplen=*(int*)data;
891            else
892                return -1;
893            return 0;
894                case TRACE_OPTION_PROMISC:
895                        FORMAT(libtrace)->promisc=*(int*)data;
896            return 0;
897        case TRACE_OPTION_FILTER:
898            /* TODO filtering */
899            break;
900        case TRACE_OPTION_META_FREQ:
901            break;
902        case TRACE_OPTION_EVENT_REALTIME:
903            break;
904        /* Avoid default: so that future options will cause a warning
905         * here to remind us to implement it, or flag it as
906         * unimplementable
907         */
908    }
909
910        /* Don't set an error - trace_config will try to deal with the
911         * option and will set an error if it fails */
912    return -1;
913}
914
915/* Can set jumbo frames/ or limit the size of a frame by setting both
916 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
917 *
918 */
919static struct rte_eth_conf port_conf = {
920        .rxmode = {
921                .mq_mode = ETH_RSS,
922                .split_hdr_size = 0,
923                .header_split   = 0, /**< Header Split disabled */
924                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
925                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
926                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
927                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
928#if GET_MAC_CRC_CHECKSUM
929/* So it appears that if hw_strip_crc is turned off the driver will still
930 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
931 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
932 * So lets just add it back on when we receive the packet.
933 */
934                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
935#else
936/* By default strip the MAC checksum because it's a bit of a hack to
937 * actually read these. And don't want to rely on disabling this to actualy
938 * always cut off the checksum in the future
939 */
940                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
941#endif
942        },
943        .txmode = {
944                .mq_mode = ETH_DCB_NONE,
945        },
946        .rx_adv_conf = {
947                .rss_conf = {
948                        // .rss_key = &rss_key, // We set this per format
949                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
950                },
951        },
952        .intr_conf = {
953                .lsc = 1
954        }
955};
956
957static const struct rte_eth_rxconf rx_conf = {
958        .rx_thresh = {
959                .pthresh = 8,/* RX_PTHRESH prefetch */
960                .hthresh = 8,/* RX_HTHRESH host */
961                .wthresh = 4,/* RX_WTHRESH writeback */
962        },
963    .rx_free_thresh = 0,
964    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
965};
966
967static const struct rte_eth_txconf tx_conf = {
968        .tx_thresh = {
969        /**
970         * TX_PTHRESH prefetch
971         * Set on the NIC, if the number of unprocessed descriptors to queued on
972         * the card fall below this try grab at least hthresh more unprocessed
973         * descriptors.
974         */
975                .pthresh = 36,
976
977        /* TX_HTHRESH host
978         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
979         */
980                .hthresh = 0,
981
982        /* TX_WTHRESH writeback
983         * Set on the NIC, the number of sent descriptors before writing back
984         * status to confirm the transmission. This is done more efficiently as
985         * a bulk DMA-transfer rather than writing one at a time.
986         * Similar to tx_free_thresh however this is applied to the NIC, where
987         * as tx_free_thresh is when DPDK will check these. This is extended
988         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
989         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
990         */
991                .wthresh = 4,
992        },
993
994    /* Used internally by DPDK rather than passed to the NIC. The number of
995     * packet descriptors to send before checking for any responses written
996     * back (to confirm the transmission). Default = 32 if set to 0)
997     */
998        .tx_free_thresh = 0,
999
1000    /* This is the Report Status threshold, used by 10Gbit cards,
1001     * This signals the card to only write back status (such as
1002     * transmission successful) after this minimum number of transmit
1003     * descriptors are seen. The default is 32 (if set to 0) however if set
1004     * to greater than 1 TX wthresh must be set to zero, because this is kindof
1005     * a replacement. See the dpdk programmers guide for more restrictions.
1006     */
1007        .tx_rs_thresh = 1,
1008};
1009
1010/**
1011 * A callback for a link state change (LSC).
1012 *
1013 * Packets may be received before this notification. In fact the DPDK IGXBE
1014 * driver likes to put a delay upto 5sec before sending this.
1015 *
1016 * We use this to ensure the link speed is correct for our timestamp
1017 * calculations. Because packets might be received before the link up we still
1018 * update this when the packet is received.
1019 *
1020 * @param port The DPDK port
1021 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1022 * @param cb_arg The dpdk_format_data_t structure associated with the format
1023 */
1024static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1025                              void *cb_arg) {
1026        struct dpdk_format_data_t * format_data = cb_arg;
1027        struct rte_eth_link link_info;
1028        assert(event == RTE_ETH_EVENT_INTR_LSC);
1029        assert(port == format_data->port);
1030
1031        rte_eth_link_get_nowait(port, &link_info);
1032
1033        if (link_info.link_status)
1034                format_data->link_speed = link_info.link_speed;
1035        else
1036                format_data->link_speed = 0;
1037
1038#if DEBUG
1039        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1040                link_info.link_status ? "up" : "down",
1041                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1042                                          "full-duplex" : "half-duplex",
1043                (int) link_info.link_speed);
1044#endif
1045
1046        /* Turns out DPDK drivers might not come back up if the link speed
1047         * changes. So we reset the autoneg procedure. This is very unsafe
1048         * we have have threads reading packets and we stop the port. */
1049#if 0
1050        if (!link_info.link_status) {
1051                int ret;
1052                rte_eth_dev_stop(port);
1053                ret = rte_eth_dev_start(port);
1054                if (ret < 0) {
1055                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1056                                strerror(-ret));
1057                }
1058        }
1059#endif
1060}
1061
1062/* Attach memory to the port and start the port or restart the port.
1063 */
1064static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
1065    int ret; /* Check return values for errors */
1066    struct rte_eth_link link_info; /* Wait for link */
1067    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
1068
1069    /* Already started */
1070    if (format_data->paused == DPDK_RUNNING)
1071        return 0;
1072
1073    /* First time started we need to alloc our memory, doing this here
1074     * rather than in environment setup because we don't have snaplen then */
1075    if (format_data->paused == DPDK_NEVER_STARTED) {
1076        if (format_data->snaplen == 0) {
1077            format_data->snaplen = RX_MBUF_SIZE;
1078            port_conf.rxmode.jumbo_frame = 0;
1079            port_conf.rxmode.max_rx_pkt_len = 0;
1080        } else {
1081            /* Use jumbo frames */
1082            port_conf.rxmode.jumbo_frame = 1;
1083            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1084        }
1085
1086        /* This is additional overhead so make sure we allow space for this */
1087#if GET_MAC_CRC_CHECKSUM
1088        format_data->snaplen += ETHER_CRC_LEN;
1089#endif
1090#if HAS_HW_TIMESTAMPS_82580
1091        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1092#endif
1093
1094        /* Create the mbuf pool, which is the place our packets are allocated
1095         * from - TODO figure out if there is is a free function (I cannot see one)
1096         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1097         * allocate however that extra 1 packet is not used.
1098         * (I assume <= vs < error some where in DPDK code)
1099         * TX requires nb_tx_buffers + 1 in the case the queue is full
1100         * so that will fill the new buffer and wait until slots in the
1101         * ring become available.
1102         */
1103#if DEBUG
1104    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1105#endif
1106    format_data->pktmbuf_pool =
1107            rte_mempool_create(format_data->mempool_name,
1108                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
1109                       format_data->snaplen + sizeof(struct rte_mbuf)
1110                                        + RTE_PKTMBUF_HEADROOM,
1111                       128, sizeof(struct rte_pktmbuf_pool_private),
1112                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1113                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
1114
1115    if (format_data->pktmbuf_pool == NULL) {
1116            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
1117                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
1118            return -1;
1119        }
1120    }
1121
1122    /* ----------- Now do the setup for the port mapping ------------ */
1123    /* Order of calls must be
1124     * rte_eth_dev_configure()
1125     * rte_eth_tx_queue_setup()
1126     * rte_eth_rx_queue_setup()
1127     * rte_eth_dev_start()
1128     * other rte_eth calls
1129     */
1130
1131
1132    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1133
1134    /* This must be called first before another *eth* function
1135     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1136    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
1137    if (ret < 0) {
1138        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1139                            " %"PRIu8" : %s", format_data->port,
1140                            strerror(-ret));
1141        return -1;
1142    }
1143    /* Initialise the TX queue a minimum value if using this port for
1144     * receiving. Otherwise a larger size if writing packets.
1145     */
1146    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1147                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1148    if (ret < 0) {
1149        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1150                            " %"PRIu8" : %s", format_data->port,
1151                            strerror(-ret));
1152        return -1;
1153    }
1154    /* Initialise the RX queue with some packets from memory */
1155    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1156                                 format_data->nb_rx_buf, cpu_numa_node,
1157                                 &rx_conf, format_data->pktmbuf_pool);
1158    if (ret < 0) {
1159        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1160                    " %"PRIu8" : %s", format_data->port,
1161                    strerror(-ret));
1162        return -1;
1163    }
1164
1165    /* Start device */
1166    ret = rte_eth_dev_start(format_data->port);
1167    if (ret < 0) {
1168        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1169                    strerror(-ret));
1170        return -1;
1171    }
1172
1173    /* Default promiscuous to on */
1174    if (format_data->promisc == -1)
1175        format_data->promisc = 1;
1176
1177    if (format_data->promisc == 1)
1178        rte_eth_promiscuous_enable(format_data->port);
1179    else
1180        rte_eth_promiscuous_disable(format_data->port);
1181
1182        /* Register a callback for link state changes */
1183        ret = rte_eth_dev_callback_register(format_data->port,
1184                                            RTE_ETH_EVENT_INTR_LSC,
1185                                            dpdk_lsc_callback,
1186                                            format_data);
1187        /* If this fails it is not a show stopper */
1188#if DEBUG
1189        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1190                ret, strerror(-ret));
1191#endif
1192
1193    /* Get the current link status */
1194    rte_eth_link_get_nowait(format_data->port, &link_info);
1195    format_data->link_speed = link_info.link_speed;
1196#if DEBUG
1197    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1198            (int) link_info.link_duplex, (int) link_info.link_speed);
1199#endif
1200    /* We have now successfully started/unpaused */
1201    format_data->paused = DPDK_RUNNING;
1202
1203    return 0;
1204}
1205
1206/* Attach memory to the port and start (or restart) the port/s.
1207 */
1208static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
1209    int ret, i; /* Check return values for errors */
1210    struct rte_eth_link link_info; /* Wait for link */
1211
1212    /* Already started */
1213    if (format_data->paused == DPDK_RUNNING)
1214        return 0;
1215
1216    /* First time started we need to alloc our memory, doing this here
1217     * rather than in environment setup because we don't have snaplen then */
1218    if (format_data->paused == DPDK_NEVER_STARTED) {
1219        if (format_data->snaplen == 0) {
1220            format_data->snaplen = RX_MBUF_SIZE;
1221            port_conf.rxmode.jumbo_frame = 0;
1222            port_conf.rxmode.max_rx_pkt_len = 0;
1223        } else {
1224            /* Use jumbo frames */
1225            port_conf.rxmode.jumbo_frame = 1;
1226            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1227        }
1228
1229        /* This is additional overhead so make sure we allow space for this */
1230#if GET_MAC_CRC_CHECKSUM
1231        format_data->snaplen += ETHER_CRC_LEN;
1232#endif
1233#if HAS_HW_TIMESTAMPS_82580
1234        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1235#endif
1236
1237        /* Create the mbuf pool, which is the place our packets are allocated
1238         * from - TODO figure out if there is a free function (I cannot see one)
1239         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1240         * allocate however that extra 1 packet is not used.
1241         * (I assume <= vs < error some where in DPDK code)
1242         * TX requires nb_tx_buffers + 1 in the case the queue is full
1243         * so that will fill the new buffer and wait until slots in the
1244         * ring become available.
1245         */
1246#if DEBUG
1247    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1248#endif
1249    format_data->pktmbuf_pool =
1250            rte_mempool_create(format_data->mempool_name,
1251                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
1252                       format_data->snaplen + sizeof(struct rte_mbuf)
1253                                        + RTE_PKTMBUF_HEADROOM,
1254                       128, sizeof(struct rte_pktmbuf_pool_private),
1255                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1256                       format_data->nic_numa_node, 0);
1257
1258        if (format_data->pktmbuf_pool == NULL) {
1259            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1260                        "pool failed: %s", strerror(rte_errno));
1261            return -1;
1262        }
1263    }
1264
1265    /* ----------- Now do the setup for the port mapping ------------ */
1266    /* Order of calls must be
1267     * rte_eth_dev_configure()
1268     * rte_eth_tx_queue_setup()
1269     * rte_eth_rx_queue_setup()
1270     * rte_eth_dev_start()
1271     * other rte_eth calls
1272     */
1273
1274    /* This must be called first before another *eth* function
1275     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1276    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1277    if (ret < 0) {
1278        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1279                            " %"PRIu8" : %s", format_data->port,
1280                            strerror(-ret));
1281        return -1;
1282    }
1283#if DEBUG
1284    fprintf(stderr, "Doing dev configure\n");
1285#endif
1286    /* Initialise the TX queue a minimum value if using this port for
1287     * receiving. Otherwise a larger size if writing packets.
1288     */
1289    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1290                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1291    if (ret < 0) {
1292        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1293                            " %"PRIu8" : %s", format_data->port,
1294                            strerror(-ret));
1295        return -1;
1296    }
1297
1298    for (i=0; i < rx_queues; i++) {
1299#if DEBUG
1300    fprintf(stderr, "Doing queue configure\n");
1301#endif
1302
1303                /* Initialise the RX queue with some packets from memory */
1304                ret = rte_eth_rx_queue_setup(format_data->port, i,
1305                                             format_data->nb_rx_buf, format_data->nic_numa_node,
1306                                             &rx_conf, format_data->pktmbuf_pool);
1307        /* Init per_thread data structures */
1308        format_data->per_lcore[i].port = format_data->port;
1309        format_data->per_lcore[i].queue_id = i;
1310
1311                if (ret < 0) {
1312                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1313                                                " %"PRIu8" : %s", format_data->port,
1314                                                strerror(-ret));
1315                        return -1;
1316                }
1317        }
1318
1319#if DEBUG
1320    fprintf(stderr, "Doing start device\n");
1321#endif
1322    /* Start device */
1323    ret = rte_eth_dev_start(format_data->port);
1324#if DEBUG
1325    fprintf(stderr, "Done start device\n");
1326#endif
1327    if (ret < 0) {
1328        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1329                    strerror(-ret));
1330        return -1;
1331    }
1332
1333
1334    /* Default promiscuous to on */
1335    if (format_data->promisc == -1)
1336        format_data->promisc = 1;
1337
1338    if (format_data->promisc == 1)
1339        rte_eth_promiscuous_enable(format_data->port);
1340    else
1341        rte_eth_promiscuous_disable(format_data->port);
1342
1343
1344    /* We have now successfully started/unpased */
1345    format_data->paused = DPDK_RUNNING;
1346
1347    // Can use remote launch for all
1348    /*RTE_LCORE_FOREACH_SLAVE(i) {
1349                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1350        }*/
1351
1352    /* Register a callback for link state changes */
1353    ret = rte_eth_dev_callback_register(format_data->port,
1354                                        RTE_ETH_EVENT_INTR_LSC,
1355                                        dpdk_lsc_callback,
1356                                        format_data);
1357    /* If this fails it is not a show stopper */
1358#if DEBUG
1359    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1360            ret, strerror(-ret));
1361#endif
1362
1363    /* Get the current link status */
1364    rte_eth_link_get_nowait(format_data->port, &link_info);
1365    format_data->link_speed = link_info.link_speed;
1366#if DEBUG
1367    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1368            (int) link_info.link_duplex, (int) link_info.link_speed);
1369        struct rte_eth_rss_reta reta_conf = {0};
1370        reta_conf.mask_lo = ~reta_conf.mask_lo;
1371        reta_conf.mask_hi = ~reta_conf.mask_hi;
1372        int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf);
1373        fprintf(stderr, "err=%d", qew);
1374        for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) {
1375                fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]);
1376        }
1377
1378#endif
1379
1380    return 0;
1381}
1382
1383static int dpdk_start_input (libtrace_t *libtrace) {
1384    char err[500];
1385    err[0] = 0;
1386
1387    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1388        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1389        free(libtrace->format_data);
1390        libtrace->format_data = NULL;
1391        return -1;
1392    }
1393    return 0;
1394}
1395
1396static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1397    struct rte_eth_dev_info dev_info;
1398    rte_eth_dev_info_get(port_id, &dev_info);
1399    return dev_info.max_rx_queues;
1400}
1401
/**
 * Count the processors that are currently online.
 *
 * @return The value of sysconf(_SC_NPROCESSORS_ONLN), or 1 if that call
 *         fails or reports a non-positive count
 */
static inline size_t dpdk_processor_count (void) {
    const long online = sysconf(_SC_NPROCESSORS_ONLN);

    /* Always report at least one processor, even if the query fails */
    return (online > 0) ? (size_t) online : 1;
}
1409
1410static int dpdk_pstart_input (libtrace_t *libtrace) {
1411    char err[500];
1412    int i=0, phys_cores=0;
1413    int tot = libtrace->perpkt_thread_count;
1414    err[0] = 0;
1415
1416    if (rte_lcore_id() != rte_get_master_lcore())
1417        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1418
1419    // If the master is not on the last thread we move it there
1420    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1421        // Consider error handling here
1422        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
1423    }
1424
1425    // Don't exceed the number of cores in the system/detected by dpdk
1426    // We don't have to force this but performance wont be good if we don't
1427    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1428        if (lcore_config[i].detected) {
1429            if (rte_lcore_is_enabled(i))
1430                fprintf(stderr, "Found core %d already in use!\n", i);
1431            else
1432                phys_cores++;
1433        }
1434    }
1435
1436        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1437        tot = MIN(tot, phys_cores);
1438
1439        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1440
1441    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1442        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1443        free(libtrace->format_data);
1444        libtrace->format_data = NULL;
1445        return -1;
1446    }
1447
1448    // Make sure we only start the number that we should
1449    libtrace->perpkt_thread_count = tot;
1450    return 0;
1451}
1452
1453
1454/**
1455 * Register a thread with the DPDK system,
1456 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1457 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1458 * gives it.
1459 *
1460 * We then allow a mapper thread to be started on every real core as DPDK would,
1461 * we also bind these to the corresponding CPU cores.
1462 *
1463 * @param libtrace A pointer to the trace
1464 * @param reading True if the thread will be used to read packets, i.e. will
1465 *                call pread_packet(), false if thread used to process packet
1466 *                in any other manner including statistics functions.
1467 */
1468static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1469{
1470    struct rte_config *cfg = rte_eal_get_configuration();
1471    int i;
1472    int new_id = -1;
1473
1474    // If 'reading packets' fill in cores from 0 up and bind affinity
1475    // otherwise start from the MAX core (which is also the master) and work backwards
1476    // in this case physical cores on the system will not exist so we don't bind
1477    // these to any particular physical core
1478    pthread_mutex_lock(&libtrace->libtrace_lock);
1479    if (reading) {
1480#if HAVE_LIBNUMA
1481        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1482                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
1483                                new_id = i;
1484                        if (!lcore_config[i].detected)
1485                                new_id = -1;
1486                        break;
1487                }
1488        }
1489#endif
1490        /* Retry without the the numa restriction */
1491        if (new_id == -1) {
1492                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1493                                if (!rte_lcore_is_enabled(i)) {
1494                                        new_id = i;
1495                                if (!lcore_config[i].detected)
1496                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1497                                break;
1498                        }
1499                }
1500        }
1501    } else {
1502        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1503            if (!rte_lcore_is_enabled(i)) {
1504                new_id = i;
1505                break;
1506            }
1507        }
1508    }
1509
1510    if (new_id == -1) {
1511        assert(cfg->lcore_count == RTE_MAX_LCORE);
1512        // TODO proper libtrace style error here!!
1513        fprintf(stderr, "Too many threads for DPDK!!\n");
1514        pthread_mutex_unlock(&libtrace->libtrace_lock);
1515        return -1;
1516    }
1517
1518    // Enable the core in global DPDK structs
1519    cfg->lcore_role[new_id] = ROLE_RTE;
1520    cfg->lcore_count++;
1521    // Set TLS to reflect our new number
1522    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1523    fprintf(stderr, "original id%d", rte_lcore_id());
1524    RTE_PER_LCORE(_lcore_id) = new_id;
1525        char name[99];
1526        pthread_getname_np(pthread_self(),
1527                              name, sizeof(name));
1528
1529    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
1530
1531    if (reading) {
1532        // Set affinity bind to corresponding core
1533        cpu_set_t cpuset;
1534        CPU_ZERO(&cpuset);
1535        CPU_SET(rte_lcore_id(), &cpuset);
1536        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1537        if (i != 0) {
1538            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1539            pthread_mutex_unlock(&libtrace->libtrace_lock);
1540            return -1;
1541        }
1542    }
1543
1544    // Map our TLS to the thread data
1545    if (reading) {
1546        if(t->type == THREAD_PERPKT) {
1547            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1548        } else {
1549            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1550        }
1551    }
1552    pthread_mutex_unlock(&libtrace->libtrace_lock);
1553    return 0;
1554}
1555
1556
1557/**
1558 * Unregister a thread with the DPDK system.
1559 *
1560 * Only previously registered threads should be calling this just before
1561 * they are destroyed.
1562 */
1563static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1564{
1565    struct rte_config *cfg = rte_eal_get_configuration();
1566
1567    assert(rte_lcore_id() < RTE_MAX_LCORE);
1568    pthread_mutex_lock(&libtrace->libtrace_lock);
1569    // Skip if master!!
1570    if (rte_lcore_id() == rte_get_master_lcore()) {
1571        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1572        pthread_mutex_unlock(&libtrace->libtrace_lock);
1573        return;
1574    }
1575
1576    // Disable this core in global DPDK structs
1577    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1578    cfg->lcore_count--;
1579    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1580    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1581    pthread_mutex_unlock(&libtrace->libtrace_lock);
1582    return;
1583}
1584
1585static int dpdk_start_output(libtrace_out_t *libtrace)
1586{
1587    char err[500];
1588    err[0] = 0;
1589
1590    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1591        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1592        free(libtrace->format_data);
1593        libtrace->format_data = NULL;
1594        return -1;
1595    }
1596    return 0;
1597}
1598
1599static int dpdk_pause_input(libtrace_t * libtrace) {
1600    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1601    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1602#if DEBUG
1603        fprintf(stderr, "Pausing DPDK port\n");
1604#endif
1605        rte_eth_dev_stop(FORMAT(libtrace)->port);
1606        FORMAT(libtrace)->paused = DPDK_PAUSED;
1607        /* Empty the queue of packets */
1608        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1609                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1610        }
1611        FORMAT(libtrace)->burst_offset = 0;
1612        FORMAT(libtrace)->burst_size = 0;
1613        /* If we pause it the driver will be reset and likely our counter */
1614
1615        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
1616#if HAS_HW_TIMESTAMPS_82580
1617        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
1618#endif
1619    }
1620    return 0;
1621}
1622
1623static int dpdk_write_packet(libtrace_out_t *trace,
1624                libtrace_packet_t *packet){
1625    struct rte_mbuf* m_buff[1];
1626
1627    int wirelen = trace_get_wire_length(packet);
1628    int caplen = trace_get_capture_length(packet);
1629
1630    /* Check for a checksum and remove it */
1631    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1632                                            wirelen == caplen)
1633        caplen -= ETHER_CRC_LEN;
1634
1635    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1636    if (m_buff[0] == NULL) {
1637        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1638        return -1;
1639    } else {
1640        int ret;
1641        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1642        do {
1643            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1644        } while (ret != 1);
1645    }
1646
1647    return 0;
1648}
1649
1650static int dpdk_fin_input(libtrace_t * libtrace) {
1651    /* Free our memory structures */
1652    if (libtrace->format_data != NULL) {
1653        /* Close the device completely, device cannot be restarted */
1654        if (FORMAT(libtrace)->port != 0xFF)
1655                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1656                                                RTE_ETH_EVENT_INTR_LSC,
1657                                                dpdk_lsc_callback,
1658                                                FORMAT(libtrace));
1659                rte_eth_dev_close(FORMAT(libtrace)->port);
1660                /* filter here if we used it */
1661                free(libtrace->format_data);
1662        }
1663
1664    /* Revert to the original PCI drivers */
1665    /* No longer in DPDK
1666    rte_eal_pci_exit(); */
1667    return 0;
1668}
1669
1670
1671static int dpdk_fin_output(libtrace_out_t * libtrace) {
1672    /* Free our memory structures */
1673    if (libtrace->format_data != NULL) {
1674        /* Close the device completely, device cannot be restarted */
1675        if (FORMAT(libtrace)->port != 0xFF)
1676            rte_eth_dev_close(FORMAT(libtrace)->port);
1677        /* filter here if we used it */
1678                free(libtrace->format_data);
1679        }
1680
1681    /* Revert to the original PCI drivers */
1682    /* No longer in DPDK
1683    rte_eal_pci_exit(); */
1684    return 0;
1685}
1686
1687/**
1688 * Get the start of the additional header that we added to a packet.
1689 */
1690static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1691    assert(packet);
1692    assert(packet->buffer);
1693    /* Our header sits straight after the mbuf header */
1694    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1695}
1696
1697static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1698    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1699    return hdr->cap_len;
1700}
1701
1702static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1703    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1704    if (size > hdr->cap_len) {
1705        /* Cannot make a packet bigger */
1706                return trace_get_capture_length(packet);
1707        }
1708
1709    /* Reset the cached capture length first*/
1710    packet->capture_length = -1;
1711    hdr->cap_len = (uint32_t) size;
1712        return trace_get_capture_length(packet);
1713}
1714
1715static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1716    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1717    int org_cap_size; /* The original capture size */
1718    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1719        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1720                            sizeof(struct hw_timestamp_82580);
1721    } else {
1722        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1723    }
1724    if (hdr->flags & INCLUDES_CHECKSUM) {
1725        return org_cap_size;
1726    } else {
1727        /* DPDK packets are always TRACE_TYPE_ETH packets */
1728        return org_cap_size + ETHER_CRC_LEN;
1729    }
1730}
1731static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1732    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1733    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1734        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1735                sizeof(struct hw_timestamp_82580);
1736    else
1737        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1738}
1739
1740static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1741                libtrace_packet_t *packet, void *buffer,
1742                libtrace_rt_types_t rt_type, uint32_t flags) {
1743    assert(packet);
1744    if (packet->buffer != buffer &&
1745        packet->buf_control == TRACE_CTRL_PACKET) {
1746        free(packet->buffer);
1747    }
1748
1749    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1750        packet->buf_control = TRACE_CTRL_PACKET;
1751    } else
1752        packet->buf_control = TRACE_CTRL_EXTERNAL;
1753
1754    packet->buffer = buffer;
1755    packet->header = buffer;
1756
1757    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1758    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1759    packet->type = rt_type;
1760    return 0;
1761}
1762
1763
1764/**
1765 * Given a packet size and a link speed, computes the
1766 * time to transmit in nanoseconds.
1767 *
1768 * @param format_data The dpdk format data from which we get the link speed
1769 *        and if unset updates it in a thread safe manner
1770 * @param pkt_size The size of the packet in bytes
1771 * @return The wire time in nanoseconds
1772 */
1773static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1774        uint32_t wire_time;
1775        /* 20 extra bytes of interframe gap and preamble */
1776# if GET_MAC_CRC_CHECKSUM
1777        wire_time = ((pkt_size + 20) * 8000);
1778# else
1779        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1780# endif
1781
1782        /* Division is really slow and introduces a pipeline stall
1783         * The compiler will optimise this into magical multiplication and shifting
1784         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1785         */
1786retry_calc_wiretime:
1787        switch (format_data->link_speed) {
1788        case ETH_LINK_SPEED_40G:
1789                wire_time /=  ETH_LINK_SPEED_40G;
1790                break;
1791        case ETH_LINK_SPEED_20G:
1792                wire_time /= ETH_LINK_SPEED_20G;
1793                break;
1794        case ETH_LINK_SPEED_10G:
1795                wire_time /= ETH_LINK_SPEED_10G;
1796                break;
1797        case ETH_LINK_SPEED_1000:
1798                wire_time /= ETH_LINK_SPEED_1000;
1799                break;
1800        case 0:
1801                {
1802                /* Maybe the link was down originally, but now it should be up */
1803                struct rte_eth_link link = {0};
1804                rte_eth_link_get_nowait(format_data->port, &link);
1805                if (link.link_status && link.link_speed) {
1806                        format_data->link_speed = link.link_speed;
1807#ifdef DEBUG
1808                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1809#endif
1810                        goto retry_calc_wiretime;
1811                }
1812                /* We don't know the link speed, make sure numbers are counting up */
1813                wire_time = 1;
1814                break;
1815                }
1816        default:
1817                wire_time /= format_data->link_speed;
1818        }
1819        return wire_time;
1820}
1821
1822
1823
1824/*
1825 * Does any extra preperation to all captured packets
1826 * This includes adding our extra header to it with the timestamp,
1827 * and any snapping
1828 *
1829 * @param format_data The DPDK format data
1830 * @param plc The DPDK per lcore format data
1831 * @param pkts An array of size nb_pkts of DPDK packets
1832 * @param nb_pkts The number of packets in pkts and optionally packets
1833 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
1834 */
1835static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
1836                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
1837        struct dpdk_addt_hdr *hdr;
1838        size_t i;
1839        uint64_t cur_sys_time_ns;
1840#if HAS_HW_TIMESTAMPS_82580
1841        struct hw_timestamp_82580 *hw_ts;
1842        uint64_t estimated_wraps;
1843#else
1844
1845#endif
1846
1847#if USE_CLOCK_GETTIME
1848        struct timespec cur_sys_time = {0};
1849        /* This looks terrible and I feel bad doing it. But it's OK
1850         * on new kernels, because this is a fast vsyscall */
1851        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1852        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1853#else
1854        struct timeval cur_sys_time = {0};
1855        /* Also a fast vsyscall */
1856        gettimeofday(&cur_sys_time, NULL);
1857        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1858#endif
1859
1860        /* The system clock is not perfect so when running
1861         * at linerate we could timestamp a packet in the past.
1862         * To avoid this we munge the timestamp to appear 1ns
1863         * after the previous packet. We should eventually catch up
1864         * to system time since a 64byte packet on a 10G link takes 67ns.
1865         *
1866         * Note with parallel readers timestamping packets
1867         * with duplicate stamps or out of order is unavoidable without
1868         * hardware timestamping from the NIC.
1869         */
1870#if !HAS_HW_TIMESTAMPS_82580
1871        if (plc->ts_last_sys >= cur_sys_time_ns) {
1872                cur_sys_time_ns = plc->ts_last_sys + 1;
1873        }
1874#endif
1875
1876        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
1877        for (i = 0 ; i < nb_pkts ; ++i) {
1878
1879                /* We put our header straight after the dpdk header */
1880                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1881                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1882
1883#if GET_MAC_CRC_CHECKSUM
1884<<<<<<< HEAD
1885                /* Add back in the CRC sum */
1886                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
1887                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
1888                hdr->flags |= INCLUDES_CHECKSUM;
1889=======
1890    /* Add back in the CRC sum */
1891    rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1892    rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1893    hdr->flags |= INCLUDES_CHECKSUM;
1894>>>>>>> master
1895#endif
1896
1897                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1898
1899#if HAS_HW_TIMESTAMPS_82580
1900                /* The timestamp is sitting before our packet and is included in pkt_len */
1901                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1902                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1903                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1904
1905                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1906                 *
1907                 *        +----------+---+   +--------------+
1908                 *  82580 |    24    | 8 |   |      32      |
1909                 *        +----------+---+   +--------------+
1910                 *          reserved  \______ 40 bits _____/
1911                 *
1912                 * The 40 bit 82580 SYSTIM overflows every
1913                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1914                 *
1915                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1916                 * Endian (for the full 64 bits) i.e. picture is mirrored
1917                 */
1918
1919                /* Despite what the documentation says this is in Little
1920                 * Endian byteorder. Mask the reserved section out.
1921                 */
1922                hdr->timestamp = le64toh(hw_ts->timestamp) &
1923                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1924
1925                if (unlikely(plc->ts_first_sys == 0)) {
1926                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1927                        plc->ts_last_sys = plc->ts_first_sys;
1928                }
1929
1930                /* This will have serious problems if packets aren't read quickly
1931                 * that is within a couple of seconds because our clock cycles every
1932                 * 18 seconds */
1933                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1934                                  / (1ull<<TS_NBITS_82580);
1935
1936                /* Estimated_wraps gives the number of times the counter should have
1937                 * wrapped (however depending on value last time it could have wrapped
1938                 * twice more (if hw clock is close to its max value) or once less (allowing
1939                 * for a bit of variance between hw and sys clock). But if the clock
1940                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1941                if (unlikely(estimated_wraps >= 2)) {
1942                        /* 2 or more wrap arounds add all but the very last wrap */
1943                        plc->wrap_count += estimated_wraps - 1;
1944                }
1945
1946                /* Set the timestamp to the lowest possible value we're considering */
1947                hdr->timestamp += plc->ts_first_sys +
1948                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1949
1950                /* In most runs only the first if() will need evaluating - i.e our
1951                 * estimate is correct. */
1952                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1953                                              hdr->timestamp, MAXSKEW_82580))) {
1954                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1955                        plc->wrap_count++;
1956                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1957                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1958                                             hdr->timestamp, MAXSKEW_82580)) {
1959                                /* Failed to match estimated_wraps */
1960                                plc->wrap_count++;
1961                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1962                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1963                                                     hdr->timestamp, MAXSKEW_82580)) {
1964                                        if (estimated_wraps == 0) {
1965                                                /* 0 case Failed to match estimated_wraps+2 */
1966                                                printf("WARNING - Hardware Timestamp failed to"
1967                                                       " match using systemtime!\n");
1968                                                hdr->timestamp = cur_sys_time_ns;
1969                                        } else {
1970                                                /* Failed to match estimated_wraps+1 */
1971                                                plc->wrap_count++;
1972                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1973                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1974                                                                     hdr->timestamp, MAXSKEW_82580)) {
1975                                                        /* Failed to match estimated_wraps+2 */
1976                                                        printf("WARNING - Hardware Timestamp failed to"
1977                                                               " match using systemtime!!\n");
1978                                                }
1979                                        }
1980                                }
1981                        }
1982                }
1983#else
1984
1985                hdr->timestamp = cur_sys_time_ns;
1986                /* Offset the next packet by the wire time of previous */
1987                calculate_wire_time(format_data, hdr->cap_len);
1988
1989#endif
1990                if(packets) {
1991                        packets[i]->buffer = pkts[i];
1992                        packets[i]->header = pkts[i];
1993#if HAS_HW_TIMESTAMPS_82580
1994                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1995                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
1996#else
1997                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1998                                              RTE_PKTMBUF_HEADROOM;
1999#endif
2000                        packets[i]->error = 1;
2001                }
2002        }
2003
2004        plc->ts_last_sys = cur_sys_time_ns;
2005
2006        return;
2007}
2008
2009
2010static void dpdk_fin_packet(libtrace_packet_t *packet)
2011{
2012        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2013                rte_pktmbuf_free(packet->buffer);
2014                packet->buffer = NULL;
2015        }
2016}
2017
2018
2019static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2020    int nb_rx; /* Number of rx packets we've received */
2021
2022    /* Free the last packet buffer */
2023    if (packet->buffer != NULL) {
2024        /* Buffer is owned by DPDK */
2025        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2026            rte_pktmbuf_free(packet->buffer);
2027            packet->buffer = NULL;
2028        } else
2029        /* Buffer is owned by packet i.e. has been malloc'd */
2030        if (packet->buf_control == TRACE_CTRL_PACKET) {
2031            free(packet->buffer);
2032            packet->buffer = NULL;
2033        }
2034    }
2035
2036    packet->buf_control = TRACE_CTRL_EXTERNAL;
2037    packet->type = TRACE_RT_DATA_DPDK;
2038
2039    /* Check if we already have some packets buffered */
2040    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2041            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2042            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2043            return 1; // TODO should be bytes read, which essentially useless anyway
2044    }
2045    /* Wait for a packet */
2046    while (1) {
2047        /* Poll for a single packet */
2048        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2049                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2050        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
2051                FORMAT(libtrace)->burst_size = nb_rx;
2052                FORMAT(libtrace)->burst_offset = 1;
2053                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
2054                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2055                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2056                return 1; // TODO should be bytes read, which essentially useless anyway
2057        }
2058        if (libtrace_halt) {
2059                return 0;
2060        }
2061        /* Wait a while, polling on memory degrades performance
2062         * This relieves the pressure on memory allowing the NIC to DMA */
2063        rte_delay_us(10);
2064    }
2065
2066    /* We'll never get here - but if we did it would be bad */
2067    return -1;
2068}
2069
2070static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
2071    size_t nb_rx; /* Number of rx packets we've recevied */
2072    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2073    size_t i;
2074
2075    for (i = 0 ; i < nb_packets ; ++i) {
2076            /* Free the last packet buffer */
2077            if (packets[i]->buffer != NULL) {
2078                /* Buffer is owned by DPDK */
2079                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
2080                    rte_pktmbuf_free(packets[i]->buffer);
2081                    packets[i]->buffer = NULL;
2082                } else
2083                /* Buffer is owned by packet i.e. has been malloc'd */
2084                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
2085                    free(packets[i]->buffer);
2086                    packets[i]->buffer = NULL;
2087                }
2088            }
2089            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2090            packets[i]->type = TRACE_RT_DATA_DPDK;
2091    }
2092
2093    /* Wait for a packet */
2094    while (1) {
2095        /* Poll for a single packet */
2096        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
2097                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
2098        if (nb_rx > 0) {
2099                /* Got some packets - otherwise we keep spining */
2100                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2101                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
2102                return nb_rx;
2103        }
2104        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
2105        if (libtrace_message_queue_count(&t->messages) > 0) {
2106                printf("Extra message yay");
2107                return -2;
2108        }
2109        if (libtrace_halt) {
2110                return 0;
2111        }
2112        /* Wait a while, polling on memory degrades performance
2113         * This relieves the pressure on memory allowing the NIC to DMA */
2114        rte_delay_us(10);
2115    }
2116
2117    /* We'll never get here - but if we did it would be bad */
2118    return -1;
2119}
2120
2121static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2122    struct timeval tv;
2123    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2124
2125    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2126    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2127    return tv;
2128}
2129
2130static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2131    struct timespec ts;
2132    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2133
2134    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2135    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2136    return ts;
2137}
2138
2139static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2140    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2141}
2142
2143static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2144    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2145    return (libtrace_direction_t) hdr->direction;
2146}
2147
2148static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2149    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2150    hdr->direction = (uint8_t) direction;
2151    return (libtrace_direction_t) hdr->direction;
2152}
2153
2154/*
2155 * NOTE: Drops could occur for other reasons than running out of buffer
2156 * space. Such as failed MAC checksums and oversized packets.
2157 */
2158static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
2159    struct rte_eth_stats stats = {0};
2160
2161    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2162        return UINT64_MAX;
2163    /* Grab the current stats */
2164    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2165
2166    /* Get the drop counter */
2167    return (uint64_t) stats.ierrors;
2168}
2169
2170static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
2171    struct rte_eth_stats stats = {0};
2172
2173    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2174        return UINT64_MAX;
2175    /* Grab the current stats */
2176    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2177
2178    /* Get the drop counter */
2179    return (uint64_t) stats.ipackets;
2180}
2181
2182/*
2183 * This is the number of packets filtered by the NIC
2184 * and maybe ahead of number read using libtrace.
2185 *
2186 * XXX we are yet to implement any filtering, but if it was this should
2187 * get the result. So this will just return 0 for now.
2188 */
2189static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
2190    struct rte_eth_stats stats = {0};
2191
2192    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2193        return UINT64_MAX;
2194    /* Grab the current stats */
2195    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2196
2197    /* Get the drop counter */
2198    return (uint64_t) stats.fdirmiss;
2199}
2200
2201/* Attempts to read a packet in a non-blocking fashion. If one is not
2202 * available a SLEEP event is returned. We do not have the ability to
2203 * create a select()able file descriptor in DPDK.
2204 */
2205static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2206                                        libtrace_packet_t *packet) {
2207    libtrace_eventobj_t event = {0,0,0.0,0};
2208    int nb_rx; /* Number of receive packets we've read */
2209    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2210
2211    do {
2212
2213        /* See if we already have a packet waiting */
2214        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2215                        FORMAT(trace)->queue_id, pkts_burst, 1);
2216
2217        if (nb_rx > 0) {
2218            /* Free the last packet buffer */
2219            if (packet->buffer != NULL) {
2220                /* Buffer is owned by DPDK */
2221                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2222                    rte_pktmbuf_free(packet->buffer);
2223                    packet->buffer = NULL;
2224                } else
2225                /* Buffer is owned by packet i.e. has been malloc'd */
2226                if (packet->buf_control == TRACE_CTRL_PACKET) {
2227                    free(packet->buffer);
2228                    packet->buffer = NULL;
2229                }
2230            }
2231
2232            packet->buf_control = TRACE_CTRL_EXTERNAL;
2233            packet->type = TRACE_RT_DATA_DPDK;
2234            event.type = TRACE_EVENT_PACKET;
2235            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
2236            event.size = 1; // TODO should be bytes read, which essentially useless anyway
2237
2238            /* XXX - Check this passes the filter trace_read_packet normally
2239             * does this for us but this wont */
2240            if (trace->filter) {
2241                if (!trace_apply_filter(trace->filter, packet)) {
2242                    /* Failed the filter so we loop for another packet */
2243                    trace->filtered_packets ++;
2244                    continue;
2245                }
2246            }
2247            trace->accepted_packets ++;
2248        } else {
2249            /* We only want to sleep for a very short time - we are non-blocking */
2250            event.type = TRACE_EVENT_SLEEP;
2251            event.seconds = 0.0001;
2252            event.size = 0;
2253        }
2254
2255        /* If we get here we have our event */
2256        break;
2257    } while (1);
2258
2259    return event;
2260}
2261
2262
/* Print the usage text for the dpdk format module to stdout. */
static void dpdk_help(void) {
    static const char *const help_lines[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t line;

    for (line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); ++line)
        fputs(help_lines[line], stdout);
}
2280
/*
 * Format descriptor for the dpdk module. It is handed to
 * register_format() by dpdk_constructor(). The initialiser is positional;
 * the comment on each entry names the slot it fills and NULL marks an
 * operation this module does not provide.
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
        TRACE_FORMAT_DPDK,
        NULL,                       /* probe filename */
        NULL,                       /* probe magic */
        dpdk_init_input,            /* init_input */
        dpdk_config_input,          /* config_input */
        dpdk_start_input,           /* start_input */
        dpdk_pause_input,           /* pause_input */
        dpdk_init_output,           /* init_output */
        NULL,                       /* config_output */
        dpdk_start_output,          /* start_output */
        dpdk_fin_input,             /* fin_input */
        dpdk_fin_output,            /* fin_output */
        dpdk_read_packet,           /* read_packet */
        dpdk_prepare_packet,        /* prepare_packet */
        dpdk_fin_packet,            /* fin_packet */
        dpdk_write_packet,          /* write_packet */
        dpdk_get_link_type,         /* get_link_type */
        dpdk_get_direction,         /* get_direction */
        dpdk_set_direction,         /* set_direction */
        NULL,                       /* get_erf_timestamp */
        dpdk_get_timeval,           /* get_timeval */
        dpdk_get_timespec,          /* get_timespec */
        NULL,                       /* get_seconds */
        NULL,                       /* seek_erf */
        NULL,                       /* seek_timeval */
        NULL,                       /* seek_seconds */
        dpdk_get_capture_length,    /* get_capture_length */
        dpdk_get_wire_length,       /* get_wire_length */
        dpdk_get_framing_length,    /* get_framing_length */
        dpdk_set_capture_length,    /* set_capture_length */
        NULL,                       /* get_received_packets */
        dpdk_get_filtered_packets,  /* get_filtered_packets */
        dpdk_get_dropped_packets,   /* get_dropped_packets */
        dpdk_get_captured_packets,  /* get_captured_packets */
        NULL,                       /* get_fd */
        dpdk_trace_event,           /* trace_event */
        dpdk_help,                  /* help */
        NULL,                       /* next pointer */
        {true, 8},                  /* Live, NICs typically have 8 threads */
        dpdk_pstart_input,          /* pstart_input */
        dpdk_pread_packets,         /* pread_packets */
        dpdk_pause_input,           /* ppause */
        dpdk_fin_input,             /* p_fin */
        dpdk_pconfig_input,         /* pconfig_input */
        dpdk_pregister_thread,      /* pregister_thread */
        dpdk_punregister_thread     /* punregister_thread */
};
2331
2332void dpdk_constructor(void) {
2333        register_format(&dpdk);
2334}
Note: See TracBrowser for help on using the repository browser.