source: lib/format_dpdk.c @ 10c47a0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 10c47a0 was 10c47a0, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fixes DAG DUCK reporting for parallel libtrace.
In parallel libtrace DUCK is only ever sent to the first thread.

It is now up to each format's pread_packet to tag the trace along with
the error (AKA bytes read) to each packet.

Change logic in parallel libtrace to always prefer pread over read if
it exists.

Fix some unresolved conflict in DPDK that I missed, that was ifdef'd out.

  • Property mode set to 100644
File size: 74.8 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
69/* We can deal with any minor differences by checking the RTE VERSION
70 * Typically DPDK backports some fixes (typically for building against
71 * newer kernels) to the older version of DPDK.
72 *
73 * These get released with the rX suffix. The following macros were added
74 * in these new releases.
75 *
76 * Below this is a log of versions that required changes to the libtrace
77 * code (that we still attempt to support).
78 *
79 * DPDK v1.7.1 is recommended.
80 * However 1.5 to 1.8 are likely supported.
81 */
82#include <rte_eal.h>
83#include <rte_version.h>
/* Older DPDK releases do not provide every version helper we rely on, so
 * supply fallbacks that pack the version into a single comparable integer. */
#if !defined(RTE_VERSION_NUM)
#       define RTE_VERSION_NUM(a,b,c,d) ((a) << 24 | (b) << 16 | (c) << 8 | (d))
#endif
#if !defined(RTE_VER_PATCH_RELEASE)
#       define RTE_VER_PATCH_RELEASE 0
#endif
#if !defined(RTE_VERSION)
#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR,RTE_VER_MINOR, \
        RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
#endif

/* Each feature flag below evaluates to 1 or 0 inside #if directives. */

/* DPDK_USE_BLACKLIST - releases up to and including 1.6.0r1.
 *
 * 1.6.0r2 removed rte_eal_pci_set_blacklist() and renamed device_list to
 * pci_device_list. From 1.7.0 rte_eal_pci_probe() is called by
 * rte_eal_init(), so the device has to be whitelisted (via rte_devargs)
 * before rte_eal_init() runs; that also works with DPDK 1.6.0r2.
 */
#define DPDK_USE_BLACKLIST (RTE_VERSION <= RTE_VERSION_NUM(1, 6, 0, 1))

/* DPDK_USE_PMD_INIT - releases before 1.7.0.
 *
 * 1.7.0 removed rte_pmd_init_all(); nothing replaces it as it is no
 * longer needed.
 */
#define DPDK_USE_PMD_INIT (RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 0))

/* DPDK_USE_PCI_PROBE - releases before 1.7.0-rc3.
 *
 * From 1.7.0-rc3 rte_eal_pci_probe() is part of rte_eal_init(). Somewhere
 * between 1.7 and 1.8 calling it twice broke, so only call it ourselves on
 * older releases.
 */
#define DPDK_USE_PCI_PROBE (RTE_VERSION < RTE_VERSION_NUM(1, 7, 0, 3))

/* DPDK_USE_LOG_LEVEL - releases from 1.8.0-rc1 onwards.
 *
 * The log level became a command line option which overrides whatever we
 * previously set it to, so it must be passed on the EAL command line.
 */
#define DPDK_USE_LOG_LEVEL (RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 1))
145
146#include <rte_per_lcore.h>
147#include <rte_debug.h>
148#include <rte_errno.h>
149#include <rte_common.h>
150#include <rte_log.h>
151#include <rte_memcpy.h>
152#include <rte_prefetch.h>
153#include <rte_branch_prediction.h>
154#include <rte_pci.h>
155#include <rte_ether.h>
156#include <rte_ethdev.h>
157#include <rte_ring.h>
158#include <rte_mempool.h>
159#include <rte_mbuf.h>
160#include <rte_launch.h>
161#include <rte_lcore.h>
162#include <rte_per_lcore.h>
163#include <rte_cycles.h>
164#include <pthread.h>
165
/* The default size of memory buffers to use - this is the maximum size of a
 * standard Ethernet packet less the size of the MAC checksum. */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue, tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use.
 * By default this is limited by the driver to 4k and must be a multiple of
 * 128. A modification can be made to the driver to remove this limit, after
 * which it can be increased in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * The same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The size of the mempool name buffer, including the terminating NUL */
#define MEMPOOL_NAME_LEN 20

/* For single threaded libtrace we read packets as a batch/burst;
 * this is the maximum size of said burst */
#define BURST_SIZE 50

/* Every macro argument below is parenthesised so that arguments such as
 * `p + 1` or `*tsp` expand as intended. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval / struct timespec into a nanosecond count */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)

#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
216
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose messages to stderr */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. gettimeofday() should also be a
 * vsyscall; if it is not, you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_LIBRT
/* You can set this to 0 to prefer gettimeofday() */
#define USE_CLOCK_GETTIME 1
#else
/* Without librt clock_gettime() cannot be relied upon, so this must
 * stay 0. DONT CHANGE THIS !!! */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet.
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 * TODO this requires updating (packet sizes are wrong, TS most likely also)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
264
/* 16-byte hardware timestamp record for the Intel 82580 (the optional
 * "hw_timestamp_82580" region in the dpdk_addt_hdr layout diagram).
 *
 * Note the Intel 82580 datasheet is inconsistent: it states the timestamp is
 * stored big endian, however it is actually little endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;
    uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
};
271
/* Run state of the DPDK port, stored in dpdk_format_data_t.paused.
 * DPDK_NEVER_STARTED is the initial value assigned by the init functions. */
enum paused_state {
    DPDK_NEVER_STARTED = 0,
    DPDK_RUNNING = 1,
    DPDK_PAUSED = 2,
};
277
/* State kept per lcore; dpdk_format_data_t holds RTE_MAX_LCORE of these in
 * its per_lcore array and PERPKT_FORMAT() casts format_data to this type.
 * NOTE(review): presumably one entry per parallel reader thread - confirm
 * against the parallel start code. */
struct dpdk_per_lcore_t
{
        uint16_t queue_id; /* NOTE(review): assumed to be the NIC queue this lcore reads */
        uint8_t port; /* NOTE(review): assumed to be the DPDK port id */
        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
#if HAS_HW_TIMESTAMPS_82580
        /* Timestamping only relevant to RX */
        uint64_t ts_first_sys; /* System timestamp of the first packet in nanoseconds */
        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
#endif
};
289
/* Format state for a dpdk: trace. Used by both input and output, however
 * some fields are not used for output. */
struct dpdk_format_data_t {
    int8_t promisc; /* promiscuous mode - RX only; the init functions set -1 (NOTE(review): presumably "not specified") */
    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    uint8_t nb_ports; /* Total number of usable ports on the system; init fails unless this is 1 */
    uint8_t paused; /* See paused_state */
    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    int snaplen; /* The snap length for the capture - RX only; 0 means use the default */
    /* We always have to setup both rx and tx queues even if we don't want them */
    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    int nb_tx_buf; /* The number of packet buffers in the tx ring */
    int nic_numa_node; /* The NUMA node that the NIC is attached to, -1 if unknown */
    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
#if DPDK_USE_BLACKLIST
    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
        unsigned int nb_blacklist; /* Number of valid entries at the start of blacklist */
#endif
    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    uint8_t rss_key[40]; // Toeplitz RSS key, filled in by dpdk_pconfig_input when a hasher is set
    /* To improve performance we always batch reading packets, in a burst */
    struct rte_mbuf* burst_pkts[BURST_SIZE];
    int burst_size; /* The total number read in the burst */
    int burst_offset; /* The offset we are into the burst */
        // Per-lcore state, one slot for every possible lcore.
        // DPDK normally seems to have a limit of 8 queues for a given card
        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
};
318
/* Bit flags for the flags field of struct dpdk_addt_hdr */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 1 << 0,     /* Packet data still carries the MAC checksum */
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
323
/**
 * A structure placed in front of the packet where we can store
 * additional information about the given packet.
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 *
 * The header therefore has to fit inside RTE_PKTMBUF_HEADROOM.
 */
