source: lib/format_dpdk.c @ 6c09048

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 6c09048 was 6c09048, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fix checking for clock_gettime for the DPDK format

  • Property mode set to 100644
File size: 69.3 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
69/* We can deal with any minor differences by checking the RTE VERSION
70 * Typically DPDK backports some fixes (typically for building against
71 * newer kernels) to the older version of DPDK.
72 *
73 * These get released with the rX suffix. The following macros where added
74 * in these new releases.
75 *
76 * Below this is a log of version that required changes to the libtrace
77 * code (that we still attempt to support).
78 *
79 * DPDK v1.7.1 is recommended.
80 * However 1.5 to 1.8 are likely supported.
81 */
82#include <rte_eal.h>
83#include <rte_version.h>
84#ifndef RTE_VERSION_NUM
85#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
86#endif
87#ifndef RTE_VER_PATCH_RELEASE
88#       define RTE_VER_PATCH_RELEASE 0
89#endif
90#ifndef RTE_VERSION
91#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
92        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
93#endif
94
/*
 * Feature gates derived from RTE_VERSION. Each gate evaluates to 1 when the
 * corresponding code path is needed for the DPDK release being built
 * against, and 0 otherwise. They are usable both in #if and in C code.
 */

/* 1.6.0r2 :
 *      rte_eal_pci_set_blacklist() is removed and device_list is renamed
 *      to pci_device_list.
 *      From 1.7.0 rte_eal_pci_probe is called by rte_eal_init, so the
 *      whitelist must be applied before rte_eal_init. Doing so also works
 *      correctly with DPDK 1.6.0r2.
 *
 * Replaced by:
 *      rte_devargs (we can simply whitelist)
 */
#define DPDK_USE_BLACKLIST (RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1))

/* 1.7.0 :
 *      rte_pmd_init_all is removed
 *
 * Replaced by:
 *      Nothing, no longer needed
 */
#define DPDK_USE_PMD_INIT (RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0))

/* 1.7.0-rc3 :
 *      rte_eal_pci_probe is called as part of rte_eal_init. Somewhere
 *      between 1.7 and 1.8 calling it a second time broke, so only call it
 *      ourselves on releases older than that.
 */
#define DPDK_USE_PCI_PROBE (RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3))

/* 1.8.0-rc1 :
 *      The log level is a command line option which overrides whatever we
 *      previously set it to, so it must be passed to rte_eal_init.
 */
#define DPDK_USE_LOG_LEVEL (RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1))
145
146#include <rte_per_lcore.h>
147#include <rte_debug.h>
148#include <rte_errno.h>
149#include <rte_common.h>
150#include <rte_log.h>
151#include <rte_memcpy.h>
152#include <rte_prefetch.h>
153#include <rte_branch_prediction.h>
154#include <rte_pci.h>
155#include <rte_ether.h>
156#include <rte_ethdev.h>
157#include <rte_ring.h>
158#include <rte_mempool.h>
159#include <rte_mbuf.h>
160#include <rte_launch.h>
161#include <rte_lcore.h>
162#include <rte_per_lcore.h>
163#include <rte_cycles.h>
164#include <pthread.h>
165
/* The default size of memory buffers to use - This is the max size of standard
 * ethernet packet less the size of the MAC CHECKSUM */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use
 * By default this is limited by driver to 4k and must be a multiple of 128.
 * A modification can be made to the driver to remove this limit.
 * This can be increased in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The size of the mempool name buffer, including the terminating NUL */
#define MEMPOOL_NAME_LEN 20

/* For single threaded libtrace we read packets as a batch/burst
 * this is the maximum size of said burst */
#define BURST_SIZE 50

/* Every macro argument below is parenthesised so that arguments which are
 * expressions (e.g. MBUF(p + 1) or TS_TO_NS(*ts)) expand correctly. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

#define FORMAT_DATA_HEAD(x) (FORMAT(x)->per_stream->head)
#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)

/* Convert a struct timeval (TV) or struct timespec (TS) to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)
213
#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif

/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose messages to stderr */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only use it if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again gettimeofday() should be a
 * vsyscall too; if it's not, you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_CLOCK_GETTIME
/* You can turn this off (set to 0) to fall back to gettimeofday() */
#define USE_CLOCK_GETTIME 1
#else
/* DON'T CHANGE THIS !!! clock_gettime() was not found at configure time */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum
 * hence writing out a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 * TODO this requires updating (packet sizes are wrong TS most likely also)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
267
/*
 * Hardware timestamp record produced by a modified 82580 PMD driver.
 * The 82580 datasheet says the timestamp is stored big endian, but it is
 * actually little endian.
 */
struct hw_timestamp_82580 {
        uint64_t reserved;
        uint64_t timestamp; /* little endian; only the low 40 bits are valid */
};

/* Port lifecycle; stored in dpdk_format_data_t's paused field */
enum paused_state {
        DPDK_NEVER_STARTED = 0,
        DPDK_RUNNING = 1,
        DPDK_PAUSED = 2
};
280
/* State kept for each parallel stream; the streams are held in the
 * per_stream list of dpdk_format_data_t. Cache-line aligned, presumably to
 * avoid false sharing between threads - TODO confirm. */
struct dpdk_per_stream_t
{
        uint16_t queue_id;
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
} ALIGN_STRUCT(CACHE_LINE_SIZE);

typedef struct dpdk_per_stream_t dpdk_per_stream_t;
293
294/* Used by both input and output however some fields are not used
295 * for output */
296struct dpdk_format_data_t {
297        int8_t promisc; /* promiscuous mode - RX only */
298        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
299        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
300        uint8_t paused; /* See paused_state */
301        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
302        int snaplen; /* The snap length for the capture - RX only */
303        /* We always have to setup both rx and tx queues even if we don't want them */
304        int nb_rx_buf; /* The number of packet buffers in the rx ring */
305        int nb_tx_buf; /* The number of packet buffers in the tx ring */
306        int nic_numa_node; /* The NUMA node that the NIC is attached to */
307        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
308#if DPDK_USE_BLACKLIST
309        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
310        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
311#endif
312        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
313        uint8_t rss_key[40]; // This is the RSS KEY
314        /* To improve single-threaded performance we always batch reading
315         * packets, in a burst, otherwise the parallel library does this for us */
316        struct rte_mbuf* burst_pkts[BURST_SIZE];
317        int burst_size; /* The total number read in the burst */
318        int burst_offset; /* The offset we are into the burst */
319
320        /* Our parallel streams */
321        libtrace_list_t *per_stream;
322};
323
/* Bit flags stored in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
        INCLUDES_CHECKSUM     = 1 << 0,
        INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with the 82580 driver */
};

/**
 * Extra per-packet metadata stored in the mbuf headroom, directly after
 * the rte_mbuf itself:
 *
 *   rte_mbuf (pkt)        sizeof(rte_mbuf)
 *   dpdk_addt_hdr         sizeof(dpdk_addt_hdr)
 *   padding               RTE_PKTMBUF_HEADROOM - sizeof(dpdk_addt_hdr)
 *   hw_timestamp_82580    16 bytes, optional
 *   packet data           variable size
 */
struct dpdk_addt_hdr {
        uint64_t timestamp;
        uint8_t flags;      /* dpdk_addt_hdr_flags bits */
        uint8_t direction;
        uint8_t reserved1;
        uint8_t reserved2;
        uint32_t cap_len;   /* The size to report as the capture length */
};
352
353/**
354 * We want to blacklist all devices except those on the whitelist
355 * (I say list, but yes it is only the one).
356 *
357 * The default behaviour of rte_pci_probe() will map every possible device
358 * to its DPDK driver. The DPDK driver will take the ethernet device
359 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
360 *
361 * So blacklist all devices except the one that we wish to use so that
362 * the others can still be used as standard ethernet ports.
363 *
364 * @return 0 if successful, otherwise -1 on error.
365 */
366#if DPDK_USE_BLACKLIST
367static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
368{
369        struct rte_pci_device *dev = NULL;
370        format_data->nb_blacklist = 0;
371
372        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
373
374        TAILQ_FOREACH(dev, &device_list, next) {
375        if (whitelist != NULL && whitelist->domain == dev->addr.domain
376            && whitelist->bus == dev->addr.bus
377            && whitelist->devid == dev->addr.devid
378            && whitelist->function == dev->addr.function)
379            continue;
380                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
381                                / sizeof (format_data->blacklist[0])) {
382                        fprintf(stderr, "Warning: too many devices to blacklist consider"
383                                        " increasing BLACK_LIST_SIZE");
384                        break;
385                }
386                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
387                ++format_data->nb_blacklist;
388        }
389
390        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
391        return 0;
392}
393#else /* DPDK_USE_BLACKLIST */
394#include <rte_devargs.h>
395static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
396{
397        char pci_str[20] = {0};
398        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
399                 whitelist->domain,
400                 whitelist->bus,
401                 whitelist->devid,
402                 whitelist->function);
403        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
404                return -1;
405        }
406        return 0;
407}
408#endif
409
410/**
411 * Parse the URI format as a pci address
412 * Fills in addr, note core is optional and is unchanged if
413 * a value for it is not provided.
414 *
415 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
416 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
417 */
418static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
419        int matches;
420        assert(str);
421        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
422                         &addr->domain, &addr->bus, &addr->devid,
423                         &addr->function, core);
424        if (matches >= 4) {
425                return 0;
426        } else {
427                return -1;
428        }
429}
430
431/**
432 * Convert a pci address to the numa node it is
433 * connected to.
434 *
435 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
436 * so we can call it before DPDK
437 *
438 * @return -1 if unknown otherwise a number 0 or higher of the numa node
439 */
440static int pci_to_numa(struct rte_pci_addr * dev_addr) {
441        char path[50] = {0};
442        FILE *file;
443
444        /* Read from the system */
445        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
446                 dev_addr->domain,
447                 dev_addr->bus,
448                 dev_addr->devid,
449                 dev_addr->function);
450
451        if((file = fopen(path, "r")) != NULL) {
452                int numa_node = -1;
453                fscanf(file, "%d", &numa_node);
454                fclose(file);
455                return numa_node;
456        }
457        return -1;
458}
459
#if DEBUG
/**
 * For debugging: print the EAL configuration to stderr. This covers the
 * DPDK version, master lcore, lcore count, whether each online core is
 * enabled and the process type.
 */
static inline void dump_configuration(void)
{
        struct rte_config * global_config;
        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

        if (nb_cpu <= 0) {
                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
                       " Falling back to the first core.");
                nb_cpu = 1; /* fallback to just 1 core */
        }
        if (nb_cpu > RTE_MAX_LCORE)
                nb_cpu = RTE_MAX_LCORE;

        global_config = rte_eal_get_configuration();

        if (global_config != NULL) {
                int i;
                const char * proc_type;

                fprintf(stderr, "Intel DPDK setup\n"
                        "---Version      : %s\n"
                        "---Master LCore : %"PRIu32"\n"
                        "---LCore Count  : %"PRIu32"\n",
                        rte_version(),
                        global_config->master_lcore, global_config->lcore_count);

                for (i = 0 ; i < nb_cpu; i++) {
                        fprintf(stderr, "   ---Core %d : %s\n", i,
                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
                }

                switch (global_config->process_type) {
                case RTE_PROC_AUTO:
                        proc_type = "auto";
                        break;
                case RTE_PROC_PRIMARY:
                        proc_type = "primary";
                        break;
                case RTE_PROC_SECONDARY:
                        proc_type = "secondary";
                        break;
                case RTE_PROC_INVALID:
                        proc_type = "invalid";
                        break;
                default:
                        proc_type = "something worse than invalid!!";
                        break;
                }
                fprintf(stderr, "---Process Type : %s\n", proc_type);
        }
}
#endif
513
514/**
515 * Expects to be called from the master lcore and moves it to the given dpdk id
516 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
517 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
518 *               and not already in use.
519 * @return 0 is successful otherwise -1 on error.
520 */
521static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
522        struct rte_config *cfg = rte_eal_get_configuration();
523        cpu_set_t cpuset;
524        int i;
525
526        assert (core < RTE_MAX_LCORE);
527        assert (rte_get_master_lcore() == rte_lcore_id());
528
529        if (core == rte_lcore_id())
530                return 0;
531
532        /* Make sure we are not overwriting someone else */
533        assert(!rte_lcore_is_enabled(core));
534
535        /* Move the core */
536        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
537        cfg->lcore_role[core] = ROLE_RTE;
538        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
539        rte_eal_get_configuration()->master_lcore = core;
540        RTE_PER_LCORE(_lcore_id) = core;
541
542        /* Now change the affinity, either mapped to a single core or all accepted */
543        CPU_ZERO(&cpuset);
544
545        if (lcore_config[core].detected) {
546                CPU_SET(core, &cpuset);
547        } else {
548                for (i = 0; i < RTE_MAX_LCORE; ++i) {
549                        if (lcore_config[i].detected)
550                                CPU_SET(i, &cpuset);
551                }
552        }
553
554        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
555        if (i != 0) {
556                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
557                return -1;
558        }
559        return 0;
560}
561
/**
 * getopt() keeps its parsing state in the globals optarg, optind, opterr
 * and optopt. rte_eal_init() runs getopt over its own argv, which would
 * clobber the state of an application that is itself using getopt. These
 * helpers snapshot that state beforehand and put it back afterwards.
 *
 * This pokes at libc internals and relies on the globals being all of
 * getopt's state, which holds on the modern systems DPDK supports.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy getopt()'s global state into *opts */