struct dpdk_addt_hdr {
    uint64_t timestamp; /* NOTE(review): units not visible here - presumably nanoseconds, confirm */
    uint8_t flags; /* Bitwise OR of dpdk_addt_hdr_flags */
    uint8_t direction;
    uint8_t reserved1; /* Unused; with reserved2 keeps cap_len 4-byte aligned */
    uint8_t reserved2;
    uint32_t cap_len; /* The size to say the capture is */
};
347
348/**
349 * We want to blacklist all devices except those on the whitelist
350 * (I say list, but yes it is only the one).
351 *
352 * The default behaviour of rte_pci_probe() will map every possible device
353 * to its DPDK driver. The DPDK driver will take the ethernet device
354 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
355 *
356 * So blacklist all devices except the one that we wish to use so that
357 * the others can still be used as standard ethernet ports.
358 *
359 * @return 0 if successful, otherwise -1 on error.
360 */
361#if DPDK_USE_BLACKLIST
362static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
363{
364        struct rte_pci_device *dev = NULL;
365        format_data->nb_blacklist = 0;
366
367        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
368
369        TAILQ_FOREACH(dev, &device_list, next) {
370        if (whitelist != NULL && whitelist->domain == dev->addr.domain
371            && whitelist->bus == dev->addr.bus
372            && whitelist->devid == dev->addr.devid
373            && whitelist->function == dev->addr.function)
374            continue;
375                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
376                                / sizeof (format_data->blacklist[0])) {
377                        fprintf(stderr, "Warning: too many devices to blacklist consider"
378                                        " increasing BLACK_LIST_SIZE");
379                        break;
380                }
381                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
382                ++format_data->nb_blacklist;
383        }
384
385        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
386        return 0;
387}
388#else /* DPDK_USE_BLACKLIST */
389#include <rte_devargs.h>
390static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
391{
392        char pci_str[20] = {0};
393        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
394                 whitelist->domain,
395                 whitelist->bus,
396                 whitelist->devid,
397                 whitelist->function);
398        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
399                return -1;
400        }
401        return 0;
402}
403#endif
404
405/**
406 * Parse the URI format as a pci address
407 * Fills in addr, note core is optional and is unchanged if
408 * a value for it is not provided.
409 *
410 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
411 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
412 */
413static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
414    int matches;
415    assert(str);
416    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
417                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
418    if (matches >= 4) {
419        return 0;
420    } else {
421        return -1;
422    }
423}
424
425/**
426 * Convert a pci address to the numa node it is
427 * connected to.
428 *
429 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
430 * so we can call it before DPDK
431 *
432 * @return -1 if unknown otherwise a number 0 or higher of the numa node
433 */
434static int pci_to_numa(struct rte_pci_addr * dev_addr) {
435        char path[50] = {0};
436        FILE *file;
437
438        /* Read from the system */
439        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
440                 dev_addr->domain,
441                 dev_addr->bus,
442                 dev_addr->devid,
443                 dev_addr->function);
444
445        if((file = fopen(path, "r")) != NULL) {
446                int numa_node = -1;
447                fscanf(file, "%d", &numa_node);
448                fclose(file);
449                return numa_node;
450        }
451        return -1;
452}
453
#if DEBUG
/**
 * Debugging aid: print the DPDK version, master lcore, lcore count, the
 * role of each online CPU (up to RTE_MAX_LCORE) and the EAL process type
 * to stderr.
 */