static void save_getopts(struct saved_getopts *opts) {
        struct saved_getopts snapshot = {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
        *opts = snapshot;
}

/* Write a snapshot taken by save_getopts() back into getopt()'s globals */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts *src = opts;

        optopt = src->optopt;
        opterr = src->opterr;
        optind = src->optind;
        optarg = src->optarg;
}
589
590static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
591                                        char * err, int errlen) {
592        int ret; /* Returned error codes */
593        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
594        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
595        char mem_map[20] = {0}; /* The memory name */
596        long nb_cpu; /* The number of CPUs in the system */
597        long my_cpu; /* The CPU number we want to bind to */
598        int i;
599        struct rte_config *cfg = rte_eal_get_configuration();
600        struct saved_getopts save_opts;
601
602        /* This initialises the Environment Abstraction Layer (EAL)
603         * If we had slave workers these are put into WAITING state
604         *
605         * Basically binds this thread to a fixed core, which we choose as
606         * the last core on the machine (assuming fewer interrupts mapped here).
607         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
608         * "-n" the number of memory channels into the CPU (hardware specific)
609         *      - Most likely to be half the number of ram slots in your machine.
610         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
611         * Controls where in memory packets are stored such that they are spread
612         * across the channels. We just use 1 to be safe.
613         *
614         * Using unique file prefixes mean separate memory is used, unlinking
615         * the two processes. However be careful we still cannot access a
616         * port that already in use.
617         */
618        char* argv[] = {"libtrace",
619                        "-c", cpu_number,
620                        "-n", "1",
621                        "--proc-type", "auto",
622                        "--file-prefix", mem_map,
623                        "-m", "512",
624#if DPDK_USE_LOG_LEVEL
625#       if DEBUG
626                        "--log-level", "8", /* RTE_LOG_DEBUG */
627#       else
628                        "--log-level", "5", /* RTE_LOG_WARNING */
629#       endif
630#endif
631                        NULL};
632        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
633
634#if DEBUG
635        rte_set_log_level(RTE_LOG_DEBUG);
636#else
637        rte_set_log_level(RTE_LOG_WARNING);
638#endif
639
640        /* Get the number of cpu cores in the system and use the last core
641         * on the correct numa node */
642        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
643        if (nb_cpu <= 0) {
644                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
645                       " Falling back to the first core.");
646                nb_cpu = 1; /* fallback to the first core */
647        }
648        if (nb_cpu > RTE_MAX_LCORE)
649                nb_cpu = RTE_MAX_LCORE;
650
651        my_cpu = -1;
652        /* This allows the user to specify the core - we would try to do this
653         * automatically but it's hard to tell that this is secondary
654         * before running rte_eal_init(...). Currently we are limited to 1
655         * instance per core due to the way memory is allocated. */
656        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
657                snprintf(err, errlen, "Failed to parse URI");
658                return -1;
659        }
660
661#if HAVE_LIBNUMA
662        format_data->nic_numa_node = pci_to_numa(&use_addr);
663        if (my_cpu < 0) {
664                /* If we can assign to a core on the same numa node */
665                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
666                if(format_data->nic_numa_node >= 0) {
667                        int max_node_cpu = -1;
668                        struct bitmask *mask = numa_allocate_cpumask();
669                        assert(mask);
670                        numa_node_to_cpus(format_data->nic_numa_node, mask);
671                        for (i = 0 ; i < nb_cpu; ++i) {
672                                if (numa_bitmask_isbitset(mask,i))
673                                        max_node_cpu = i+1;
674                        }
675                        my_cpu = max_node_cpu;
676                }
677        }
678#endif
679        if (my_cpu < 0) {
680                my_cpu = nb_cpu;
681        }
682
683
684        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
685                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
686
687        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
688                snprintf(err, errlen,
689                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
690                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
691                return -1;
692        }
693
694        /* Make our mask with all cores turned on this is so that DPDK to
695         * gets CPU info older versions */
696        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
697        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
698
699#if !DPDK_USE_BLACKLIST
700        /* Black list all ports besides the one that we want to use */
701        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
702                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
703                         " are you sure the address is correct?: %s", strerror(-ret));
704                return -1;
705        }
706#endif
707
708        /* Give the memory map a unique name */
709        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
710        /* rte_eal_init it makes a call to getopt so we need to reset the
711         * global optind variable of getopt otherwise this fails */
712        save_getopts(&save_opts);
713        optind = 1;
714        if ((ret = rte_eal_init(argc, argv)) < 0) {
715                snprintf(err, errlen,
716                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
717                return -1;
718        }
719        restore_getopts(&save_opts);
720        // These are still running but will never do anything with DPDK v1.7 we
721        // should remove this XXX in the future
722        for(i = 0; i < RTE_MAX_LCORE; ++i) {
723                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
724                        cfg->lcore_role[i] = ROLE_OFF;
725                        cfg->lcore_count--;
726                }
727        }
728        // Only the master should be running
729        assert(cfg->lcore_count == 1);
730
731        // TODO XXX TODO
732        dpdk_move_master_lcore(NULL, my_cpu-1);
733
734#if DEBUG
735        dump_configuration();
736#endif
737
738#if DPDK_USE_PMD_INIT
739        /* This registers all available NICs with Intel DPDK
740         * These are not loaded until rte_eal_pci_probe() is called.
741         */
742        if ((ret = rte_pmd_init_all()) < 0) {
743                snprintf(err, errlen,
744                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
745                return -1;
746        }
747#endif
748
749#if DPDK_USE_BLACKLIST
750        /* Blacklist all ports besides the one that we want to use */
751        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
752                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
753                         " are you sure the address is correct?: %s", strerror(-ret));
754                return -1;
755        }
756#endif
757
758#if DPDK_USE_PCI_PROBE
759        /* This loads DPDK drivers against all ports that are not blacklisted */
760        if ((ret = rte_eal_pci_probe()) < 0) {
761                snprintf(err, errlen,
762                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
763                return -1;
764        }
765#endif
766
767        format_data->nb_ports = rte_eth_dev_count();
768
769        if (format_data->nb_ports != 1) {
770                snprintf(err, errlen,
771                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
772                         format_data->nb_ports);
773                return -1;
774        }
775
776        struct rte_eth_dev_info dev_info;
777        rte_eth_dev_info_get(0, &dev_info);
778        fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
779                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
780
781        return 0;
782}
783
784static int dpdk_init_input (libtrace_t *libtrace) {
785        dpdk_per_stream_t stream = {0};
786        char err[500];
787        err[0] = 0;
788
789        libtrace->format_data = (struct dpdk_format_data_t *)
790                                malloc(sizeof(struct dpdk_format_data_t));
791        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
792        FORMAT(libtrace)->nb_ports = 0;
793        FORMAT(libtrace)->snaplen = 0; /* Use default */
794        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
795        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
796        FORMAT(libtrace)->nic_numa_node = -1;
797        FORMAT(libtrace)->promisc = -1;
798        FORMAT(libtrace)->pktmbuf_pool = NULL;
799#if DPDK_USE_BLACKLIST
800        FORMAT(libtrace)->nb_blacklist = 0;
801#endif
802        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
803        FORMAT(libtrace)->mempool_name[0] = 0;
804        memset(FORMAT(libtrace)->burst_pkts, 0,
805               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
806        FORMAT(libtrace)->burst_size = 0;
807        FORMAT(libtrace)->burst_offset = 0;
808
809        /* Make our first stream */
810        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
811        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
812
813        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
814                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
815                free(libtrace->format_data);
816                libtrace->format_data = NULL;
817                return -1;
818        }
819        return 0;
820}
821
822static int dpdk_init_output(libtrace_out_t *libtrace)
823{
824        char err[500];
825        err[0] = 0;
826
827        libtrace->format_data = (struct dpdk_format_data_t *)
828                                malloc(sizeof(struct dpdk_format_data_t));
829        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
830        FORMAT(libtrace)->nb_ports = 0;
831        FORMAT(libtrace)->snaplen = 0; /* Use default */
832        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
833        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
834        FORMAT(libtrace)->nic_numa_node = -1;
835        FORMAT(libtrace)->promisc = -1;
836        FORMAT(libtrace)->pktmbuf_pool = NULL;
837#if DPDK_USE_BLACKLIST
838        FORMAT(libtrace)->nb_blacklist = 0;
839#endif
840        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
841        FORMAT(libtrace)->mempool_name[0] = 0;
842        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
843        FORMAT(libtrace)->burst_size = 0;
844        FORMAT(libtrace)->burst_offset = 0;
845
846        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
847                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
848                free(libtrace->format_data);
849                libtrace->format_data = NULL;
850                return -1;
851        }
852        return 0;
853}
854
855static int dpdk_pconfig_input (libtrace_t *libtrace,
856                               trace_parallel_option_t option,
857                               void *data) {
858        switch (option) {
859        case TRACE_OPTION_SET_HASHER:
860                switch (*((enum hasher_types *) data))
861                {
862                case HASHER_BALANCE:
863                case HASHER_UNIDIRECTIONAL:
864                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
865                        return 0;
866                case HASHER_BIDIRECTIONAL:
867                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
868                        return 0;
869                case HASHER_HARDWARE:
870                case HASHER_CUSTOM:
871                        // We don't support these
872                        return -1;
873                }
874                break;
875        }
876        return -1;
877}
878
879/**
880 * Note here snaplen excludes the MAC checksum. Packets over
881 * the requested snaplen will be dropped. (Excluding MAC checksum)
882 *
883 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
884 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
885 * is set the maximum size of the returned packet would be 1518 otherwise
886 * 1514 would be the largest size possibly returned.
887 *
888 */
889static int dpdk_config_input (libtrace_t *libtrace,
890                              trace_option_t option,
891                              void *data) {
892        switch (option) {
893        case TRACE_OPTION_SNAPLEN:
894                /* Only support changing snaplen before a call to start is
895                 * made */
896                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
897                        FORMAT(libtrace)->snaplen=*(int*)data;
898                else
899                        return -1;
900                return 0;
901        case TRACE_OPTION_PROMISC:
902                FORMAT(libtrace)->promisc=*(int*)data;
903                return 0;
904        case TRACE_OPTION_FILTER:
905                /* TODO filtering */
906                break;
907        case TRACE_OPTION_META_FREQ:
908                break;
909        case TRACE_OPTION_EVENT_REALTIME:
910                break;
911        /* Avoid default: so that future options will cause a warning
912         * here to remind us to implement it, or flag it as
913         * unimplementable
914         */
915        }
916
917        /* Don't set an error - trace_config will try to deal with the
918         * option and will set an error if it fails */
919        return -1;
920}
921
/* Port configuration shared by every DPDK trace opened through this format.
 *
 * Jumbo frames can be enabled, or the size of accepted frames limited, by
 * setting both max_rx_pkt_len and jumbo_frame. dpdk_start_streams() rewrites
 * these two fields before configuring a port: with no snaplen requested both
 * are 0, otherwise jumbo_frame is 1 and max_rx_pkt_len is the snaplen.
 */
static struct rte_eth_conf port_conf = {
        .rxmode = {
                .mq_mode = ETH_RSS,
                .split_hdr_size = 0,
                .header_split   = 0, /**< Header Split disabled */
                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled (overwritten at start) */
                .max_rx_pkt_len = 0, /**< Max frame size if Jumbo enabled (overwritten at start) */
#if GET_MAC_CRC_CHECKSUM
/* It appears that even with hw_strip_crc turned off the driver still strips
 * the CRC itself (see around line 955 of lib/librte_pmd_e1000/igb_rxtx.c).
 * With .hw_strip_crc=0 a valid CRC still exists in the 4 bytes after the end
 * of the packet data, so we just add it back on when we receive the packet.
 */
                .hw_strip_crc   = 0, /**< CRC not stripped by hardware */
#else
/* By default strip the MAC checksum: actually reading it is a bit of a hack,
 * and we don't want to rely on disabling stripping to always cut the checksum
 * off in the future.
 */
                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
#endif
        },
        .txmode = {
                .mq_mode = ETH_DCB_NONE,
        },
        .rx_adv_conf = {
                .rss_conf = {
                        /* .rss_key is deliberately not set here; the key is kept per format (format_data->rss_key) */
                        /* Packet types whose IPv4/IPv6 (and TCP/UDP) headers feed the RSS hash */
                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
                },
        },
        .intr_conf = {
                /* Enable link state change interrupts; dpdk_start_streams()
                 * registers dpdk_lsc_callback() for them */
                .lsc = 1
        }
};
963
/* RX queue configuration passed to rte_eth_rx_queue_setup() for every RX
 * queue that dpdk_start_streams() sets up. */
static const struct rte_eth_rxconf rx_conf = {
        .rx_thresh = {
                .pthresh = 8,/* RX_PTHRESH prefetch threshold */
                .hthresh = 8,/* RX_HTHRESH host threshold */
                .wthresh = 4,/* RX_WTHRESH write-back threshold */
        },
        .rx_free_thresh = 0, /* NOTE(review): 0 presumably selects the driver default - confirm */
        .rx_drop_en = 0, /* Dropping packets when no RX descriptors are available is NOT enabled */
};
973
/* TX queue configuration passed to rte_eth_tx_queue_setup() for the single
 * TX queue that dpdk_start_streams() sets up. */
static const struct rte_eth_txconf tx_conf = {
        .tx_thresh = {
                /*
                 * TX_PTHRESH prefetch threshold.
                 * Set on the NIC: if the number of unprocessed descriptors queued
                 * on the card falls below this, the card tries to grab at least
                 * hthresh more unprocessed descriptors.
                 */
                .pthresh = 36,

                /* TX_HTHRESH host threshold.
                 * Set on the NIC: the batch size used to prefetch unprocessed TX
                 * descriptors.
                 */
                .hthresh = 0,

                /* TX_WTHRESH write-back threshold.
                 * Set on the NIC: the number of sent descriptors before status is
                 * written back to confirm the transmission. Writing back in bulk
                 * as one DMA transfer is more efficient than writing one at a time.
                 * Similar to tx_free_thresh, except that this is applied on the
                 * NIC, whereas tx_free_thresh controls when DPDK checks these.
                 * tx_rs_thresh (10Gbit cards) extends this further by not writing
                 * back every descriptor, only every n'th one, which reduces DMA
                 * memory bandwidth.
                 */
                .wthresh = 4,
        },

        /* Used internally by DPDK rather than passed to the NIC: the number of
         * packet descriptors to send before checking for any responses written
         * back (confirming the transmission). Defaults to 32 if set to 0.
         */
        .tx_free_thresh = 0,

        /* The Report Status threshold, used by 10Gbit cards. It tells the card
         * to write back status (such as transmission successful) only after
         * this minimum number of transmit descriptors has been seen. Defaults
         * to 32 if set to 0. If set to greater than 1, TX wthresh must be zero,
         * because this effectively replaces it; with the value 1 used here,
         * wthresh may stay non-zero. See the DPDK programmer's guide for more
         * restrictions.
         */
        .tx_rs_thresh = 1,
};
1016
1017/**
1018 * A callback for a link state change (LSC).
1019 *
1020 * Packets may be received before this notification. In fact the DPDK IGXBE
1021 * driver likes to put a delay upto 5sec before sending this.
1022 *
1023 * We use this to ensure the link speed is correct for our timestamp
1024 * calculations. Because packets might be received before the link up we still
1025 * update this when the packet is received.
1026 *
1027 * @param port The DPDK port
1028 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1029 * @param cb_arg The dpdk_format_data_t structure associated with the format
1030 */
1031static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1032                              void *cb_arg) {
1033        struct dpdk_format_data_t * format_data = cb_arg;
1034        struct rte_eth_link link_info;
1035        assert(event == RTE_ETH_EVENT_INTR_LSC);
1036        assert(port == format_data->port);
1037
1038        rte_eth_link_get_nowait(port, &link_info);
1039
1040        if (link_info.link_status)
1041                format_data->link_speed = link_info.link_speed;
1042        else
1043                format_data->link_speed = 0;
1044
1045#if DEBUG
1046        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1047                link_info.link_status ? "up" : "down",
1048                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1049                                          "full-duplex" : "half-duplex",
1050                (int) link_info.link_speed);
1051#endif
1052
1053        /* Turns out DPDK drivers might not come back up if the link speed
1054         * changes. So we reset the autoneg procedure. This is very unsafe
1055         * we have have threads reading packets and we stop the port. */
1056#if 0
1057        if (!link_info.link_status) {
1058                int ret;
1059                rte_eth_dev_stop(port);
1060                ret = rte_eth_dev_start(port);
1061                if (ret < 0) {
1062                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1063                                strerror(-ret));
1064                }
1065        }
1066#endif
1067}
1068
/* Attach memory to the port and start (or restart) the port/s.
 *
 * Does nothing if the port is already running. On the very first start the
 * snaplen is finalised (jumbo frames are enabled in port_conf when a snaplen
 * was requested; space is added for the CRC and/or 82580 hardware timestamp
 * if those are compiled in) and the packet mbuf pool is created. On every
 * start the port is configured with rx_queues RX queues and one TX queue, the
 * queues are set up, the device is started, promiscuous mode is applied
 * (defaulting to on), the link state change callback is registered and the
 * current link speed is recorded.
 *
 * NOTE(review): format_data->rss_key (filled by dpdk_pconfig_input) is not
 * copied into port_conf anywhere in this function - confirm whether it is
 * applied elsewhere.
 *
 * @param format_data The format data for the port.
 * @param err         Buffer that receives an error message on failure.
 * @param errlen      Size of err in bytes.
 * @param rx_queues   Number of RX queues to set up; a per-stream entry is
 *                    added to format_data->per_stream for each as needed.
 * @return 0 on success (or if already running), -1 on failure.
 */
static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
                              char *err, int errlen, uint16_t rx_queues) {
        int ret, i;
        struct rte_eth_link link_info; /* Current link status, read without waiting */
        dpdk_per_stream_t empty_stream = {0};

        /* Already started */
        if (format_data->paused == DPDK_RUNNING)
                return 0;

        /* The first time we start we need to allocate our memory. This is done
         * here rather than in environment setup because we don't know the
         * snaplen then */
        if (format_data->paused == DPDK_NEVER_STARTED) {
                if (format_data->snaplen == 0) {
                        format_data->snaplen = RX_MBUF_SIZE;
                        port_conf.rxmode.jumbo_frame = 0;
                        port_conf.rxmode.max_rx_pkt_len = 0;
                } else {
                        /* Use jumbo frames */
                        port_conf.rxmode.jumbo_frame = 1;
                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
                }

#if GET_MAC_CRC_CHECKSUM
                /* This is additional overhead so make sure we allow space for this */
                format_data->snaplen += ETHER_CRC_LEN;
#endif
#if HAS_HW_TIMESTAMPS_82580
                format_data->snaplen += sizeof(struct hw_timestamp_82580);
#endif

                /* Create the mbuf pool, which is the place packets are allocated
                 * from - there is no free function (I cannot see one).
                 * NOTE: an RX queue requires nb_packets + 1, otherwise it fails
                 * to allocate; however that extra packet is not used.
                 * (I assume a <= vs < error somewhere in the DPDK code.)
                 * TX requires nb_tx_buffers + 1 for the case where the queue is
                 * full, so that it can fill the new buffer and wait until slots
                 * in the ring become available.
                 * Each element holds an rte_mbuf header, the headroom and
                 * snaplen bytes of data; 128 is the per-lcore cache size.
                 */
#if DEBUG
                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
#endif
                format_data->pktmbuf_pool =
                                rte_mempool_create(format_data->mempool_name,
                                                   (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
                                                   format_data->snaplen + sizeof(struct rte_mbuf)
                                                   + RTE_PKTMBUF_HEADROOM,
                                                   128, sizeof(struct rte_pktmbuf_pool_private),
                                                   rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
                                                   format_data->nic_numa_node, 0);

                if (format_data->pktmbuf_pool == NULL) {
                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
                                 "pool failed: %s", strerror(rte_errno));
                        return -1;
                }
        }

        /* ----------- Now do the setup for the port mapping ------------ */
        /* Order of calls must be
         * rte_eth_dev_configure()
         * rte_eth_tx_queue_setup()
         * rte_eth_rx_queue_setup()
         * rte_eth_dev_start()
         * other rte_eth calls
         */

        /* This must be called first, before any other *eth* function.
         * 1+ rx, 1 tx queues; port_conf sets checksum stripping etc. */
        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
        if (ret < 0) {
                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
                         " %"PRIu8" : %s", format_data->port,
                         strerror(-ret));
                return -1;
        }