static inline void dump_configuration(void)
{
    struct rte_config * global_config;
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    if (nb_cpu <= 0) {
        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
        nb_cpu = 1; /* fallback to just 1 core */
    }
    if (nb_cpu > RTE_MAX_LCORE)
        nb_cpu = RTE_MAX_LCORE;

    global_config = rte_eal_get_configuration();

    if (global_config != NULL) {
        int i;
        const char * proc_type;

        fprintf(stderr, "Intel DPDK setup\n"
               "---Version      : %s\n"
               "---Master LCore : %"PRIu32"\n"
               "---LCore Count  : %"PRIu32"\n",
               rte_version(),
               global_config->master_lcore, global_config->lcore_count);

        for (i = 0 ; i < nb_cpu; i++) {
            fprintf(stderr, "   ---Core %d : %s\n", i,
                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
        }

        switch (global_config->process_type) {
            case RTE_PROC_AUTO:
                proc_type = "auto";
                break;
            case RTE_PROC_PRIMARY:
                proc_type = "primary";
                break;
            case RTE_PROC_SECONDARY:
                proc_type = "secondary";
                break;
            case RTE_PROC_INVALID:
                proc_type = "invalid";
                break;
            default:
                proc_type = "something worse than invalid!!";
        }
        fprintf(stderr, "---Process Type : %s\n", proc_type);
    }

}
#endif
506
/**
 * Expects to be called from the master lcore and moves it to the given dpdk id
 *
 * The EAL configuration (lcore roles, master_lcore, thread id and the
 * per-lcore _lcore_id) is rewritten first, then the calling thread's CPU
 * affinity is changed to match.
 *
 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
 *               and not already in use.
 * @return 0 is successful otherwise -1 on error.
 * @note On failure of pthread_setaffinity_np the EAL configuration has
 *       already been switched to the new lcore; only the OS affinity is
 *       left unchanged.
 */
static inline int dpdk_move_master_lcore(size_t core) {
    struct rte_config *cfg = rte_eal_get_configuration();
    cpu_set_t cpuset;
    int i;

    assert (core < RTE_MAX_LCORE);
    assert (rte_get_master_lcore() == rte_lcore_id());

    // Already on the requested lcore, nothing to do
    if (core == rte_lcore_id())
        return 0;

    // Make sure we are not overwriting someone else
    assert(!rte_lcore_is_enabled(core));

    // Move the core. Order matters: rte_lcore_id() still returns the old
    // lcore until RTE_PER_LCORE(_lcore_id) is updated on the last line.
    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
    cfg->lcore_role[core] = ROLE_RTE;
    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
    rte_eal_get_configuration()->master_lcore = core;
    RTE_PER_LCORE(_lcore_id) = core;

    // Now change the affinity
    CPU_ZERO(&cpuset);

    if (lcore_config[core].detected) {
        CPU_SET(core, &cpuset);
    } else {
        // Not a CPU DPDK detected: allow every detected CPU instead
        for (i = 0; i < RTE_MAX_LCORE; ++i) {
            if (lcore_config[i].detected)
                CPU_SET(i, &cpuset);
        }
    }

    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    if (i != 0) {
        // TODO proper libtrace style error here!!
        fprintf(stderr, "pthread_setaffinity_np failed\n");
        return -1;
    }
    return 0;
}
555
556
/**
 * Snapshot of getopt(3)'s process-wide state.
 *
 * rte_eal_init() parses its arguments with getopt, which clobbers the
 * optarg/optind/opterr/optopt globals an application may still be using
 * (nested getopt use). We save them beforehand and restore them afterwards.
 * This is a hack, but the format is normally parsed last so it rarely
 * matters, and DPDK only supports modern systems where it keeps working.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Record the current getopt globals into *opts. */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Write the getopt globals back from a snapshot made by save_getopts(). */
static void restore_getopts(struct saved_getopts *opts) {
        optopt = opts->optopt;
        opterr = opts->opterr;
        optind = opts->optind;
        optarg = opts->optarg;
}
584
585static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
586                                        char * err, int errlen) {
587    int ret; /* Returned error codes */
588    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
589    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
590    char mem_map[20] = {0}; /* The memory name */
591    long nb_cpu; /* The number of CPUs in the system */
592    long my_cpu; /* The CPU number we want to bind to */
593    int i;
594    struct rte_config *cfg = rte_eal_get_configuration();
595        struct saved_getopts save_opts;
596
597#if DEBUG
598    rte_set_log_level(RTE_LOG_DEBUG);
599#else
600    rte_set_log_level(RTE_LOG_WARNING);
601#endif
602    /*
603     * Using unique file prefixes mean separate memory is used, unlinking
604     * the two processes. However be careful we still cannot access a
605     * port that already in use.
606     *
607     * Using unique file prefixes mean separate memory is used, unlinking
608     * the two processes. However be careful we still cannot access a
609     * port that already in use.
610     */
611    char* argv[] = {"libtrace",
612                    "-c", cpu_number,
613                    "-n", "1",
614                    "--proc-type", "auto",
615                    "--file-prefix", mem_map,
616                    "-m", "256",
617#if DPDK_USE_LOG_LEVEL
618#       if DEBUG
619                    "--log-level", "8", /* RTE_LOG_DEBUG */
620#       else
621                    "--log-level", "5", /* RTE_LOG_WARNING */
622#       endif
623#endif
624                    NULL};
625    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
626
627    /* This initialises the Environment Abstraction Layer (EAL)
628     * If we had slave workers these are put into WAITING state
629     *
630     * Basically binds this thread to a fixed core, which we choose as
631     * the last core on the machine (assuming fewer interrupts mapped here).
632     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
633     * "-n" the number of memory channels into the CPU (hardware specific)
634     *      - Most likely to be half the number of ram slots in your machine.
635     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
636     * Controls where in memory packets are stored and should spread across
637     * the channels. We just use 1 to be safe.
638     */
639
640    /* Get the number of cpu cores in the system and use the last core
641     * on the correct numa node */
642    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
643    if (nb_cpu <= 0) {
644        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
645        nb_cpu = 1; /* fallback to the first core */
646    }
647    if (nb_cpu > RTE_MAX_LCORE)
648        nb_cpu = RTE_MAX_LCORE;
649
650    my_cpu = -1;
651    /* This allows the user to specify the core - we would try to do this
652     * automatically but it's hard to tell that this is secondary
653     * before running rte_eal_init(...). Currently we are limited to 1
654     * instance per core due to the way memory is allocated. */
655    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
656        snprintf(err, errlen, "Failed to parse URI");
657        return -1;
658    }
659
660#if HAVE_LIBNUMA
661        format_data->nic_numa_node = pci_to_numa(&use_addr);
662        if (my_cpu < 0) {
663                /* If we can assign to a core on the same numa node */
664                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
665                if(format_data->nic_numa_node >= 0) {
666                        int max_node_cpu = -1;
667                        struct bitmask *mask = numa_allocate_cpumask();
668                        assert(mask);
669                        numa_node_to_cpus(format_data->nic_numa_node, mask);
670                        for (i = 0 ; i < nb_cpu; ++i) {
671                                if (numa_bitmask_isbitset(mask,i))
672                                        max_node_cpu = i+1;
673                        }
674                        my_cpu = max_node_cpu;
675                }
676        }
677#endif
678        if (my_cpu < 0) {
679                my_cpu = nb_cpu;
680        }
681
682
683    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
684                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
685
686    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
687        snprintf(err, errlen,
688          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
689          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
690        return -1;
691    }
692
693    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
694       info older versions */
695    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
696    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
697
698#if !DPDK_USE_BLACKLIST
699    /* Black list all ports besides the one that we want to use */
700        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
701                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
702                         " are you sure the address is correct?: %s", strerror(-ret));
703                return -1;
704        }
705#endif
706
707        /* Give the memory map a unique name */
708        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
709    /* rte_eal_init it makes a call to getopt so we need to reset the
710     * global optind variable of getopt otherwise this fails */
711        save_getopts(&save_opts);
712    optind = 1;
713    if ((ret = rte_eal_init(argc, argv)) < 0) {
714        snprintf(err, errlen,
715          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
716        return -1;
717    }
718        restore_getopts(&save_opts);
719    // These are still running but will never do anything with DPDK v1.7 we
720    // should remove this XXX in the future
721    for(i = 0; i < RTE_MAX_LCORE; ++i) {
722            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
723            cfg->lcore_role[i] = ROLE_OFF;
724            cfg->lcore_count--;
725        }
726    }
727    // Only the master should be running
728    assert(cfg->lcore_count == 1);
729
730    dpdk_move_master_lcore(my_cpu-1);
731
732#if DEBUG
733    dump_configuration();
734#endif
735
736#if DPDK_USE_PMD_INIT
737    /* This registers all available NICs with Intel DPDK
738     * These are not loaded until rte_eal_pci_probe() is called.
739     */
740    if ((ret = rte_pmd_init_all()) < 0) {
741        snprintf(err, errlen,
742          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
743        return -1;
744    }
745#endif
746
747#if DPDK_USE_BLACKLIST
748    /* Blacklist all ports besides the one that we want to use */
749        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
750                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
751                         " are you sure the address is correct?: %s", strerror(-ret));
752                return -1;
753        }
754#endif
755
756#if DPDK_USE_PCI_PROBE
757    /* This loads DPDK drivers against all ports that are not blacklisted */
758        if ((ret = rte_eal_pci_probe()) < 0) {
759        snprintf(err, errlen,
760            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
761        return -1;
762    }
763#endif
764
765    format_data->nb_ports = rte_eth_dev_count();
766
767    if (format_data->nb_ports != 1) {
768        snprintf(err, errlen,
769            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
770            format_data->nb_ports);
771        return -1;
772    }
773
774    struct rte_eth_dev_info dev_info;
775    rte_eth_dev_info_get(0, &dev_info);
776    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
777                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
778
779    return 0;
780}
781
782static int dpdk_init_input (libtrace_t *libtrace) {
783    char err[500];
784    err[0] = 0;
785
786    libtrace->format_data = (struct dpdk_format_data_t *)
787                            malloc(sizeof(struct dpdk_format_data_t));
788    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
789    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
790    FORMAT(libtrace)->nb_ports = 0;
791    FORMAT(libtrace)->snaplen = 0; /* Use default */
792    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
793    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
794    FORMAT(libtrace)->nic_numa_node = -1;
795    FORMAT(libtrace)->promisc = -1;
796    FORMAT(libtrace)->pktmbuf_pool = NULL;
797#if DPDK_USE_BLACKLIST
798    FORMAT(libtrace)->nb_blacklist = 0;
799#endif
800    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
801    FORMAT(libtrace)->mempool_name[0] = 0;
802    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
803    FORMAT(libtrace)->burst_size = 0;
804    FORMAT(libtrace)->burst_offset = 0;
805
806    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
807        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
808        free(libtrace->format_data);
809        libtrace->format_data = NULL;
810        return -1;
811    }
812    return 0;
813};
814
815static int dpdk_init_output(libtrace_out_t *libtrace)
816{
817    char err[500];
818    err[0] = 0;
819
820    libtrace->format_data = (struct dpdk_format_data_t *)
821                            malloc(sizeof(struct dpdk_format_data_t));
822    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
823    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
824    FORMAT(libtrace)->nb_ports = 0;
825    FORMAT(libtrace)->snaplen = 0; /* Use default */
826    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
827    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
828    FORMAT(libtrace)->nic_numa_node = -1;
829    FORMAT(libtrace)->promisc = -1;
830    FORMAT(libtrace)->pktmbuf_pool = NULL;
831#if DPDK_USE_BLACKLIST
832    FORMAT(libtrace)->nb_blacklist = 0;
833#endif
834    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
835    FORMAT(libtrace)->mempool_name[0] = 0;
836    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
837    FORMAT(libtrace)->burst_size = 0;
838    FORMAT(libtrace)->burst_offset = 0;
839
840    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
841        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
842        free(libtrace->format_data);
843        libtrace->format_data = NULL;
844        return -1;
845    }
846    return 0;
847};
848
849static int dpdk_pconfig_input (libtrace_t *libtrace,
850                                trace_parallel_option_t option,
851                                void *data) {
852        switch (option) {
853                case TRACE_OPTION_SET_HASHER:
854                        switch (*((enum hasher_types *) data))
855                        {
856                                case HASHER_BALANCE:
857                                case HASHER_UNIDIRECTIONAL:
858                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
859                                        return 0;
860                                case HASHER_BIDIRECTIONAL:
861                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
862                                        return 0;
863                                case HASHER_HARDWARE:
864                                case HASHER_CUSTOM:
865                                        // We don't support these
866                                        return -1;
867                        }
868        break;
869        }
870        return -1;
871}
872/**
873 * Note here snaplen excludes the MAC checksum. Packets over
874 * the requested snaplen will be dropped. (Excluding MAC checksum)
875 *
876 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
877 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
878 * is set the maximum size of the returned packet would be 1518 otherwise
879 * 1514 would be the largest size possibly returned.
880 *
881 */
882static int dpdk_config_input (libtrace_t *libtrace,
883                                        trace_option_t option,
884                                        void *data) {
885    switch (option) {
886        case TRACE_OPTION_SNAPLEN:
887            /* Only support changing snaplen before a call to start is
888             * made */
889            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
890                FORMAT(libtrace)->snaplen=*(int*)data;
891            else
892                return -1;
893            return 0;
894                case TRACE_OPTION_PROMISC:
895                        FORMAT(libtrace)->promisc=*(int*)data;
896            return 0;
897        case TRACE_OPTION_FILTER:
898            /* TODO filtering */
899            break;
900        case TRACE_OPTION_META_FREQ:
901            break;
902        case TRACE_OPTION_EVENT_REALTIME:
903            break;
904        /* Avoid default: so that future options will cause a warning
905         * here to remind us to implement it, or flag it as
906         * unimplementable
907         */
908    }
909
910        /* Don't set an error - trace_config will try to deal with the
911         * option and will set an error if it fails */
912    return -1;
913}
914
915/* Can set jumbo frames/ or limit the size of a frame by setting both
916 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
917 *
918 */
919static struct rte_eth_conf port_conf = {
920        .rxmode = {
921                .mq_mode = ETH_RSS,
922                .split_hdr_size = 0,
923                .header_split   = 0, /**< Header Split disabled */
924                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
925                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
926                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
927                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
928#if GET_MAC_CRC_CHECKSUM
929/* So it appears that if hw_strip_crc is turned off the driver will still
930 * take this off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
931 * So if .hw_strip_crc=0 a valid CRC exists 4 bytes after the end of the
932 * So lets just add it back on when we receive the packet.
933 */
934                .hw_strip_crc   = 0, /**< CRC stripped by hardware */
935#else
936/* By default strip the MAC checksum because it's a bit of a hack to
937 * actually read these. And don't want to rely on disabling this to actualy
938 * always cut off the checksum in the future
939 */
940                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
941#endif
942        },
943        .txmode = {
944                .mq_mode = ETH_DCB_NONE,
945        },
946        .rx_adv_conf = {
947                .rss_conf = {
948                        // .rss_key = &rss_key, // We set this per format
949                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
950                },
951        },
952        .intr_conf = {
953                .lsc = 1
954        }
955};
956
957static const struct rte_eth_rxconf rx_conf = {
958        .rx_thresh = {
959                .pthresh = 8,/* RX_PTHRESH prefetch */
960                .hthresh = 8,/* RX_HTHRESH host */
961                .wthresh = 4,/* RX_WTHRESH writeback */
962        },
963    .rx_free_thresh = 0,
964    .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
965};
966
967static const struct rte_eth_txconf tx_conf = {
968        .tx_thresh = {
969        /**
970         * TX_PTHRESH prefetch
971         * Set on the NIC, if the number of unprocessed descriptors to queued on
972         * the card fall below this try grab at least hthresh more unprocessed
973         * descriptors.
974         */
975                .pthresh = 36,
976
977        /* TX_HTHRESH host
978         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
979         */
980                .hthresh = 0,
981
982        /* TX_WTHRESH writeback
983         * Set on the NIC, the number of sent descriptors before writing back
984         * status to confirm the transmission. This is done more efficiently as
985         * a bulk DMA-transfer rather than writing one at a time.
986         * Similar to tx_free_thresh however this is applied to the NIC, where
987         * as tx_free_thresh is when DPDK will check these. This is extended
988         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
989         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
990         */
991                .wthresh = 4,
992        },
993
994    /* Used internally by DPDK rather than passed to the NIC. The number of
995     * packet descriptors to send before checking for any responses written
996     * back (to confirm the transmission). Default = 32 if set to 0)
997     */
998        .tx_free_thresh = 0,
999
1000    /* This is the Report Status threshold, used by 10Gbit cards,
1001     * This signals the card to only write back status (such as
1002     * transmission successful) after this minimum number of transmit
1003     * descriptors are seen. The default is 32 (if set to 0) however if set
1004     * to greater than 1 TX wthresh must be set to zero, because this is kindof
1005     * a replacement. See the dpdk programmers guide for more restrictions.
1006     */
1007        .tx_rs_thresh = 1,
1008};
1009
1010/**
1011 * A callback for a link state change (LSC).
1012 *
1013 * Packets may be received before this notification. In fact the DPDK IGXBE
1014 * driver likes to put a delay upto 5sec before sending this.
1015 *
1016 * We use this to ensure the link speed is correct for our timestamp
1017 * calculations. Because packets might be received before the link up we still
1018 * update this when the packet is received.
1019 *
1020 * @param port The DPDK port
1021 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1022 * @param cb_arg The dpdk_format_data_t structure associated with the format
1023 */
1024static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1025                              void *cb_arg) {
1026        struct dpdk_format_data_t * format_data = cb_arg;
1027        struct rte_eth_link link_info;
1028        assert(event == RTE_ETH_EVENT_INTR_LSC);
1029        assert(port == format_data->port);
1030
1031        rte_eth_link_get_nowait(port, &link_info);
1032
1033        if (link_info.link_status)
1034                format_data->link_speed = link_info.link_speed;
1035        else
1036                format_data->link_speed = 0;
1037
1038#if DEBUG
1039        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1040                link_info.link_status ? "up" : "down",
1041                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1042                                          "full-duplex" : "half-duplex",
1043                (int) link_info.link_speed);
1044#endif
1045
1046        /* Turns out DPDK drivers might not come back up if the link speed
1047         * changes. So we reset the autoneg procedure. This is very unsafe
1048         * we have have threads reading packets and we stop the port. */
1049#if 0
1050        if (!link_info.link_status) {
1051                int ret;
1052                rte_eth_dev_stop(port);
1053                ret = rte_eth_dev_start(port);
1054                if (ret < 0) {
1055                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1056                                strerror(-ret));
1057                }
1058        }
1059#endif
1060}
1061
1062/* Attach memory to the port and start the port or restart the port.
1063 */
1064static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
1065    int ret; /* Check return values for errors */
1066    struct rte_eth_link link_info; /* Wait for link */
1067    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
1068
1069    /* Already started */
1070    if (format_data->paused == DPDK_RUNNING)
1071        return 0;
1072
1073    /* First time started we need to alloc our memory, doing this here
1074     * rather than in environment setup because we don't have snaplen then */
1075    if (format_data->paused == DPDK_NEVER_STARTED) {
1076        if (format_data->snaplen == 0) {
1077            format_data->snaplen = RX_MBUF_SIZE;
1078            port_conf.rxmode.jumbo_frame = 0;
1079            port_conf.rxmode.max_rx_pkt_len = 0;
1080        } else {
1081            /* Use jumbo frames */
1082            port_conf.rxmode.jumbo_frame = 1;
1083            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1084        }
1085
1086        /* This is additional overhead so make sure we allow space for this */
1087#if GET_MAC_CRC_CHECKSUM
1088        format_data->snaplen += ETHER_CRC_LEN;
1089#endif
1090#if HAS_HW_TIMESTAMPS_82580
1091        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1092#endif
1093
1094        /* Create the mbuf pool, which is the place our packets are allocated
1095         * from - TODO figure out if there is is a free function (I cannot see one)
1096         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1097         * allocate however that extra 1 packet is not used.
1098         * (I assume <= vs < error some where in DPDK code)
1099         * TX requires nb_tx_buffers + 1 in the case the queue is full
1100         * so that will fill the new buffer and wait until slots in the
1101         * ring become available.
1102         */
1103#if DEBUG
1104    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1105#endif
1106    format_data->pktmbuf_pool =
1107            rte_mempool_create(format_data->mempool_name,
1108                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
1109                       format_data->snaplen + sizeof(struct rte_mbuf)
1110                                        + RTE_PKTMBUF_HEADROOM,
1111                       128, sizeof(struct rte_pktmbuf_pool_private),
1112                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1113                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
1114
1115    if (format_data->pktmbuf_pool == NULL) {
1116            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
1117                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
1118            return -1;
1119        }
1120    }
1121
1122    /* ----------- Now do the setup for the port mapping ------------ */
1123    /* Order of calls must be
1124     * rte_eth_dev_configure()
1125     * rte_eth_tx_queue_setup()
1126     * rte_eth_rx_queue_setup()
1127     * rte_eth_dev_start()
1128     * other rte_eth calls
1129     */
1130
1131
1132    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1133
1134    /* This must be called first before another *eth* function
1135     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1136    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
1137    if (ret < 0) {
1138        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1139                            " %"PRIu8" : %s", format_data->port,
1140                            strerror(-ret));
1141        return -1;
1142    }
1143    /* Initialise the TX queue a minimum value if using this port for
1144     * receiving. Otherwise a larger size if writing packets.
1145     */
1146    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1147                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1148    if (ret < 0) {
1149        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1150                            " %"PRIu8" : %s", format_data->port,
1151                            strerror(-ret));
1152        return -1;
1153    }
1154    /* Initialise the RX queue with some packets from memory */
1155    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1156                                 format_data->nb_rx_buf, cpu_numa_node,
1157                                 &rx_conf, format_data->pktmbuf_pool);
1158    if (ret < 0) {
1159        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1160                    " %"PRIu8" : %s", format_data->port,
1161                    strerror(-ret));
1162        return -1;
1163    }
1164
1165    /* Start device */
1166    ret = rte_eth_dev_start(format_data->port);
1167    if (ret < 0) {
1168        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1169                    strerror(-ret));
1170        return -1;
1171    }
1172
1173    /* Default promiscuous to on */
1174    if (format_data->promisc == -1)
1175        format_data->promisc = 1;
1176
1177    if (format_data->promisc == 1)
1178        rte_eth_promiscuous_enable(format_data->port);
1179    else
1180        rte_eth_promiscuous_disable(format_data->port);
1181
1182        /* Register a callback for link state changes */
1183        ret = rte_eth_dev_callback_register(format_data->port,
1184                                            RTE_ETH_EVENT_INTR_LSC,
1185                                            dpdk_lsc_callback,
1186                                            format_data);
1187        /* If this fails it is not a show stopper */
1188#if DEBUG
1189        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1190                ret, strerror(-ret));
1191#endif
1192
1193    /* Get the current link status */
1194    rte_eth_link_get_nowait(format_data->port, &link_info);
1195    format_data->link_speed = link_info.link_speed;
1196#if DEBUG
1197    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1198            (int) link_info.link_duplex, (int) link_info.link_speed);
1199#endif
1200    /* We have now successfully started/unpaused */
1201    format_data->paused = DPDK_RUNNING;
1202
1203    return 0;
1204}
1205
1206/* Attach memory to the port and start (or restart) the port/s.
1207 */
1208static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
1209    int ret, i; /* Check return values for errors */
1210    struct rte_eth_link link_info; /* Wait for link */
1211
1212    /* Already started */
1213    if (format_data->paused == DPDK_RUNNING)
1214        return 0;
1215
1216    /* First time started we need to alloc our memory, doing this here
1217     * rather than in environment setup because we don't have snaplen then */
1218    if (format_data->paused == DPDK_NEVER_STARTED) {
1219        if (format_data->snaplen == 0) {
1220            format_data->snaplen = RX_MBUF_SIZE;
1221            port_conf.rxmode.jumbo_frame = 0;
1222            port_conf.rxmode.max_rx_pkt_len = 0;
1223        } else {
1224            /* Use jumbo frames */
1225            port_conf.rxmode.jumbo_frame = 1;
1226            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1227        }
1228
1229        /* This is additional overhead so make sure we allow space for this */
1230#if GET_MAC_CRC_CHECKSUM
1231        format_data->snaplen += ETHER_CRC_LEN;
1232#endif
1233#if HAS_HW_TIMESTAMPS_82580
1234        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1235#endif
1236
1237        /* Create the mbuf pool, which is the place our packets are allocated
1238         * from - TODO figure out if there is a free function (I cannot see one)
1239         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1240         * allocate however that extra 1 packet is not used.
1241         * (I assume <= vs < error some where in DPDK code)
1242         * TX requires nb_tx_buffers + 1 in the case the queue is full
1243         * so that will fill the new buffer and wait until slots in the
1244         * ring become available.
1245         */
1246#if DEBUG
1247    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1248#endif
1249    format_data->pktmbuf_pool =
1250            rte_mempool_create(format_data->mempool_name,
1251                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
1252                       format_data->snaplen + sizeof(struct rte_mbuf)
1253                                        + RTE_PKTMBUF_HEADROOM,
1254                       128, sizeof(struct rte_pktmbuf_pool_private),
1255                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1256                       format_data->nic_numa_node, 0);
1257
1258        if (format_data->pktmbuf_pool == NULL) {
1259            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1260                        "pool failed: %s", strerror(rte_errno));
1261            return -1;
1262        }
1263    }
1264
1265    /* ----------- Now do the setup for the port mapping ------------ */
1266    /* Order of calls must be
1267     * rte_eth_dev_configure()
1268     * rte_eth_tx_queue_setup()
1269     * rte_eth_rx_queue_setup()
1270     * rte_eth_dev_start()
1271     * other rte_eth calls
1272     */
1273
1274    /* This must be called first before another *eth* function
1275     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1276    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1277    if (ret < 0) {
1278        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1279                            " %"PRIu8" : %s", format_data->port,
1280                            strerror(-ret));
1281        return -1;
1282    }
1283#if DEBUG
1284    fprintf(stderr, "Doing dev configure\n");
1285#endif
1286    /* Initialise the TX queue a minimum value if using this port for
1287     * receiving. Otherwise a larger size if writing packets.
1288     */
1289    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1290                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1291    if (ret < 0) {
1292        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1293                            " %"PRIu8" : %s", format_data->port,
1294                            strerror(-ret));
1295        return -1;
1296    }
1297
1298    for (i=0; i < rx_queues; i++) {
1299#if DEBUG
1300    fprintf(stderr, "Doing queue configure\n");
1301#endif
1302
1303                /* Initialise the RX queue with some packets from memory */
1304                ret = rte_eth_rx_queue_setup(format_data->port, i,
1305                                             format_data->nb_rx_buf, format_data->nic_numa_node,
1306                                             &rx_conf, format_data->pktmbuf_pool);
1307        /* Init per_thread data structures */
1308        format_data->per_lcore[i].port = format_data->port;
1309        format_data->per_lcore[i].queue_id = i;
1310
1311                if (ret < 0) {
1312                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1313                                                " %"PRIu8" : %s", format_data->port,
1314                                                strerror(-ret));
1315                        return -1;
1316                }
1317        }
1318
1319#if DEBUG
1320    fprintf(stderr, "Doing start device\n");
1321#endif
1322    /* Start device */
1323    ret = rte_eth_dev_start(format_data->port);
1324#if DEBUG
1325    fprintf(stderr, "Done start device\n");
1326#endif
1327    if (ret < 0) {
1328        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1329                    strerror(-ret));
1330        return -1;
1331    }
1332
1333
1334    /* Default promiscuous to on */
1335    if (format_data->promisc == -1)
1336        format_data->promisc = 1;
1337
1338    if (format_data->promisc == 1)
1339        rte_eth_promiscuous_enable(format_data->port);
1340    else
1341        rte_eth_promiscuous_disable(format_data->port);
1342
1343
1344    /* We have now successfully started/unpased */
1345    format_data->paused = DPDK_RUNNING;
1346
1347    // Can use remote launch for all
1348    /*RTE_LCORE_FOREACH_SLAVE(i) {
1349                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1350        }*/
1351
1352    /* Register a callback for link state changes */
1353    ret = rte_eth_dev_callback_register(format_data->port,
1354                                        RTE_ETH_EVENT_INTR_LSC,
1355                                        dpdk_lsc_callback,
1356                                        format_data);
1357    /* If this fails it is not a show stopper */
1358#if DEBUG
1359    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1360            ret, strerror(-ret));
1361#endif
1362
1363    /* Get the current link status */
1364    rte_eth_link_get_nowait(format_data->port, &link_info);
1365    format_data->link_speed = link_info.link_speed;
1366#if DEBUG
1367    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1368            (int) link_info.link_duplex, (int) link_info.link_speed);
1369#endif
1370
1371    return 0;
1372}
1373
1374static int dpdk_start_input (libtrace_t *libtrace) {
1375    char err[500];
1376    err[0] = 0;
1377
1378    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1379        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1380        free(libtrace->format_data);
1381        libtrace->format_data = NULL;
1382        return -1;
1383    }
1384    return 0;
1385}
1386
1387static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1388    struct rte_eth_dev_info dev_info;
1389    rte_eth_dev_info_get(port_id, &dev_info);
1390    return dev_info.max_rx_queues;
1391}
1392
1393static inline size_t dpdk_processor_count () {
1394    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
1395    if (nb_cpu <= 0)
1396        return 1;
1397    else
1398        return (size_t) nb_cpu;
1399}
1400
1401static int dpdk_pstart_input (libtrace_t *libtrace) {
1402    char err[500];
1403    int i=0, phys_cores=0;
1404    int tot = libtrace->perpkt_thread_count;
1405    err[0] = 0;
1406
1407    if (rte_lcore_id() != rte_get_master_lcore())
1408        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1409
1410    // If the master is not on the last thread we move it there
1411    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1412        // Consider error handling here
1413        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
1414    }
1415
1416    // Don't exceed the number of cores in the system/detected by dpdk
1417    // We don't have to force this but performance wont be good if we don't
1418    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1419        if (lcore_config[i].detected) {
1420            if (rte_lcore_is_enabled(i))
1421                fprintf(stderr, "Found core %d already in use!\n", i);
1422            else
1423                phys_cores++;
1424        }
1425    }
1426
1427        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1428        tot = MIN(tot, phys_cores);
1429
1430        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1431
1432    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1433        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1434        free(libtrace->format_data);
1435        libtrace->format_data = NULL;
1436        return -1;
1437    }
1438
1439    // Make sure we only start the number that we should
1440    libtrace->perpkt_thread_count = tot;
1441    return 0;
1442}
1443
1444
1445/**
1446 * Register a thread with the DPDK system,
1447 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1448 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1449 * gives it.
1450 *
1451 * We then allow a mapper thread to be started on every real core as DPDK would,
1452 * we also bind these to the corresponding CPU cores.
1453 *
1454 * @param libtrace A pointer to the trace
1455 * @param reading True if the thread will be used to read packets, i.e. will
1456 *                call pread_packet(), false if thread used to process packet
1457 *                in any other manner including statistics functions.
1458 */
1459static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1460{
1461    struct rte_config *cfg = rte_eal_get_configuration();
1462    int i;
1463    int new_id = -1;
1464
1465    // If 'reading packets' fill in cores from 0 up and bind affinity
1466    // otherwise start from the MAX core (which is also the master) and work backwards
1467    // in this case physical cores on the system will not exist so we don't bind
1468    // these to any particular physical core
1469    pthread_mutex_lock(&libtrace->libtrace_lock);
1470    if (reading) {
1471#if HAVE_LIBNUMA
1472        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1473                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
1474                                new_id = i;
1475                        if (!lcore_config[i].detected)
1476                                new_id = -1;
1477                        break;
1478                }
1479        }
1480#endif
1481        /* Retry without the the numa restriction */
1482        if (new_id == -1) {
1483                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1484                                if (!rte_lcore_is_enabled(i)) {
1485                                        new_id = i;
1486                                if (!lcore_config[i].detected)
1487                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1488                                break;
1489                        }
1490                }
1491        }
1492    } else {
1493        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1494            if (!rte_lcore_is_enabled(i)) {
1495                new_id = i;
1496                break;
1497            }
1498        }
1499    }
1500
1501    if (new_id == -1) {
1502        assert(cfg->lcore_count == RTE_MAX_LCORE);
1503        // TODO proper libtrace style error here!!
1504        fprintf(stderr, "Too many threads for DPDK!!\n");
1505        pthread_mutex_unlock(&libtrace->libtrace_lock);
1506        return -1;
1507    }
1508
1509    // Enable the core in global DPDK structs
1510    cfg->lcore_role[new_id] = ROLE_RTE;
1511    cfg->lcore_count++;
1512    // Set TLS to reflect our new number
1513    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1514    fprintf(stderr, "original id%d", rte_lcore_id());
1515    RTE_PER_LCORE(_lcore_id) = new_id;
1516        char name[99];
1517        pthread_getname_np(pthread_self(),
1518                              name, sizeof(name));
1519
1520    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
1521
1522    if (reading) {
1523        // Set affinity bind to corresponding core
1524        cpu_set_t cpuset;
1525        CPU_ZERO(&cpuset);
1526        CPU_SET(rte_lcore_id(), &cpuset);
1527        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1528        if (i != 0) {
1529            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1530            pthread_mutex_unlock(&libtrace->libtrace_lock);
1531            return -1;
1532        }
1533    }
1534
1535    // Map our TLS to the thread data
1536    if (reading) {
1537        if(t->type == THREAD_PERPKT) {
1538            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1539        } else {
1540            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1541        }
1542    }
1543    pthread_mutex_unlock(&libtrace->libtrace_lock);
1544    return 0;
1545}
1546
1547
1548/**
1549 * Unregister a thread with the DPDK system.
1550 *
1551 * Only previously registered threads should be calling this just before
1552 * they are destroyed.
1553 */
1554static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1555{
1556    struct rte_config *cfg = rte_eal_get_configuration();
1557
1558    assert(rte_lcore_id() < RTE_MAX_LCORE);
1559    pthread_mutex_lock(&libtrace->libtrace_lock);
1560    // Skip if master!!
1561    if (rte_lcore_id() == rte_get_master_lcore()) {
1562        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1563        pthread_mutex_unlock(&libtrace->libtrace_lock);
1564        return;
1565    }
1566
1567    // Disable this core in global DPDK structs
1568    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1569    cfg->lcore_count--;
1570    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1571    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1572    pthread_mutex_unlock(&libtrace->libtrace_lock);
1573    return;
1574}
1575
1576static int dpdk_start_output(libtrace_out_t *libtrace)
1577{
1578    char err[500];
1579    err[0] = 0;
1580
1581    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1582        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1583        free(libtrace->format_data);
1584        libtrace->format_data = NULL;
1585        return -1;
1586    }
1587    return 0;
1588}
1589
1590static int dpdk_pause_input(libtrace_t * libtrace) {
1591    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1592    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1593#if DEBUG
1594        fprintf(stderr, "Pausing DPDK port\n");
1595#endif
1596        rte_eth_dev_stop(FORMAT(libtrace)->port);
1597        FORMAT(libtrace)->paused = DPDK_PAUSED;
1598        /* Empty the queue of packets */
1599        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1600                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1601        }
1602        FORMAT(libtrace)->burst_offset = 0;
1603        FORMAT(libtrace)->burst_size = 0;
1604        /* If we pause it the driver will be reset and likely our counter */
1605
1606        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
1607#if HAS_HW_TIMESTAMPS_82580
1608        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
1609#endif
1610    }
1611    return 0;
1612}
1613
1614static int dpdk_write_packet(libtrace_out_t *trace,
1615                libtrace_packet_t *packet){
1616    struct rte_mbuf* m_buff[1];
1617
1618    int wirelen = trace_get_wire_length(packet);
1619    int caplen = trace_get_capture_length(packet);
1620
1621    /* Check for a checksum and remove it */
1622    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1623                                            wirelen == caplen)
1624        caplen -= ETHER_CRC_LEN;
1625
1626    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1627    if (m_buff[0] == NULL) {
1628        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1629        return -1;
1630    } else {
1631        int ret;
1632        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1633        do {
1634            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1635        } while (ret != 1);
1636    }
1637
1638    return 0;
1639}
1640
1641static int dpdk_fin_input(libtrace_t * libtrace) {
1642    /* Free our memory structures */
1643    if (libtrace->format_data != NULL) {
1644        /* Close the device completely, device cannot be restarted */
1645        if (FORMAT(libtrace)->port != 0xFF)
1646                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1647                                                RTE_ETH_EVENT_INTR_LSC,
1648                                                dpdk_lsc_callback,
1649                                                FORMAT(libtrace));
1650                rte_eth_dev_close(FORMAT(libtrace)->port);
1651                /* filter here if we used it */
1652                free(libtrace->format_data);
1653        }
1654
1655    /* Revert to the original PCI drivers */
1656    /* No longer in DPDK
1657    rte_eal_pci_exit(); */
1658    return 0;
1659}
1660
1661
1662static int dpdk_fin_output(libtrace_out_t * libtrace) {
1663    /* Free our memory structures */
1664    if (libtrace->format_data != NULL) {
1665        /* Close the device completely, device cannot be restarted */
1666        if (FORMAT(libtrace)->port != 0xFF)
1667            rte_eth_dev_close(FORMAT(libtrace)->port);
1668        /* filter here if we used it */
1669                free(libtrace->format_data);
1670        }
1671
1672    /* Revert to the original PCI drivers */
1673    /* No longer in DPDK
1674    rte_eal_pci_exit(); */
1675    return 0;
1676}
1677
1678/**
1679 * Get the start of the additional header that we added to a packet.
1680 */
1681static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1682    assert(packet);
1683    assert(packet->buffer);
1684    /* Our header sits straight after the mbuf header */
1685    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1686}
1687
1688static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1689    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1690    return hdr->cap_len;
1691}
1692
1693static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1694    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1695    if (size > hdr->cap_len) {
1696        /* Cannot make a packet bigger */
1697                return trace_get_capture_length(packet);
1698        }
1699
1700    /* Reset the cached capture length first*/
1701    packet->capture_length = -1;
1702    hdr->cap_len = (uint32_t) size;
1703        return trace_get_capture_length(packet);
1704}
1705
1706static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1707    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1708    int org_cap_size; /* The original capture size */
1709    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1710        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1711                            sizeof(struct hw_timestamp_82580);
1712    } else {
1713        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1714    }
1715    if (hdr->flags & INCLUDES_CHECKSUM) {
1716        return org_cap_size;
1717    } else {
1718        /* DPDK packets are always TRACE_TYPE_ETH packets */
1719        return org_cap_size + ETHER_CRC_LEN;
1720    }
1721}
1722static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1723    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1724    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1725        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1726                sizeof(struct hw_timestamp_82580);
1727    else
1728        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1729}
1730
1731static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1732                libtrace_packet_t *packet, void *buffer,
1733                libtrace_rt_types_t rt_type, uint32_t flags) {
1734    assert(packet);
1735    if (packet->buffer != buffer &&
1736        packet->buf_control == TRACE_CTRL_PACKET) {
1737        free(packet->buffer);
1738    }
1739
1740    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1741        packet->buf_control = TRACE_CTRL_PACKET;
1742    } else
1743        packet->buf_control = TRACE_CTRL_EXTERNAL;
1744
1745    packet->buffer = buffer;
1746    packet->header = buffer;
1747
1748    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1749    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1750    packet->type = rt_type;
1751    return 0;
1752}
1753
1754
1755/**
1756 * Given a packet size and a link speed, computes the
1757 * time to transmit in nanoseconds.
1758 *
1759 * @param format_data The dpdk format data from which we get the link speed
1760 *        and if unset updates it in a thread safe manner
1761 * @param pkt_size The size of the packet in bytes
1762 * @return The wire time in nanoseconds
1763 */
1764static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1765        uint32_t wire_time;
1766        /* 20 extra bytes of interframe gap and preamble */
1767# if GET_MAC_CRC_CHECKSUM
1768        wire_time = ((pkt_size + 20) * 8000);
1769# else
1770        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1771# endif
1772
1773        /* Division is really slow and introduces a pipeline stall
1774         * The compiler will optimise this into magical multiplication and shifting
1775         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1776         */
1777retry_calc_wiretime:
1778        switch (format_data->link_speed) {
1779        case ETH_LINK_SPEED_40G:
1780                wire_time /=  ETH_LINK_SPEED_40G;
1781                break;
1782        case ETH_LINK_SPEED_20G:
1783                wire_time /= ETH_LINK_SPEED_20G;
1784                break;
1785        case ETH_LINK_SPEED_10G:
1786                wire_time /= ETH_LINK_SPEED_10G;
1787                break;
1788        case ETH_LINK_SPEED_1000:
1789                wire_time /= ETH_LINK_SPEED_1000;
1790                break;
1791        case 0:
1792                {
1793                /* Maybe the link was down originally, but now it should be up */
1794                struct rte_eth_link link = {0};
1795                rte_eth_link_get_nowait(format_data->port, &link);
1796                if (link.link_status && link.link_speed) {
1797                        format_data->link_speed = link.link_speed;
1798#ifdef DEBUG
1799                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1800#endif
1801                        goto retry_calc_wiretime;
1802                }
1803                /* We don't know the link speed, make sure numbers are counting up */
1804                wire_time = 1;
1805                break;
1806                }
1807        default:
1808                wire_time /= format_data->link_speed;
1809        }
1810        return wire_time;
1811}
1812
1813
1814
1815/*
1816 * Does any extra preperation to all captured packets
1817 * This includes adding our extra header to it with the timestamp,
1818 * and any snapping
1819 *
1820 * @param format_data The DPDK format data
1821 * @param plc The DPDK per lcore format data
1822 * @param pkts An array of size nb_pkts of DPDK packets
1823 * @param nb_pkts The number of packets in pkts and optionally packets
1824 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
1825 */
1826static inline void dpdk_ready_pkts(libtrace_t *libtrace, struct dpdk_per_lcore_t *plc,
1827                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
1828        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
1829        struct dpdk_addt_hdr *hdr;
1830        size_t i;
1831        uint64_t cur_sys_time_ns;
1832#if HAS_HW_TIMESTAMPS_82580
1833        struct hw_timestamp_82580 *hw_ts;
1834        uint64_t estimated_wraps;
1835#else
1836
1837#endif
1838
1839#if USE_CLOCK_GETTIME
1840        struct timespec cur_sys_time = {0};
1841        /* This looks terrible and I feel bad doing it. But it's OK
1842         * on new kernels, because this is a fast vsyscall */
1843        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1844        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1845#else
1846        struct timeval cur_sys_time = {0};
1847        /* Also a fast vsyscall */
1848        gettimeofday(&cur_sys_time, NULL);
1849        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1850#endif
1851
1852        /* The system clock is not perfect so when running
1853         * at linerate we could timestamp a packet in the past.
1854         * To avoid this we munge the timestamp to appear 1ns
1855         * after the previous packet. We should eventually catch up
1856         * to system time since a 64byte packet on a 10G link takes 67ns.
1857         *
1858         * Note with parallel readers timestamping packets
1859         * with duplicate stamps or out of order is unavoidable without
1860         * hardware timestamping from the NIC.
1861         */
1862#if !HAS_HW_TIMESTAMPS_82580
1863        if (plc->ts_last_sys >= cur_sys_time_ns) {
1864                cur_sys_time_ns = plc->ts_last_sys + 1;
1865        }
1866#endif
1867
1868        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
1869        for (i = 0 ; i < nb_pkts ; ++i) {
1870
1871                /* We put our header straight after the dpdk header */
1872                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1873                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1874
1875#if GET_MAC_CRC_CHECKSUM
1876                /* Add back in the CRC sum */
1877                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1878                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1879                hdr->flags |= INCLUDES_CHECKSUM;
1880#endif
1881
1882                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1883
1884#if HAS_HW_TIMESTAMPS_82580
1885                /* The timestamp is sitting before our packet and is included in pkt_len */
1886                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1887                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1888                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1889
1890                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1891                 *
1892                 *        +----------+---+   +--------------+
1893                 *  82580 |    24    | 8 |   |      32      |
1894                 *        +----------+---+   +--------------+
1895                 *          reserved  \______ 40 bits _____/
1896                 *
1897                 * The 40 bit 82580 SYSTIM overflows every
1898                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1899                 *
1900                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1901                 * Endian (for the full 64 bits) i.e. picture is mirrored
1902                 */
1903
1904                /* Despite what the documentation says this is in Little
1905                 * Endian byteorder. Mask the reserved section out.
1906                 */
1907                hdr->timestamp = le64toh(hw_ts->timestamp) &
1908                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1909
1910                if (unlikely(plc->ts_first_sys == 0)) {
1911                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1912                        plc->ts_last_sys = plc->ts_first_sys;
1913                }
1914
1915                /* This will have serious problems if packets aren't read quickly
1916                 * that is within a couple of seconds because our clock cycles every
1917                 * 18 seconds */
1918                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1919                                  / (1ull<<TS_NBITS_82580);
1920
1921                /* Estimated_wraps gives the number of times the counter should have
1922                 * wrapped (however depending on value last time it could have wrapped
1923                 * twice more (if hw clock is close to its max value) or once less (allowing
1924                 * for a bit of variance between hw and sys clock). But if the clock
1925                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1926                if (unlikely(estimated_wraps >= 2)) {
1927                        /* 2 or more wrap arounds add all but the very last wrap */
1928                        plc->wrap_count += estimated_wraps - 1;
1929                }
1930
1931                /* Set the timestamp to the lowest possible value we're considering */
1932                hdr->timestamp += plc->ts_first_sys +
1933                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1934
1935                /* In most runs only the first if() will need evaluating - i.e our
1936                 * estimate is correct. */
1937                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1938                                              hdr->timestamp, MAXSKEW_82580))) {
1939                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1940                        plc->wrap_count++;
1941                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1942                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1943                                             hdr->timestamp, MAXSKEW_82580)) {
1944                                /* Failed to match estimated_wraps */
1945                                plc->wrap_count++;
1946                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1947                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1948                                                     hdr->timestamp, MAXSKEW_82580)) {
1949                                        if (estimated_wraps == 0) {
1950                                                /* 0 case Failed to match estimated_wraps+2 */
1951                                                printf("WARNING - Hardware Timestamp failed to"
1952                                                       " match using systemtime!\n");
1953                                                hdr->timestamp = cur_sys_time_ns;
1954                                        } else {
1955                                                /* Failed to match estimated_wraps+1 */
1956                                                plc->wrap_count++;
1957                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1958                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1959                                                                     hdr->timestamp, MAXSKEW_82580)) {
1960                                                        /* Failed to match estimated_wraps+2 */
1961                                                        printf("WARNING - Hardware Timestamp failed to"
1962                                                               " match using systemtime!!\n");
1963                                                }
1964                                        }
1965                                }
1966                        }
1967                }
1968#else
1969
1970                hdr->timestamp = cur_sys_time_ns;
1971                /* Offset the next packet by the wire time of previous */
1972                calculate_wire_time(format_data, hdr->cap_len);
1973
1974#endif
1975                if(packets) {
1976                        packets[i]->buffer = pkts[i];
1977                        packets[i]->header = pkts[i];
1978                        packets[i]->trace = libtrace;
1979#if HAS_HW_TIMESTAMPS_82580
1980                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1981                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
1982#else
1983                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1984                                              RTE_PKTMBUF_HEADROOM;
1985#endif
1986                        packets[i]->error = 1;
1987                }
1988        }
1989
1990        plc->ts_last_sys = cur_sys_time_ns;
1991
1992        return;
1993}
1994
1995
1996static void dpdk_fin_packet(libtrace_packet_t *packet)
1997{
1998        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
1999                rte_pktmbuf_free(packet->buffer);
2000                packet->buffer = NULL;
2001        }
2002}
2003
2004
2005static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2006    int nb_rx; /* Number of rx packets we've received */
2007
2008    /* Free the last packet buffer */
2009    if (packet->buffer != NULL) {
2010        /* Buffer is owned by DPDK */
2011        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2012            rte_pktmbuf_free(packet->buffer);
2013            packet->buffer = NULL;
2014        } else
2015        /* Buffer is owned by packet i.e. has been malloc'd */
2016        if (packet->buf_control == TRACE_CTRL_PACKET) {
2017            free(packet->buffer);
2018            packet->buffer = NULL;
2019        }
2020    }
2021
2022    packet->buf_control = TRACE_CTRL_EXTERNAL;
2023    packet->type = TRACE_RT_DATA_DPDK;
2024
2025    /* Check if we already have some packets buffered */
2026    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2027            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2028            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2029            return 1; // TODO should be bytes read, which essentially useless anyway
2030    }
2031    /* Wait for a packet */
2032    while (1) {
2033        /* Poll for a single packet */
2034        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2035                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2036        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
2037                FORMAT(libtrace)->burst_size = nb_rx;
2038                FORMAT(libtrace)->burst_offset = 1;
2039                dpdk_ready_pkts(libtrace, &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
2040                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2041                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2042                return 1; // TODO should be bytes read, which essentially useless anyway
2043        }
2044        if (libtrace_halt) {
2045                return 0;
2046        }
2047        /* Wait a while, polling on memory degrades performance
2048         * This relieves the pressure on memory allowing the NIC to DMA */
2049        rte_delay_us(10);
2050    }
2051
2052    /* We'll never get here - but if we did it would be bad */
2053    return -1;
2054}
2055
2056static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
2057    size_t nb_rx; /* Number of rx packets we've recevied */
2058    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2059    size_t i;
2060
2061    for (i = 0 ; i < nb_packets ; ++i) {
2062            /* Free the last packet buffer */
2063            if (packets[i]->buffer != NULL) {
2064                /* Buffer is owned by DPDK */
2065                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
2066                    rte_pktmbuf_free(packets[i]->buffer);
2067                    packets[i]->buffer = NULL;
2068                } else
2069                /* Buffer is owned by packet i.e. has been malloc'd */
2070                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
2071                    free(packets[i]->buffer);
2072                    packets[i]->buffer = NULL;
2073                }
2074            }
2075            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2076            packets[i]->type = TRACE_RT_DATA_DPDK;
2077    }
2078
2079    /* Wait for a packet */
2080    while (1) {
2081        /* Poll for a single packet */
2082        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
2083                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
2084        if (nb_rx > 0) {
2085                /* Got some packets - otherwise we keep spining */
2086                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2087                dpdk_ready_pkts(libtrace, PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
2088                return nb_rx;
2089        }
2090        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
2091        if (libtrace_message_queue_count(&t->messages) > 0) {
2092                printf("Extra message yay");
2093                return -2;
2094        }
2095        if (libtrace_halt) {
2096                return 0;
2097        }
2098        /* Wait a while, polling on memory degrades performance
2099         * This relieves the pressure on memory allowing the NIC to DMA */
2100        rte_delay_us(10);
2101    }
2102
2103    /* We'll never get here - but if we did it would be bad */
2104    return -1;
2105}
2106
2107static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2108    struct timeval tv;
2109    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2110
2111    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2112    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2113    return tv;
2114}
2115
2116static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2117    struct timespec ts;
2118    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2119
2120    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2121    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2122    return ts;
2123}
2124
2125static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2126    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2127}
2128
2129static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2130    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2131    return (libtrace_direction_t) hdr->direction;
2132}
2133
2134static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2135    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2136    hdr->direction = (uint8_t) direction;
2137    return (libtrace_direction_t) hdr->direction;
2138}
2139
2140/*
2141 * NOTE: Drops could occur for other reasons than running out of buffer
2142 * space. Such as failed MAC checksums and oversized packets.
2143 */
2144static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
2145    struct rte_eth_stats stats = {0};
2146
2147    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2148        return UINT64_MAX;
2149    /* Grab the current stats */
2150    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2151
2152    /* Get the drop counter */
2153    return (uint64_t) stats.ierrors;
2154}
2155
2156/*
2157 * This is the number of packets filtered by the NIC
2158 * and maybe ahead of number read using libtrace.
2159 *
2160 * XXX we are yet to implement any filtering, but if it was this should
2161 * get the result. So this will just return 0 for now.
2162 */
2163static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
2164    struct rte_eth_stats stats = {0};
2165
2166    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2167        return UINT64_MAX;
2168    /* Grab the current stats */
2169    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2170
2171    /* Get the drop counter */
2172    return (uint64_t) stats.fdirmiss;
2173}
2174
2175/* Attempts to read a packet in a non-blocking fashion. If one is not
2176 * available a SLEEP event is returned. We do not have the ability to
2177 * create a select()able file descriptor in DPDK.
2178 */
2179static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2180                                        libtrace_packet_t *packet) {
2181    libtrace_eventobj_t event = {0,0,0.0,0};
2182    int nb_rx; /* Number of receive packets we've read */
2183    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2184
2185    do {
2186
2187        /* See if we already have a packet waiting */
2188        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2189                        FORMAT(trace)->queue_id, pkts_burst, 1);
2190
2191        if (nb_rx > 0) {
2192            /* Free the last packet buffer */
2193            if (packet->buffer != NULL) {
2194                /* Buffer is owned by DPDK */
2195                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2196                    rte_pktmbuf_free(packet->buffer);
2197                    packet->buffer = NULL;
2198                } else
2199                /* Buffer is owned by packet i.e. has been malloc'd */
2200                if (packet->buf_control == TRACE_CTRL_PACKET) {
2201                    free(packet->buffer);
2202                    packet->buffer = NULL;
2203                }
2204            }
2205
2206            packet->buf_control = TRACE_CTRL_EXTERNAL;
2207            packet->type = TRACE_RT_DATA_DPDK;
2208            event.type = TRACE_EVENT_PACKET;
2209            dpdk_ready_pkts(trace, &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
2210            event.size = 1; // TODO should be bytes read, which essentially useless anyway
2211
2212            /* XXX - Check this passes the filter trace_read_packet normally
2213             * does this for us but this wont */
2214            if (trace->filter) {
2215                if (!trace_apply_filter(trace->filter, packet)) {
2216                    /* Failed the filter so we loop for another packet */
2217                    trace->filtered_packets ++;
2218                    continue;
2219                }
2220            }
2221            trace->accepted_packets ++;
2222        } else {
2223            /* We only want to sleep for a very short time - we are non-blocking */
2224            event.type = TRACE_EVENT_SLEEP;
2225            event.seconds = 0.0001;
2226            event.size = 0;
2227        }
2228
2229        /* If we get here we have our event */
2230        break;
2231    } while (1);
2232
2233    return event;
2234}
2235
2236
/** Print usage information for the dpdk format to stdout. */
static void dpdk_help(void) {
    static const char *const help_lines[] = {
        "dpdk format module: $Revision: 1752 $\n",
        "Supported input URIs:\n",
        "\tdpdk:<domain:bus:devid.func>-<coreid>\n",
        "\tThe -<coreid> is optional \n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n",
        "\t By default the last CPU core is used if not otherwise specified.\n",
        "\t Only a single libtrace instance of dpdk can use the same CPU core.\n",
        "\t Support for multiple simultaneous instances of dpdk format is currently limited.\n",
        "\n",
        "Supported output URIs:\n",
        "\tSame format as the input URI.\n",
        "\t e.g. dpdk:0000:01:00.1\n",
        "\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n",
        "\n",
    };
    size_t line;

    for (line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); line++)
        fputs(help_lines[line], stdout);
}
2254
/*
 * The dpdk format descriptor, registered with libtrace by dpdk_constructor().
 * Entries are positional and must follow the field order of
 * struct libtrace_format_t. NULL marks an operation this format does not
 * provide.
 */