#if DEBUG
        fprintf(stderr, "Doing dev configure\n");
#endif
        /* Initialise the TX queue with a minimum size if this port is used for
         * receiving, otherwise with a larger size if writing packets.
         */
        ret = rte_eth_tx_queue_setup(format_data->port,
                                     0 /* queue XXX */,
                                     format_data->nb_tx_buf,
                                     SOCKET_ID_ANY,
                                     &tx_conf);
        if (ret < 0) {
                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
                         " on port %"PRIu8" : %s", format_data->port,
                         strerror(-ret));
                return -1;
        }

        /* Attach memory to our RX queues */
        for (i=0; i < rx_queues; i++) {
#if DEBUG
                fprintf(stderr, "Doing queue configure %d\n", i);
#endif

                dpdk_per_stream_t *stream;
                /* Add storage for the stream if the list is too short */
                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
                        libtrace_list_push_back(format_data->per_stream, &empty_stream);

                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
                stream->queue_id = i;

                /* Initialise the RX queue with some packets from memory */
                ret = rte_eth_rx_queue_setup(format_data->port,
                                             stream->queue_id,
                                             format_data->nb_rx_buf,
                                             format_data->nic_numa_node,
                                             &rx_conf,
                                             format_data->pktmbuf_pool);
                if (ret < 0) {
                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
                                 " RX queue on port %"PRIu8" : %s",
                                 format_data->port,
                                 strerror(-ret));
                        return -1;
                }
        }

#if DEBUG
        fprintf(stderr, "Doing start device\n");
#endif
        /* Start device */
        ret = rte_eth_dev_start(format_data->port);
        if (ret < 0) {
                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
                         strerror(-ret));
                return -1;
        }

        /* Default promiscuous to on */
        if (format_data->promisc == -1)
                format_data->promisc = 1;

        if (format_data->promisc == 1)
                rte_eth_promiscuous_enable(format_data->port);
        else
                rte_eth_promiscuous_disable(format_data->port);

        /* We have now successfully started/unpaused */
        format_data->paused = DPDK_RUNNING;


        /* Register a callback for link state changes. A failure is only
         * reported in DEBUG builds; the port still runs without it. */
        ret = rte_eth_dev_callback_register(format_data->port,
                                            RTE_ETH_EVENT_INTR_LSC,
                                            dpdk_lsc_callback,
                                            format_data);
#if DEBUG
        if (ret)
                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
                        ret, strerror(-ret));
#endif

        /* Get the current link status (the link may not be up yet; the LSC
         * callback updates link_speed when it changes) */
        rte_eth_link_get_nowait(format_data->port, &link_info);
        format_data->link_speed = link_info.link_speed;
#if DEBUG
        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
                (int) link_info.link_duplex, (int) link_info.link_speed);
#endif

        return 0;
}
1241
1242static int dpdk_start_input (libtrace_t *libtrace) {
1243        char err[500];
1244        err[0] = 0;
1245
1246        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1247                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1248                free(libtrace->format_data);
1249                libtrace->format_data = NULL;
1250                return -1;
1251        }
1252        return 0;
1253}
1254
1255static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1256        struct rte_eth_dev_info dev_info;
1257        rte_eth_dev_info_get(port_id, &dev_info);
1258        return dev_info.max_rx_queues;
1259}
1260
/**
 * Return the number of processors currently online.
 *
 * Declared with an explicit (void) prototype. An empty () list is an old-style
 * non-prototype declaration in C11, and calls passing arguments would not be
 * diagnosed.
 *
 * @return The online processor count reported by sysconf(), or 1 if it
 *         cannot report a positive count.
 */