static struct libtrace_format_t dpdk = {
        "dpdk",
        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
        TRACE_FORMAT_DPDK,
        NULL,                       /* probe filename */
        NULL,                       /* probe magic */
        dpdk_init_input,            /* init_input */
        dpdk_config_input,          /* config_input */
        dpdk_start_input,           /* start_input */
        dpdk_pause_input,           /* pause_input */
        dpdk_init_output,           /* init_output */
        NULL,                       /* config_output */
        dpdk_start_output,          /* start_output */
        dpdk_fin_input,             /* fin_input */
        dpdk_fin_output,            /* fin_output */
        dpdk_read_packet,           /* read_packet */
        dpdk_prepare_packet,        /* prepare_packet */
        dpdk_fin_packet,            /* fin_packet */
        dpdk_write_packet,          /* write_packet */
        dpdk_get_link_type,         /* get_link_type */
        dpdk_get_direction,         /* get_direction */
        dpdk_set_direction,         /* set_direction */
        NULL,                       /* get_erf_timestamp */
        dpdk_get_timeval,           /* get_timeval */
        dpdk_get_timespec,          /* get_timespec */
        NULL,                       /* get_seconds */
        NULL,                       /* seek_erf */
        NULL,                       /* seek_timeval */
        NULL,                       /* seek_seconds */
        dpdk_get_capture_length,    /* get_capture_length */
        dpdk_get_wire_length,       /* get_wire_length */
        dpdk_get_framing_length,    /* get_framing_length */
        dpdk_set_capture_length,    /* set_capture_length */
        NULL,                       /* get_received_packets */
        dpdk_get_filtered_packets,  /* get_filtered_packets */
        dpdk_get_dropped_packets,   /* get_dropped_packets */
        NULL,                       /* get_statistics */
        NULL,                       /* get_fd */
        dpdk_trace_event,           /* trace_event */
        dpdk_help,                  /* help */
        NULL,                       /* next pointer */
        {true, 8},                  /* Live, NICs typically have 8 threads (NOTE(review): presumably a max thread count - confirm) */
        dpdk_pstart_input,          /* pstart_input */
        dpdk_pread_packets,         /* pread_packets */
        dpdk_pause_input,           /* ppause - shared with pause_input */
        dpdk_fin_input,             /* p_fin - shared with fin_input */
        dpdk_pconfig_input,         /* pconfig_input */
        dpdk_pregister_thread,      /* pregister_thread */
        dpdk_punregister_thread,    /* punregister_thread */
        NULL                        /* get thread stats */
};
2306
2307void dpdk_constructor(void) {
2308        register_format(&dpdk);
2309}
Note: See TracBrowser for help on using the repository browser.