static inline size_t dpdk_processor_count (void) {
        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

        return nb_cpu > 0 ? (size_t) nb_cpu : 1;
}
1268
1269static int dpdk_pstart_input (libtrace_t *libtrace) {
1270        char err[500];
1271        int i=0, phys_cores=0;
1272        int tot = libtrace->perpkt_thread_count;
1273        err[0] = 0;
1274
1275        if (rte_lcore_id() != rte_get_master_lcore())
1276                fprintf(stderr, "Warning dpdk_pstart_input should be called"
1277                        " from the master DPDK thread!\n");
1278
1279        /* If the master is not on the last thread we move it there */
1280        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1281                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
1282                        return -1;
1283        }
1284
1285        /* Don't exceed the number of cores in the system/detected by dpdk
1286         * We don't have to force this but performance wont be good if we don't */
1287        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1288                if (lcore_config[i].detected) {
1289                        if (rte_lcore_is_enabled(i))
1290                                fprintf(stderr, "Found core %d already in use!\n", i);
1291                        else
1292                                phys_cores++;
1293                }
1294        }
1295
1296        tot = MIN(libtrace->perpkt_thread_count,
1297                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1298        tot = MIN(tot, phys_cores);
1299
1300#if DEBUG
1301        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
1302                libtrace->perpkt_thread_count, phys_cores);
1303#endif
1304
1305        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1306                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1307                free(libtrace->format_data);
1308                libtrace->format_data = NULL;
1309                return -1;
1310        }
1311
1312        /* Make sure we only start the number that we should */
1313        libtrace->perpkt_thread_count = tot;
1314        return 0;
1315}
1316
1317/**
1318 * Register a thread with the DPDK system,
1319 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1320 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1321 * gives it.
1322 *
1323 * We then allow a mapper thread to be started on every real core as DPDK would,
1324 * we also bind these to the corresponding CPU cores.
1325 *
1326 * @param libtrace A pointer to the trace
1327 * @param reading True if the thread will be used to read packets, i.e. will
1328 *                call pread_packet(), false if thread used to process packet
1329 *                in any other manner including statistics functions.
1330 */
1331static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1332{
1333        struct rte_config *cfg = rte_eal_get_configuration();
1334        int i;
1335        int new_id = -1;
1336
1337        /* If 'reading packets' fill in cores from 0 up and bind affinity
1338         * otherwise start from the MAX core (which is also the master) and work backwards
1339         * in this case physical cores on the system will not exist so we don't bind
1340         * these to any particular physical core */
1341        pthread_mutex_lock(&libtrace->libtrace_lock);
1342        if (reading) {
1343#if HAVE_LIBNUMA
1344                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1345                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
1346                                new_id = i;
1347                                if (!lcore_config[i].detected)
1348                                        new_id = -1;
1349                                break;
1350                        }
1351                }
1352#endif
1353                /* Retry without the the numa restriction */
1354                if (new_id == -1) {
1355                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1356                                if (!rte_lcore_is_enabled(i)) {
1357                                        new_id = i;
1358                                        if (!lcore_config[i].detected)
1359                                                fprintf(stderr, "Warning the"
1360                                                        " number of 'reading' "
1361                                                        "threads exceed cores\n");
1362                                        break;
1363                                }
1364                        }
1365                }
1366        } else {
1367                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1368                        if (!rte_lcore_is_enabled(i)) {
1369                                new_id = i;
1370                                break;
1371                        }
1372                }
1373        }
1374
1375        if (new_id == -1) {
1376                assert(cfg->lcore_count == RTE_MAX_LCORE);
1377                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
1378                              " for DPDK");
1379                pthread_mutex_unlock(&libtrace->libtrace_lock);
1380                return -1;
1381        }
1382
1383        /* Enable the core in global DPDK structs */
1384        cfg->lcore_role[new_id] = ROLE_RTE;
1385        cfg->lcore_count++;
1386         /* I think new threads are going get a default thread number of 0 */
1387        assert(rte_lcore_id() == 0);
1388        fprintf(stderr, "original id%d", rte_lcore_id());
1389        RTE_PER_LCORE(_lcore_id) = new_id;
1390        char name[99];
1391        pthread_getname_np(pthread_self(),
1392                           name, sizeof(name));
1393
1394        fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
1395
1396        if (reading) {
1397                /* Set affinity bind to corresponding core */
1398                cpu_set_t cpuset;
1399                CPU_ZERO(&cpuset);
1400                CPU_SET(rte_lcore_id(), &cpuset);
1401                i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1402                if (i != 0) {
1403                        trace_set_err(libtrace, errno, "Warning "
1404                                      "pthread_setaffinity_np failed");
1405                        pthread_mutex_unlock(&libtrace->libtrace_lock);
1406                        return -1;
1407                }
1408        }
1409
1410        /* Map our TLS to the thread data */
1411        if (reading) {
1412                if(t->type == THREAD_PERPKT) {
1413                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
1414                        if (t->format_data == NULL) {
1415                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
1416                                              "Too many threads registered");
1417                                return -1;
1418                        }
1419                } else {
1420                        t->format_data = FORMAT_DATA_FIRST(libtrace);
1421                }
1422        }
1423        pthread_mutex_unlock(&libtrace->libtrace_lock);
1424        return 0;
1425}
1426
1427/**
1428 * Unregister a thread with the DPDK system.
1429 *
1430 * Only previously registered threads should be calling this just before
1431 * they are destroyed.
1432 */
1433static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1434{
1435        struct rte_config *cfg = rte_eal_get_configuration();
1436
1437        assert(rte_lcore_id() < RTE_MAX_LCORE);
1438        pthread_mutex_lock(&libtrace->libtrace_lock);
1439        /* Skip if master */
1440        if (rte_lcore_id() == rte_get_master_lcore()) {
1441                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1442                pthread_mutex_unlock(&libtrace->libtrace_lock);
1443                return;
1444        }
1445
1446        /* Disable this core in global DPDK structs */
1447        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1448        cfg->lcore_count--;
1449        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1450        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1451        pthread_mutex_unlock(&libtrace->libtrace_lock);
1452        return;
1453}
1454
1455static int dpdk_start_output(libtrace_out_t *libtrace)
1456{
1457        char err[500];
1458        err[0] = 0;
1459
1460        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
1461                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1462                free(libtrace->format_data);
1463                libtrace->format_data = NULL;
1464                return -1;
1465        }
1466        return 0;
1467}
1468
1469static int dpdk_pause_input(libtrace_t * libtrace) {
1470        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
1471        /* This stops the device, but can be restarted using rte_eth_dev_start() */
1472        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1473#if DEBUG
1474                fprintf(stderr, "Pausing DPDK port\n");
1475#endif
1476                rte_eth_dev_stop(FORMAT(libtrace)->port);
1477                FORMAT(libtrace)->paused = DPDK_PAUSED;
1478                /* Empty the queue of packets */
1479                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1480                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1481                }
1482                FORMAT(libtrace)->burst_offset = 0;
1483                FORMAT(libtrace)->burst_size = 0;
1484
1485                for (; tmp != NULL; tmp = tmp->next) {
1486                        dpdk_per_stream_t *stream = tmp->data;
1487                        stream->ts_last_sys = 0;
1488#if HAS_HW_TIMESTAMPS_82580
1489                        stream->ts_first_sys = 0;
1490#endif
1491                }
1492
1493        }
1494        return 0;
1495}
1496
1497static int dpdk_write_packet(libtrace_out_t *trace,
1498                             libtrace_packet_t *packet){
1499        struct rte_mbuf* m_buff[1];
1500
1501        int wirelen = trace_get_wire_length(packet);
1502        int caplen = trace_get_capture_length(packet);
1503
1504        /* Check for a checksum and remove it */
1505        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1506            wirelen == caplen)
1507                caplen -= ETHER_CRC_LEN;
1508
1509        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1510        if (m_buff[0] == NULL) {
1511                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1512                return -1;
1513        } else {
1514                int ret;
1515                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1516                do {
1517                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
1518                } while (ret != 1);
1519        }
1520
1521        return 0;
1522}
1523
1524static int dpdk_fin_input(libtrace_t * libtrace) {
1525        /* Free our memory structures */
1526        if (libtrace->format_data != NULL) {
1527                /* Close the device completely, device cannot be restarted */
1528                if (FORMAT(libtrace)->port != 0xFF)
1529                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1530                                                        RTE_ETH_EVENT_INTR_LSC,
1531                                                        dpdk_lsc_callback,
1532                                                        FORMAT(libtrace));
1533                rte_eth_dev_close(FORMAT(libtrace)->port);
1534                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1535                /* filter here if we used it */
1536                free(libtrace->format_data);
1537        }
1538
1539        return 0;
1540}
1541
1542
1543static int dpdk_fin_output(libtrace_out_t * libtrace) {
1544        /* Free our memory structures */
1545        if (libtrace->format_data != NULL) {
1546                /* Close the device completely, device cannot be restarted */
1547                if (FORMAT(libtrace)->port != 0xFF)
1548                        rte_eth_dev_close(FORMAT(libtrace)->port);
1549                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
1550                /* filter here if we used it */
1551                free(libtrace->format_data);
1552        }
1553
1554        return 0;
1555}
1556
1557/**
1558 * Get the start of the additional header that we added to a packet.
1559 */
1560static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1561        assert(packet);
1562        assert(packet->buffer);
1563        /* Our header sits straight after the mbuf header */
1564        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1565}
1566
1567static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1568        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1569        return hdr->cap_len;
1570}
1571
1572static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1573        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1574        if (size > hdr->cap_len) {
1575                /* Cannot make a packet bigger */
1576                return trace_get_capture_length(packet);
1577        }
1578
1579        /* Reset the cached capture length first*/
1580        packet->capture_length = -1;
1581        hdr->cap_len = (uint32_t) size;
1582        return trace_get_capture_length(packet);
1583}
1584
1585static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1586        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1587        int org_cap_size; /* The original capture size */
1588        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1589                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1590                               sizeof(struct hw_timestamp_82580);
1591        } else {
1592                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1593        }
1594        if (hdr->flags & INCLUDES_CHECKSUM) {
1595                return org_cap_size;
1596        } else {
1597                /* DPDK packets are always TRACE_TYPE_ETH packets */
1598                return org_cap_size + ETHER_CRC_LEN;
1599        }
1600}
1601
1602static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1603        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1604        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1605                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1606                                sizeof(struct hw_timestamp_82580);
1607        else
1608                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1609}
1610
1611static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1612                               libtrace_packet_t *packet, void *buffer,
1613                               libtrace_rt_types_t rt_type, uint32_t flags) {
1614        assert(packet);
1615        if (packet->buffer != buffer &&
1616            packet->buf_control == TRACE_CTRL_PACKET) {
1617                free(packet->buffer);
1618        }
1619
1620        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
1621                packet->buf_control = TRACE_CTRL_PACKET;
1622        else
1623                packet->buf_control = TRACE_CTRL_EXTERNAL;
1624
1625        packet->buffer = buffer;
1626        packet->header = buffer;
1627
1628        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1629        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1630        packet->type = rt_type;
1631        return 0;
1632}
1633
1634/**
1635 * Given a packet size and a link speed, computes the
1636 * time to transmit in nanoseconds.
1637 *
1638 * @param format_data The dpdk format data from which we get the link speed
1639 *        and if unset updates it in a thread safe manner
1640 * @param pkt_size The size of the packet in bytes
1641 * @return The wire time in nanoseconds
1642 */
1643static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1644        uint32_t wire_time;
1645        /* 20 extra bytes of interframe gap and preamble */
1646# if GET_MAC_CRC_CHECKSUM
1647        wire_time = ((pkt_size + 20) * 8000);
1648# else
1649        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1650# endif
1651
1652        /* Division is really slow and introduces a pipeline stall
1653         * The compiler will optimise this into magical multiplication and shifting
1654         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1655         */
1656retry_calc_wiretime:
1657        switch (format_data->link_speed) {
1658        case ETH_LINK_SPEED_40G:
1659                wire_time /=  ETH_LINK_SPEED_40G;
1660                break;
1661        case ETH_LINK_SPEED_20G:
1662                wire_time /= ETH_LINK_SPEED_20G;
1663                break;
1664        case ETH_LINK_SPEED_10G:
1665                wire_time /= ETH_LINK_SPEED_10G;
1666                break;
1667        case ETH_LINK_SPEED_1000:
1668                wire_time /= ETH_LINK_SPEED_1000;
1669                break;
1670        case 0:
1671                {
1672                /* Maybe the link was down originally, but now it should be up */
1673                struct rte_eth_link link = {0};
1674                rte_eth_link_get_nowait(format_data->port, &link);
1675                if (link.link_status && link.link_speed) {
1676                        format_data->link_speed = link.link_speed;
1677#ifdef DEBUG
1678                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1679#endif
1680                        goto retry_calc_wiretime;
1681                }
1682                /* We don't know the link speed, make sure numbers are counting up */
1683                wire_time = 1;
1684                break;
1685                }
1686        default:
1687                wire_time /= format_data->link_speed;
1688        }
1689        return wire_time;
1690}
1691
1692/**
1693 * Does any extra preperation to all captured packets
1694 * This includes adding our extra header to it with the timestamp,
1695 * and any snapping
1696 *
1697 * @param format_data The DPDK format data
1698 * @param plc The DPDK per lcore format data
1699 * @param pkts An array of size nb_pkts of DPDK packets
1700 */
1701static inline void dpdk_ready_pkts(libtrace_t *libtrace,
1702                                   struct dpdk_per_stream_t *plc,
1703                                   struct rte_mbuf **pkts,
1704                                   size_t nb_pkts) {
1705        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1706        struct dpdk_addt_hdr *hdr;
1707        size_t i;
1708        uint64_t cur_sys_time_ns;
1709#if HAS_HW_TIMESTAMPS_82580
1710        struct hw_timestamp_82580 *hw_ts;
1711        uint64_t estimated_wraps;
1712#else
1713
1714#endif
1715
1716#if USE_CLOCK_GETTIME
1717        struct timespec cur_sys_time = {0};
1718        /* This looks terrible and I feel bad doing it. But it's OK
1719         * on new kernels, because this is a fast vsyscall */
1720        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1721        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1722#else
1723        struct timeval cur_sys_time = {0};
1724        /* Also a fast vsyscall */
1725        gettimeofday(&cur_sys_time, NULL);
1726        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1727#endif
1728
1729        /* The system clock is not perfect so when running
1730         * at linerate we could timestamp a packet in the past.
1731         * To avoid this we munge the timestamp to appear 1ns
1732         * after the previous packet. We should eventually catch up
1733         * to system time since a 64byte packet on a 10G link takes 67ns.
1734         *
1735         * Note with parallel readers timestamping packets
1736         * with duplicate stamps or out of order is unavoidable without
1737         * hardware timestamping from the NIC.
1738         */
1739#if !HAS_HW_TIMESTAMPS_82580
1740        if (plc->ts_last_sys >= cur_sys_time_ns) {
1741                cur_sys_time_ns = plc->ts_last_sys + 1;
1742        }
1743#endif
1744
1745        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1746        for (i = 0 ; i < nb_pkts ; ++i) {
1747
1748                /* We put our header straight after the dpdk header */
1749                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1750                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1751
1752#if GET_MAC_CRC_CHECKSUM
1753                /* Add back in the CRC sum */
1754                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1755                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1756                hdr->flags |= INCLUDES_CHECKSUM;
1757#endif
1758
1759                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1760
1761#if HAS_HW_TIMESTAMPS_82580
1762                /* The timestamp is sitting before our packet and is included in pkt_len */
1763                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1764                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1765                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1766
1767                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1768                 *
1769                 *        +----------+---+   +--------------+
1770                 *  82580 |    24    | 8 |   |      32      |
1771                 *        +----------+---+   +--------------+
1772                 *          reserved  \______ 40 bits _____/
1773                 *
1774                 * The 40 bit 82580 SYSTIM overflows every
1775                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1776                 *
1777                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1778                 * Endian (for the full 64 bits) i.e. picture is mirrored
1779                 */
1780
1781                /* Despite what the documentation says this is in Little
1782                 * Endian byteorder. Mask the reserved section out.
1783                 */
1784                hdr->timestamp = le64toh(hw_ts->timestamp) &
1785                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1786
1787                if (unlikely(plc->ts_first_sys == 0)) {
1788                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1789                        plc->ts_last_sys = plc->ts_first_sys;
1790                }
1791
1792                /* This will have serious problems if packets aren't read quickly
1793                 * that is within a couple of seconds because our clock cycles every
1794                 * 18 seconds */
1795                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1796                                  / (1ull<<TS_NBITS_82580);
1797
1798                /* Estimated_wraps gives the number of times the counter should have
1799                 * wrapped (however depending on value last time it could have wrapped
1800                 * twice more (if hw clock is close to its max value) or once less (allowing
1801                 * for a bit of variance between hw and sys clock). But if the clock
1802                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1803                if (unlikely(estimated_wraps >= 2)) {
1804                        /* 2 or more wrap arounds add all but the very last wrap */
1805                        plc->wrap_count += estimated_wraps - 1;
1806                }
1807
1808                /* Set the timestamp to the lowest possible value we're considering */
1809                hdr->timestamp += plc->ts_first_sys +
1810                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1811
1812                /* In most runs only the first if() will need evaluating - i.e our
1813                 * estimate is correct. */
1814                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1815                                              hdr->timestamp, MAXSKEW_82580))) {
1816                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1817                        plc->wrap_count++;
1818                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1819                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1820                                             hdr->timestamp, MAXSKEW_82580)) {
1821                                /* Failed to match estimated_wraps */
1822                                plc->wrap_count++;
1823                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1824                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1825                                                     hdr->timestamp, MAXSKEW_82580)) {
1826                                        if (estimated_wraps == 0) {
1827                                                /* 0 case Failed to match estimated_wraps+2 */
1828                                                printf("WARNING - Hardware Timestamp failed to"
1829                                                       " match using systemtime!\n");
1830                                                hdr->timestamp = cur_sys_time_ns;
1831                                        } else {
1832                                                /* Failed to match estimated_wraps+1 */
1833                                                plc->wrap_count++;
1834                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1835                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1836                                                                     hdr->timestamp, MAXSKEW_82580)) {
1837                                                        /* Failed to match estimated_wraps+2 */
1838                                                        printf("WARNING - Hardware Timestamp failed to"
1839                                                               " match using systemtime!!\n");
1840                                                }
1841                                        }
1842                                }
1843                        }
1844                }
1845#else
1846
1847                hdr->timestamp = cur_sys_time_ns;
1848                /* Offset the next packet by the wire time of previous */
1849                calculate_wire_time(format_data, hdr->cap_len);
1850
1851#endif
1852        }
1853
1854        plc->ts_last_sys = cur_sys_time_ns;
1855
1856        return;
1857}
1858
1859
1860static void dpdk_fin_packet(libtrace_packet_t *packet)
1861{
1862        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1863                rte_pktmbuf_free(packet->buffer);
1864                packet->buffer = NULL;
1865        }
1866}
1867
1868/** Reads at least one packet or returns an error
1869 */
1870static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
1871                                           dpdk_per_stream_t *stream,
1872                                           libtrace_message_queue_t *mesg,
1873                                           struct rte_mbuf* pkts_burst[],
1874                                           size_t nb_packets) {
1875        size_t nb_rx; /* Number of rx packets we've recevied */
1876        while (1) {
1877                /* Poll for a batch of packets */
1878                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
1879                                         stream->queue_id, pkts_burst, nb_packets);
1880                if (nb_rx > 0) {
1881                        /* Got some packets - otherwise we keep spining */
1882                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
1883                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
1884                        return nb_rx;
1885                }
1886                /* Check the message queue this could be less than 0 */
1887                if (mesg && libtrace_message_queue_count(mesg) > 0)
1888                        return READ_MESSAGE;
1889                if (libtrace_halt)
1890                        return READ_EOF;
1891                /* Wait a while, polling on memory degrades performance
1892                 * This relieves the pressure on memory allowing the NIC to DMA */
1893                rte_delay_us(10);
1894        }
1895
1896        /* We'll never get here - but if we did it would be bad */
1897        return READ_ERROR;
1898}
1899
1900static int dpdk_pread_packets (libtrace_t *libtrace,
1901                                    libtrace_thread_t *t,
1902                                    libtrace_packet_t **packets,
1903                                    size_t nb_packets) {
1904        int nb_rx; /* Number of rx packets we've recevied */
1905        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
1906        size_t i;
1907        dpdk_per_stream_t *stream = t->format_data;
1908
1909        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
1910                                         pkts_burst, nb_packets);
1911
1912        if (nb_rx > 0) {
1913                for (i = 0; i < nb_rx; ++i) {
1914                        if (packets[i]->buffer != NULL) {
1915                                /* The packet should always be finished */
1916                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
1917                                free(packets[i]->buffer);
1918                        }
1919                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
1920                        packets[i]->type = TRACE_RT_DATA_DPDK;
1921                        packets[i]->buffer = pkts_burst[i];
1922                        packets[i]->trace = libtrace;
1923                        packets[i]->error = 1;
1924                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
1925                }
1926        }
1927
1928        return nb_rx;
1929}
1930
1931static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
1932        int nb_rx; /* Number of rx packets we've received */
1933        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
1934
1935        /* Free the last packet buffer */
1936        if (packet->buffer != NULL) {
1937                /* The packet should always be finished */
1938                assert(packet->buf_control == TRACE_CTRL_PACKET);
1939                free(packet->buffer);
1940                packet->buffer = NULL;
1941        }
1942
1943        packet->buf_control = TRACE_CTRL_EXTERNAL;
1944        packet->type = TRACE_RT_DATA_DPDK;
1945
1946        /* Check if we already have some packets buffered */
1947        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
1948                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
1949                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1950                return 1; // TODO should be bytes read, which essentially useless anyway
1951        }
1952
1953        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
1954                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
1955
1956        if (nb_rx > 0) {
1957                FORMAT(libtrace)->burst_size = nb_rx;
1958                FORMAT(libtrace)->burst_offset = 1;
1959                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
1960                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
1961                return 1;
1962        }
1963        return nb_rx;
1964}
1965
1966static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
1967        struct timeval tv;
1968        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1969
1970        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1971        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
1972        return tv;
1973}
1974
1975static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
1976        struct timespec ts;
1977        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1978
1979        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
1980        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
1981        return ts;
1982}
1983
1984static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
1985        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
1986}
1987
1988static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
1989        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1990        return (libtrace_direction_t) hdr->direction;
1991}
1992
1993static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
1994        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1995        hdr->direction = (uint8_t) direction;
1996        return (libtrace_direction_t) hdr->direction;
1997}
1998
1999static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
2000        struct rte_eth_stats dev_stats = {0};
2001
2002        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2003                return;
2004
2005        /* Grab the current stats */
2006        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
2007
2008        stats->captured_valid = true;
2009        stats->captured = dev_stats.ipackets;
2010
2011        /* Not that we support adding filters but if we did this
2012         * would work */
2013        stats->filtered += dev_stats.fdirmiss;
2014
2015        stats->dropped_valid = true;
2016        stats->dropped = dev_stats.imissed;
2017
2018        /* DPDK errors includes drops */
2019        stats->errors_valid = true;
2020        stats->errors = dev_stats.ierrors - dev_stats.imissed;
2021
2022        stats->received_valid = true;
2023        stats->received = dev_stats.ipackets + dev_stats.imissed;
2024
2025}
2026
2027/* Attempts to read a packet in a non-blocking fashion. If one is not
2028 * available a SLEEP event is returned. We do not have the ability to
2029 * create a select()able file descriptor in DPDK.
2030 */
2031static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2032                                            libtrace_packet_t *packet) {
2033        libtrace_eventobj_t event = {0,0,0.0,0};
2034        int nb_rx; /* Number of receive packets we've read */
2035        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2036
2037        do {
2038
2039                /* See if we already have a packet waiting */
2040                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2041                                         FORMAT_DATA_FIRST(trace)->queue_id,
2042                                         pkts_burst, 1);
2043
2044                if (nb_rx > 0) {
2045                        /* Free the last packet buffer */
2046                        if (packet->buffer != NULL) {
2047                                /* The packet should always be finished */
2048                                assert(packet->buf_control == TRACE_CTRL_PACKET);
2049                                free(packet->buffer);
2050                                packet->buffer = NULL;
2051                        }
2052
2053                        packet->buf_control = TRACE_CTRL_EXTERNAL;
2054                        packet->type = TRACE_RT_DATA_DPDK;
2055                        event.type = TRACE_EVENT_PACKET;
2056                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
2057                        packet->buffer = FORMAT(trace)->burst_pkts[0];
2058                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
2059                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
2060
2061                        /* XXX - Check this passes the filter trace_read_packet normally
2062                         * does this for us but this wont */
2063                        if (trace->filter) {
2064                                if (!trace_apply_filter(trace->filter, packet)) {
2065                                        /* Failed the filter so we loop for another packet */
2066                                        trace->filtered_packets ++;
2067                                        continue;
2068                                }
2069                        }
2070                        trace->accepted_packets ++;
2071                } else {
2072                        /* We only want to sleep for a very short time - we are non-blocking */
2073                        event.type = TRACE_EVENT_SLEEP;
2074                        event.seconds = 0.0001;
2075                        event.size = 0;
2076                }
2077
2078                /* If we get here we have our event */
2079                break;
2080        } while (1);
2081
2082        return event;
2083}
2084
/** Prints usage information for the dpdk format module to stdout. */
static void dpdk_help(void) {
        static const char *const help_text[] = {
                "dpdk format module: $Revision: 1752 $\n",
                "Supported input URIs:\n",
                "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
                "\tThe -<coreid> is optional \n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
                "\t By default the last CPU core is used if not otherwise specified.\n",
                "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
                "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
                "\n",
                "Supported output URIs:\n",
                "\tSame format as the input URI.\n",
                "\t e.g. dpdk:0000:01:00.1\n",
                "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
                "\n",
        };
        size_t line;

        /* None of the strings contain '%', so fputs emits exactly what
         * printf would */
        for (line = 0; line < sizeof(help_text) / sizeof(help_text[0]); line++)
                fputs(help_text[line], stdout);
}
2102
2103static struct libtrace_format_t dpdk = {
2104        "dpdk",
2105        "$Id$",
2106        TRACE_FORMAT_DPDK,
2107        NULL,                               /* probe filename */
2108        NULL,                               /* probe magic */
2109        dpdk_init_input,                    /* init_input */
2110        dpdk_config_input,                  /* config_input */
2111        dpdk_start_input,                   /* start_input */
2112        dpdk_pause_input,                   /* pause_input */
2113        dpdk_init_output,                   /* init_output */
2114        NULL,                               /* config_output */
2115        dpdk_start_output,                  /* start_ouput */
2116        dpdk_fin_input,                     /* fin_input */
2117        dpdk_fin_output,                    /* fin_output */
2118        dpdk_read_packet,                   /* read_packet */
2119        dpdk_prepare_packet,                /* prepare_packet */
2120        dpdk_fin_packet,                    /* fin_packet */
2121        dpdk_write_packet,                  /* write_packet */
2122        dpdk_get_link_type,                 /* get_link_type */
2123        dpdk_get_direction,                 /* get_direction */
2124        dpdk_set_direction,                 /* set_direction */
2125        NULL,                               /* get_erf_timestamp */
2126        dpdk_get_timeval,                   /* get_timeval */
2127        dpdk_get_timespec,                  /* get_timespec */
2128        NULL,                               /* get_seconds */
2129        NULL,                               /* seek_erf */
2130        NULL,                               /* seek_timeval */
2131        NULL,                               /* seek_seconds */
2132        dpdk_get_capture_length,            /* get_capture_length */
2133        dpdk_get_wire_length,               /* get_wire_length */
2134        dpdk_get_framing_length,            /* get_framing_length */
2135        dpdk_set_capture_length,            /* set_capture_length */
2136        NULL,                               /* get_received_packets */
2137        NULL,                               /* get_filtered_packets */
2138        NULL,                               /* get_dropped_packets */
2139        dpdk_get_stats,                     /* get_statistics */
2140        NULL,                               /* get_fd */
2141        dpdk_trace_event,                   /* trace_event */
2142        dpdk_help,                          /* help */
2143        NULL,                               /* next pointer */
2144        {true, 8},                          /* Live, NICs typically have 8 threads */
2145        dpdk_pstart_input,                  /* pstart_input */
2146        dpdk_pread_packets,                 /* pread_packets */
2147        dpdk_pause_input,                   /* ppause */
2148        dpdk_fin_input,                     /* p_fin */
2149        dpdk_pconfig_input,                 /* pconfig_input */
2150        dpdk_pregister_thread,              /* pregister_thread */
2151        dpdk_punregister_thread,            /* punregister_thread */
2152        NULL                                /* get thread stats */
2153};
2154
2155void dpdk_constructor(void) {
2156        register_format(&dpdk);
2157}
Note: See TracBrowser for help on using the repository browser.