source: lib/format_dpdk.c @ 694823f

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 694823f was 694823f, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fix issue with retrieving stats from a closed parallel trace.
Remove some DPDK debug code.

  • Property mode set to 100644
File size: 75.0 KB
Line 
1
2/*
3 * This file is part of libtrace
4 *
5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
6 * New Zealand.
7 *
8 * Author: Richard Sanger
9 *
10 * All rights reserved.
11 *
12 * This code has been developed by the University of Waikato WAND
13 * research group. For further information please see http://www.wand.net.nz/
14 *
15 * libtrace is free software; you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation; either version 2 of the License, or
18 * (at your option) any later version.
19 *
20 * libtrace is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with libtrace; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28 *
29 * $Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $
30 *
31 */
32
33/* This format module deals with using the Intel Data Plane Development
34 * Kit capture format.
35 *
36 * Intel Data Plane Development Kit is a LIVE capture format.
37 *
38 * This format also supports writing which will write packets out to the
39 * network as a form of packet replay. This should not be confused with the
40 * RT protocol which is intended to transfer captured packet records between
41 * RT-speaking programs.
42 */
43
44#define _GNU_SOURCE
45
46#include "config.h"
47#include "libtrace.h"
48#include "libtrace_int.h"
49#include "format_helper.h"
50#include "libtrace_arphrd.h"
51#include "hash_toeplitz.h"
52
53#ifdef HAVE_INTTYPES_H
54#  include <inttypes.h>
55#else
56# error "Can't find inttypes.h"
57#endif
58
59#include <stdlib.h>
60#include <assert.h>
61#include <unistd.h>
62#include <endian.h>
63#include <string.h>
64
65#if HAVE_LIBNUMA
66#include <numa.h>
67#endif
68
/* We can deal with any minor differences by checking the RTE VERSION.
 * Typically DPDK backports some fixes (typically for building against
 * newer kernels) to the older versions of DPDK.
 *
 * These get released with the rX suffix. The following macros were added
 * in these new releases.
 *
 * Below this is a log of the versions that required changes to the libtrace
 * code (that we still attempt to support).
 *
 * DPDK v1.7.1 is recommended.
 * However 1.5 to 1.8 are likely supported.
 */
82#include <rte_eal.h>
83#include <rte_version.h>
/* Older DPDK releases do not ship every version helper, so provide
 * compatible fallbacks whenever one is missing. */
#if !defined(RTE_VERSION_NUM)
#       define RTE_VERSION_NUM(a, b, c, d) \
                (((a) << 24) | ((b) << 16) | ((c) << 8) | (d))
#endif
#if !defined(RTE_VER_PATCH_RELEASE)
#       define RTE_VER_PATCH_RELEASE 0
#endif
#if !defined(RTE_VERSION)
#       define RTE_VERSION RTE_VERSION_NUM(RTE_VER_MAJOR, RTE_VER_MINOR, \
                RTE_VER_PATCH_LEVEL, RTE_VER_PATCH_RELEASE)
#endif

/* 1.6.0r2 :
 *      rte_eal_pci_set_blacklist() is removed
 *      device_list is renamed to pci_device_list
 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
 *      as such we do apply the whitelist before rte_eal_init.
 *      This also works correctly with DPDK 1.6.0r2.
 *
 * Replaced by:
 *      rte_devargs (we can simply whitelist)
 *
 * DPDK_USE_BLACKLIST is 1 only for releases up to and including 1.6.0r1.
 */
#if RTE_VERSION > RTE_VERSION_NUM(1, 6, 0, 1)
#       define DPDK_USE_BLACKLIST 0
#else
#       define DPDK_USE_BLACKLIST 1
#endif

/*
 * 1.7.0 :
 *      rte_pmd_init_all is removed
 *
 * Replaced by:
 *      Nothing, no longer needed
 *
 * DPDK_USE_PMD_INIT is 1 only for releases older than 1.7.0.
 */
#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 0)
#       define DPDK_USE_PMD_INIT 0
#else
#       define DPDK_USE_PMD_INIT 1
#endif

/* 1.7.0-rc3 :
 *
 * Since 1.7.0-rc3 rte_eal_pci_probe is called as part of rte_eal_init.
 * Somewhere between 1.7 and 1.8 calling it twice broke so we should not call
 * it twice. DPDK_USE_PCI_PROBE is 1 only for releases older than 1.7.0-rc3.
 */
#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 3)
#       define DPDK_USE_PCI_PROBE 0
#else
#       define DPDK_USE_PCI_PROBE 1
#endif

/* 1.8.0-rc1 :
 * LOG LEVEL is a command line option which overrides what
 * we previously set it to. DPDK_USE_LOG_LEVEL is 1 from 1.8.0-rc1 onwards.
 */
#if RTE_VERSION < RTE_VERSION_NUM(1, 8, 0, 1)
#       define DPDK_USE_LOG_LEVEL 0
#else
#       define DPDK_USE_LOG_LEVEL 1
#endif
145
146#include <rte_per_lcore.h>
147#include <rte_debug.h>
148#include <rte_errno.h>
149#include <rte_common.h>
150#include <rte_log.h>
151#include <rte_memcpy.h>
152#include <rte_prefetch.h>
153#include <rte_branch_prediction.h>
154#include <rte_pci.h>
155#include <rte_ether.h>
156#include <rte_ethdev.h>
157#include <rte_ring.h>
158#include <rte_mempool.h>
159#include <rte_mbuf.h>
160#include <rte_launch.h>
161#include <rte_lcore.h>
162#include <rte_per_lcore.h>
163#include <rte_cycles.h>
164#include <pthread.h>
165
/* The default size of memory buffers to use - This is the max size of standard
 * ethernet packet less the size of the MAC CHECKSUM */
#define RX_MBUF_SIZE 1514

/* The minimum number of memory buffers per queue tx or rx. Search for
 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
 */
#define MIN_NB_BUF 64

/* Number of receive memory buffers to use
 * By default this is limited by driver to 4k and must be a multiple of 128.
 * A modification can be made to the driver to remove this limit.
 * This can be increased in the driver and here.
 * Should be at least MIN_NB_BUF.
 */
#define NB_RX_MBUF 4096

/* Number of send memory buffers to use.
 * Same limits apply as those to NB_RX_MBUF.
 */
#define NB_TX_MBUF 1024

/* The size of the PCI blacklist needs to be big enough to contain
 * every PCI device address (listed by lspci every bus:device.function tuple).
 */
#define BLACK_LIST_SIZE 50

/* The maximum number of characters the mempool name can be */
#define MEMPOOL_NAME_LEN 20

/* For single threaded libtrace we read packets as a batch/burst
 * this is the maximum size of said burst */
#define BURST_SIZE 50

/* Accessor macros. Every argument is parenthesised so that arbitrary
 * expressions (dereferences, pointer arithmetic, casts) expand correctly. */
#define MBUF(x) ((struct rte_mbuf *) (x))
/* Get the original placement of the packet data */
#define MBUF_PKTDATA(x) ((char *) (x) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
#define FORMAT(x) ((struct dpdk_format_data_t*)((x)->format_data))
#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)((x)->format_data))

/* Convert a struct timeval / struct timespec value to nanoseconds */
#define TV_TO_NS(tv) ((uint64_t) (tv).tv_sec*1000000000ull + \
                        (uint64_t) (tv).tv_usec*1000ull)
#define TS_TO_NS(ts) ((uint64_t) (ts).tv_sec*1000000000ull + \
                        (uint64_t) (ts).tv_nsec)

#if RTE_PKTMBUF_HEADROOM != 128
#warning "RTE_PKTMBUF_HEADROOM is not set to the default value of 128 - " \
         "any libtrace instance processing these packets must have the " \
         "same RTE_PKTMBUF_HEADROOM set"
#endif
216
/* ~~~~~~~~~~~~~~~~~~~~~~ Advanced settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
 *
 * Make sure you understand what these are doing before enabling them.
 * They might make traces incompatible with other builds etc.
 *
 * These are also included to show how to do some things which aren't
 * obvious in the DPDK documentation.
 */

/* Print verbose messages to stderr */
#define DEBUG 0

/* Use clock_gettime() for nanosecond resolution rather than gettimeofday().
 * Only turn this on if you know clock_gettime is a vsyscall on your system,
 * otherwise it could be a large overhead. Again gettimeofday() should also
 * be a vsyscall; if it's not, you should seriously consider updating your
 * kernel.
 */
#ifdef HAVE_LIBRT
/* clock_gettime() is available: 1 prefers clock_gettime, 0 gettimeofday */
#define USE_CLOCK_GETTIME 1
#else
/* DONT CHANGE THIS !!! Without librt clock_gettime() is not available,
 * so this must stay 0 and gettimeofday() is used instead. */
#define USE_CLOCK_GETTIME 0
#endif

/* This is fairly safe to turn on - currently there appears to be a 'bug'
 * in DPDK that will remove the checksum by making the packet appear 4 bytes
 * smaller than what it really is. Most formats don't include the checksum,
 * hence writing out a port such as int: ring: and dpdk: assumes there
 * is no checksum and will attempt to write the checksum as part of the
 * packet
 */
#define GET_MAC_CRC_CHECKSUM 0

/* This requires a modification of the pmd drivers (inside Intel DPDK)
 * TODO this requires updating (packet sizes are wrong TS most likely also)
 */
#define HAS_HW_TIMESTAMPS_82580 0

#if HAS_HW_TIMESTAMPS_82580
# define TS_NBITS_82580     40
/* The maximum on the +ve or -ve side that we can be, make it half way */
# define MAXSKEW_82580 ((uint64_t) (.5 * (double)(1ull<<TS_NBITS_82580)))
#define WITHIN_VARIANCE(v1,v2,var) (((v1) - (var) < (v2)) && ((v1) + (var) > (v2)))
#endif
264
/* Hardware timestamp record of the Intel 82580.
 * NOTE: the 82580 datasheet claims the timestamp is stored big endian,
 * but it is actually little endian. */
struct hw_timestamp_82580 {
    uint64_t reserved;   /* Reserved, not interpreted */
    uint64_t timestamp;  /* Little endian; only the lower 40 bits are valid */
};
271
/* Lifecycle states of a DPDK format instance (stored in the `paused`
 * field of dpdk_format_data_t). */
enum paused_state {
    DPDK_NEVER_STARTED = 0, /* Never started */
    DPDK_RUNNING = 1,       /* Currently running */
    DPDK_PAUSED = 2,        /* Started, then paused */
};
277
/* State kept for each lcore entry (see per_lcore in dpdk_format_data_t). */
struct dpdk_per_lcore_t
{
        uint16_t queue_id;    /* Queue id associated with this entry */
        uint8_t port;         /* Port id associated with this entry */
        uint64_t ts_last_sys; /* System time of the most recent packet, in ns */
#if HAS_HW_TIMESTAMPS_82580
        /* The following are only used for RX timestamping */
        uint64_t ts_first_sys; /* System time of the first packet, in ns */
        uint32_t wrap_count;   /* How many times the NIC clock has fully wrapped */
#endif
};
289
290/* Used by both input and output however some fields are not used
291 * for output */
292struct dpdk_format_data_t {
293    int8_t promisc; /* promiscuous mode - RX only */
294    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
295    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
296    uint8_t paused; /* See paused_state */
297    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
298    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
299    int snaplen; /* The snap length for the capture - RX only */
300    /* We always have to setup both rx and tx queues even if we don't want them */
301    int nb_rx_buf; /* The number of packet buffers in the rx ring */
302    int nb_tx_buf; /* The number of packet buffers in the tx ring */
303    int nic_numa_node; /* The NUMA node that the NIC is attached to */
304    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
305#if DPDK_USE_BLACKLIST
306    struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
307        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
308#endif
309    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
310    uint8_t rss_key[40]; // This is the RSS KEY
311    /* To improve performance we always batch reading packets, in a burst */
312    struct rte_mbuf* burst_pkts[BURST_SIZE];
313    int burst_size; /* The total number read in the burst */
314    int burst_offset; /* The offset we are into the burst */
315        // DPDK normally seems to have a limit of 8 queues for a given card
316        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
317};
318
/* Bit flags stored in dpdk_addt_hdr.flags */
enum dpdk_addt_hdr_flags {
    INCLUDES_CHECKSUM = 1 << 0,     /* The packet length includes the MAC CRC */
    INCLUDES_HW_TIMESTAMP = 1 << 1, /* Used with 82580 driver */
};
323
/**
 * Extra per-packet information stored in the mbuf headroom, in front of
 * the packet data. Buffer layout:
 *
 * +--------------------------+
 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
 * +--------------------------+
 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
 * +--------------------------+
 * *   hw_timestamp_82580     * 16 bytes Optional
 * +--------------------------+
 * |       Packet data        | Variable Size
 * |                          |
 */
struct dpdk_addt_hdr {
    uint64_t timestamp;  /* Packet timestamp */
    uint8_t flags;       /* Bitwise OR of enum dpdk_addt_hdr_flags */
    uint8_t direction;   /* Packet direction */
    uint8_t reserved1;   /* Unused padding */
    uint8_t reserved2;   /* Unused padding */
    uint32_t cap_len;    /* The capture length to report for this packet */
};
347
/**
 * We want to blacklist all devices except those on the whitelist
 * (I say list, but yes it is only the one).
 *
 * The default behaviour of rte_eal_pci_probe() will map every possible device
 * to its DPDK driver. The DPDK driver will take the ethernet device
 * out of the kernel (i.e. it is no longer /dev/ethx) and it can then no
 * longer be used as a normal interface.
 *
 * So blacklist all devices except the one that we wish to use so that
 * the others can still be used as standard ethernet ports.
 *
 * @return 0 if successful, otherwise -1 on error.
 */
361#if DPDK_USE_BLACKLIST
362static int blacklist_devices(struct dpdk_format_data_t *format_data, struct rte_pci_addr *whitelist)
363{
364        struct rte_pci_device *dev = NULL;
365        format_data->nb_blacklist = 0;
366
367        memset(format_data->blacklist, 0, sizeof (format_data->blacklist));
368
369        TAILQ_FOREACH(dev, &device_list, next) {
370        if (whitelist != NULL && whitelist->domain == dev->addr.domain
371            && whitelist->bus == dev->addr.bus
372            && whitelist->devid == dev->addr.devid
373            && whitelist->function == dev->addr.function)
374            continue;
375                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
376                                / sizeof (format_data->blacklist[0])) {
377                        fprintf(stderr, "Warning: too many devices to blacklist consider"
378                                        " increasing BLACK_LIST_SIZE");
379                        break;
380                }
381                format_data->blacklist[format_data->nb_blacklist] = dev->addr;
382                ++format_data->nb_blacklist;
383        }
384
385        rte_eal_pci_set_blacklist(format_data->blacklist, format_data->nb_blacklist);
386        return 0;
387}
388#else /* DPDK_USE_BLACKLIST */
389#include <rte_devargs.h>
390static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
391{
392        char pci_str[20] = {0};
393        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
394                 whitelist->domain,
395                 whitelist->bus,
396                 whitelist->devid,
397                 whitelist->function);
398        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
399                return -1;
400        }
401        return 0;
402}
403#endif
404
405/**
406 * Parse the URI format as a pci address
407 * Fills in addr, note core is optional and is unchanged if
408 * a value for it is not provided.
409 *
410 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
411 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
412 */
413static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
414    int matches;
415    assert(str);
416    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
417                     &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
418    if (matches >= 4) {
419        return 0;
420    } else {
421        return -1;
422    }
423}
424
425/**
426 * Convert a pci address to the numa node it is
427 * connected to.
428 *
429 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
430 * so we can call it before DPDK
431 *
432 * @return -1 if unknown otherwise a number 0 or higher of the numa node
433 */
434static int pci_to_numa(struct rte_pci_addr * dev_addr) {
435        char path[50] = {0};
436        FILE *file;
437
438        /* Read from the system */
439        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
440                 dev_addr->domain,
441                 dev_addr->bus,
442                 dev_addr->devid,
443                 dev_addr->function);
444
445        if((file = fopen(path, "r")) != NULL) {
446                int numa_node = -1;
447                fscanf(file, "%d", &numa_node);
448                fclose(file);
449                return numa_node;
450        }
451        return -1;
452}
453
454#if DEBUG
455/* For debugging */
456static inline void dump_configuration()
457{
458    struct rte_config * global_config;
459    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
460
461    if (nb_cpu <= 0) {
462        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
463        nb_cpu = 1; /* fallback to just 1 core */
464    }
465    if (nb_cpu > RTE_MAX_LCORE)
466        nb_cpu = RTE_MAX_LCORE;
467
468    global_config = rte_eal_get_configuration();
469
470    if (global_config != NULL) {
471        int i;
472        fprintf(stderr, "Intel DPDK setup\n"
473               "---Version      : %s\n"
474               "---Master LCore : %"PRIu32"\n"
475               "---LCore Count  : %"PRIu32"\n",
476               rte_version(),
477               global_config->master_lcore, global_config->lcore_count);
478
479        for (i = 0 ; i < nb_cpu; i++) {
480            fprintf(stderr, "   ---Core %d : %s\n", i,
481                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
482        }
483
484        const char * proc_type;
485        switch (global_config->process_type) {
486            case RTE_PROC_AUTO:
487                proc_type = "auto";
488                break;
489            case RTE_PROC_PRIMARY:
490                proc_type = "primary";
491                break;
492            case RTE_PROC_SECONDARY:
493                proc_type = "secondary";
494                break;
495            case RTE_PROC_INVALID:
496                proc_type = "invalid";
497                break;
498            default:
499                proc_type = "something worse than invalid!!";
500        }
501        fprintf(stderr, "---Process Type : %s\n", proc_type);
502    }
503
504}
505#endif
506
507/**
508 * Expects to be called from the master lcore and moves it to the given dpdk id
509 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
510 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
511 *               and not already in use.
512 * @return 0 is successful otherwise -1 on error.
513 */
514static inline int dpdk_move_master_lcore(size_t core) {
515    struct rte_config *cfg = rte_eal_get_configuration();
516    cpu_set_t cpuset;
517    int i;
518
519    assert (core < RTE_MAX_LCORE);
520    assert (rte_get_master_lcore() == rte_lcore_id());
521
522    if (core == rte_lcore_id())
523        return 0;
524
525    // Make sure we are not overwriting someone else
526    assert(!rte_lcore_is_enabled(core));
527
528    // Move the core
529    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
530    cfg->lcore_role[core] = ROLE_RTE;
531    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
532    rte_eal_get_configuration()->master_lcore = core;
533    RTE_PER_LCORE(_lcore_id) = core;
534
535    // Now change the affinity
536    CPU_ZERO(&cpuset);
537
538    if (lcore_config[core].detected) {
539        CPU_SET(core, &cpuset);
540    } else {
541        for (i = 0; i < RTE_MAX_LCORE; ++i) {
542            if (lcore_config[i].detected)
543                CPU_SET(i, &cpuset);
544        }
545    }
546
547    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
548    if (i != 0) {
549        // TODO proper libtrace style error here!!
550        fprintf(stderr, "pthread_setaffinity_np failed\n");
551        return -1;
552    }
553    return 0;
554}
555
556
/**
 * XXX This is very bad XXX
 * rte_eal_init() parses its own argv with getopt, which clobbers the
 * process-wide getopt state. These helpers snapshot and restore that state
 * so a caller that is itself in the middle of getopt parsing is not
 * disturbed. Luckily the format is normally last so it doesn't matter;
 * DPDK only supports modern systems so hopefully this will continue to work.
 */
struct saved_getopts {
        char *optarg;
        int optind;
        int opterr;
        int optopt;
};

/* Copy the current getopt globals into *opts */
static void save_getopts(struct saved_getopts *opts) {
        *opts = (struct saved_getopts) {
                .optarg = optarg,
                .optind = optind,
                .opterr = opterr,
                .optopt = optopt,
        };
}

/* Put the getopt globals back to the values recorded in *opts */
static void restore_getopts(struct saved_getopts *opts) {
        const struct saved_getopts *snap = opts;

        optopt = snap->optopt;
        opterr = snap->opterr;
        optind = snap->optind;
        optarg = snap->optarg;
}
584
585static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
586                                        char * err, int errlen) {
587    int ret; /* Returned error codes */
588    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
589    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
590    char mem_map[20] = {0}; /* The memory name */
591    long nb_cpu; /* The number of CPUs in the system */
592    long my_cpu; /* The CPU number we want to bind to */
593    int i;
594    struct rte_config *cfg = rte_eal_get_configuration();
595        struct saved_getopts save_opts;
596
597#if DEBUG
598    rte_set_log_level(RTE_LOG_DEBUG);
599#else
600    rte_set_log_level(RTE_LOG_WARNING);
601#endif
602    /*
603     * Using unique file prefixes mean separate memory is used, unlinking
604     * the two processes. However be careful we still cannot access a
605     * port that already in use.
606     *
607     * Using unique file prefixes mean separate memory is used, unlinking
608     * the two processes. However be careful we still cannot access a
609     * port that already in use.
610     */
611    char* argv[] = {"libtrace",
612                    "-c", cpu_number,
613                    "-n", "1",
614                    "--proc-type", "auto",
615                    "--file-prefix", mem_map,
616                    "-m", "256",
617#if DPDK_USE_LOG_LEVEL
618#       if DEBUG
619                    "--log-level", "8", /* RTE_LOG_DEBUG */
620#       else
621                    "--log-level", "5", /* RTE_LOG_WARNING */
622#       endif
623#endif
624                    NULL};
625    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
626
627    /* This initialises the Environment Abstraction Layer (EAL)
628     * If we had slave workers these are put into WAITING state
629     *
630     * Basically binds this thread to a fixed core, which we choose as
631     * the last core on the machine (assuming fewer interrupts mapped here).
632     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
633     * "-n" the number of memory channels into the CPU (hardware specific)
634     *      - Most likely to be half the number of ram slots in your machine.
635     *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
636     * Controls where in memory packets are stored and should spread across
637     * the channels. We just use 1 to be safe.
638     */
639
640    /* Get the number of cpu cores in the system and use the last core
641     * on the correct numa node */
642    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
643    if (nb_cpu <= 0) {
644        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
645        nb_cpu = 1; /* fallback to the first core */
646    }
647    if (nb_cpu > RTE_MAX_LCORE)
648        nb_cpu = RTE_MAX_LCORE;
649
650    my_cpu = -1;
651    /* This allows the user to specify the core - we would try to do this
652     * automatically but it's hard to tell that this is secondary
653     * before running rte_eal_init(...). Currently we are limited to 1
654     * instance per core due to the way memory is allocated. */
655    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
656        snprintf(err, errlen, "Failed to parse URI");
657        return -1;
658    }
659
660#if HAVE_LIBNUMA
661        format_data->nic_numa_node = pci_to_numa(&use_addr);
662        if (my_cpu < 0) {
663                /* If we can assign to a core on the same numa node */
664                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
665                if(format_data->nic_numa_node >= 0) {
666                        int max_node_cpu = -1;
667                        struct bitmask *mask = numa_allocate_cpumask();
668                        assert(mask);
669                        numa_node_to_cpus(format_data->nic_numa_node, mask);
670                        for (i = 0 ; i < nb_cpu; ++i) {
671                                if (numa_bitmask_isbitset(mask,i))
672                                        max_node_cpu = i+1;
673                        }
674                        my_cpu = max_node_cpu;
675                }
676        }
677#endif
678        if (my_cpu < 0) {
679                my_cpu = nb_cpu;
680        }
681
682
683    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
684                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
685
686    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
687        snprintf(err, errlen,
688          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
689          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
690        return -1;
691    }
692
693    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
694       info older versions */
695    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
696    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
697
698#if !DPDK_USE_BLACKLIST
699    /* Black list all ports besides the one that we want to use */
700        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
701                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
702                         " are you sure the address is correct?: %s", strerror(-ret));
703                return -1;
704        }
705#endif
706
707        /* Give the memory map a unique name */
708        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
709    /* rte_eal_init it makes a call to getopt so we need to reset the
710     * global optind variable of getopt otherwise this fails */
711        save_getopts(&save_opts);
712    optind = 1;
713    if ((ret = rte_eal_init(argc, argv)) < 0) {
714        snprintf(err, errlen,
715          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
716        return -1;
717    }
718        restore_getopts(&save_opts);
719    // These are still running but will never do anything with DPDK v1.7 we
720    // should remove this XXX in the future
721    for(i = 0; i < RTE_MAX_LCORE; ++i) {
722            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
723            cfg->lcore_role[i] = ROLE_OFF;
724            cfg->lcore_count--;
725        }
726    }
727    // Only the master should be running
728    assert(cfg->lcore_count == 1);
729
730    dpdk_move_master_lcore(my_cpu-1);
731
732#if DEBUG
733    dump_configuration();
734#endif
735
736#if DPDK_USE_PMD_INIT
737    /* This registers all available NICs with Intel DPDK
738     * These are not loaded until rte_eal_pci_probe() is called.
739     */
740    if ((ret = rte_pmd_init_all()) < 0) {
741        snprintf(err, errlen,
742          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
743        return -1;
744    }
745#endif
746
747#if DPDK_USE_BLACKLIST
748    /* Blacklist all ports besides the one that we want to use */
749        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
750                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
751                         " are you sure the address is correct?: %s", strerror(-ret));
752                return -1;
753        }
754#endif
755
756#if DPDK_USE_PCI_PROBE
757    /* This loads DPDK drivers against all ports that are not blacklisted */
758        if ((ret = rte_eal_pci_probe()) < 0) {
759        snprintf(err, errlen,
760            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
761        return -1;
762    }
763#endif
764
765    format_data->nb_ports = rte_eth_dev_count();
766
767    if (format_data->nb_ports != 1) {
768        snprintf(err, errlen,
769            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
770            format_data->nb_ports);
771        return -1;
772    }
773
774    struct rte_eth_dev_info dev_info;
775    rte_eth_dev_info_get(0, &dev_info);
776    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
777                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
778
779    return 0;
780}
781
782static int dpdk_init_input (libtrace_t *libtrace) {
783    char err[500];
784    err[0] = 0;
785
786    libtrace->format_data = (struct dpdk_format_data_t *)
787                            malloc(sizeof(struct dpdk_format_data_t));
788    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
789    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
790    FORMAT(libtrace)->nb_ports = 0;
791    FORMAT(libtrace)->snaplen = 0; /* Use default */
792    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
793    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
794    FORMAT(libtrace)->nic_numa_node = -1;
795    FORMAT(libtrace)->promisc = -1;
796    FORMAT(libtrace)->pktmbuf_pool = NULL;
797#if DPDK_USE_BLACKLIST
798    FORMAT(libtrace)->nb_blacklist = 0;
799#endif
800    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
801    FORMAT(libtrace)->mempool_name[0] = 0;
802    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
803    FORMAT(libtrace)->burst_size = 0;
804    FORMAT(libtrace)->burst_offset = 0;
805
806    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
807        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
808        free(libtrace->format_data);
809        libtrace->format_data = NULL;
810        return -1;
811    }
812    return 0;
813};
814
815static int dpdk_init_output(libtrace_out_t *libtrace)
816{
817    char err[500];
818    err[0] = 0;
819
820    libtrace->format_data = (struct dpdk_format_data_t *)
821                            malloc(sizeof(struct dpdk_format_data_t));
822    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
823    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
824    FORMAT(libtrace)->nb_ports = 0;
825    FORMAT(libtrace)->snaplen = 0; /* Use default */
826    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
827    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
828    FORMAT(libtrace)->nic_numa_node = -1;
829    FORMAT(libtrace)->promisc = -1;
830    FORMAT(libtrace)->pktmbuf_pool = NULL;
831#if DPDK_USE_BLACKLIST
832    FORMAT(libtrace)->nb_blacklist = 0;
833#endif
834    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
835    FORMAT(libtrace)->mempool_name[0] = 0;
836    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
837    FORMAT(libtrace)->burst_size = 0;
838    FORMAT(libtrace)->burst_offset = 0;
839
840    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
841        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
842        free(libtrace->format_data);
843        libtrace->format_data = NULL;
844        return -1;
845    }
846    return 0;
847};
848
849static int dpdk_pconfig_input (libtrace_t *libtrace,
850                                trace_parallel_option_t option,
851                                void *data) {
852        switch (option) {
853                case TRACE_OPTION_SET_HASHER:
854                        switch (*((enum hasher_types *) data))
855                        {
856                                case HASHER_BALANCE:
857                                case HASHER_UNIDIRECTIONAL:
858                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
859                                        return 0;
860                                case HASHER_BIDIRECTIONAL:
861                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
862                                        return 0;
863                                case HASHER_HARDWARE:
864                                case HASHER_CUSTOM:
865                                        // We don't support these
866                                        return -1;
867                        }
868        break;
869        }
870        return -1;
871}
872/**
873 * Note here snaplen excludes the MAC checksum. Packets over
874 * the requested snaplen will be dropped. (Excluding MAC checksum)
875 *
876 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
877 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
878 * is set the maximum size of the returned packet would be 1518 otherwise
879 * 1514 would be the largest size possibly returned.
880 *
881 */
882static int dpdk_config_input (libtrace_t *libtrace,
883                                        trace_option_t option,
884                                        void *data) {
885    switch (option) {
886        case TRACE_OPTION_SNAPLEN:
887            /* Only support changing snaplen before a call to start is
888             * made */
889            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
890                FORMAT(libtrace)->snaplen=*(int*)data;
891            else
892                return -1;
893            return 0;
894                case TRACE_OPTION_PROMISC:
895                        FORMAT(libtrace)->promisc=*(int*)data;
896            return 0;
897        case TRACE_OPTION_FILTER:
898            /* TODO filtering */
899            break;
900        case TRACE_OPTION_META_FREQ:
901            break;
902        case TRACE_OPTION_EVENT_REALTIME:
903            break;
904        /* Avoid default: so that future options will cause a warning
905         * here to remind us to implement it, or flag it as
906         * unimplementable
907         */
908    }
909
910        /* Don't set an error - trace_config will try to deal with the
911         * option and will set an error if it fails */
912    return -1;
913}
914
/* Device configuration passed to rte_eth_dev_configure().
 *
 * This is a single mutable file-scope object shared by every DPDK trace in
 * the process. Before configuring a port, the start functions overwrite
 * rxmode.jumbo_frame and rxmode.max_rx_pkt_len based on the snaplen:
 * a snaplen of 0 disables jumbo frames, and any other snaplen enables jumbo
 * frames with max_rx_pkt_len = snaplen. This is how the frame size is
 * limited (or raised).
 */
static struct rte_eth_conf port_conf = {
        .rxmode = {
                .mq_mode = ETH_RSS,
                .split_hdr_size = 0,
                .header_split   = 0, /**< Header Split disabled */
                .hw_ip_checksum = 0, /**< IP checksum offload disabled */
                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
#if GET_MAC_CRC_CHECKSUM
/* It appears that even if hw_strip_crc is turned off the driver will still
 * take the CRC off. See line 955ish in lib/librte_pmd_e1000/igb_rxtx.c.
 * However with .hw_strip_crc=0 a valid CRC still exists in the 4 bytes after
 * the end of the packet data, so we just add it back on when we receive
 * the packet.
 */
                .hw_strip_crc   = 0, /**< Do not ask hardware to strip the CRC */
#else
/* By default strip the MAC checksum, because actually reading it is a bit
 * of a hack, and we don't want to rely on disabling stripping to keep
 * working in the future.
 */
                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
#endif
        },
        .txmode = {
                .mq_mode = ETH_DCB_NONE,
        },
        .rx_adv_conf = {
                .rss_conf = {
                        // .rss_key = &rss_key, // We set this per format
                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
                },
        },
        .intr_conf = {
                .lsc = 1 /* Enable link-state-change interrupts (see dpdk_lsc_callback) */
        }
};
956
/* RX queue configuration passed to every rte_eth_rx_queue_setup() call. */
static const struct rte_eth_rxconf rx_conf = {
        .rx_thresh = {
                .pthresh = 8,/* RX_PTHRESH prefetch */
                .hthresh = 8,/* RX_HTHRESH host */
                .wthresh = 4,/* RX_WTHRESH writeback */
        },
    .rx_free_thresh = 0, /* NOTE(review): 0 presumably selects the PMD's default - confirm per driver */
    .rx_drop_en = 0, /* Per-queue dropping when no RX descriptors are free is NOT enabled */
};
966
/* TX queue configuration passed to every rte_eth_tx_queue_setup() call. */
static const struct rte_eth_txconf tx_conf = {
        .tx_thresh = {
        /**
         * TX_PTHRESH prefetch
         * Set on the NIC: if the number of unprocessed descriptors queued on
         * the card falls below this, try to grab at least hthresh more
         * unprocessed descriptors.
         */
                .pthresh = 36,

        /* TX_HTHRESH host
         * Set on the NIC: the batch size used to prefetch unprocessed TX
         * descriptors.
         */
                .hthresh = 0,

        /* TX_WTHRESH writeback
         * Set on the NIC: the number of sent descriptors before writing back
         * status to confirm the transmission. This is done more efficiently as
         * a bulk DMA transfer rather than writing one at a time.
         * Similar to tx_free_thresh, however this is applied to the NIC,
         * whereas tx_free_thresh is when DPDK will check these. This is
         * extended upon by tx_rs_thresh (10Gbit cards), which doesn't write
         * back all descriptors but only every n'th one, reducing DMA memory
         * bandwidth.
         */
                .wthresh = 4,
        },

    /* Used internally by DPDK rather than passed to the NIC. The number of
     * packet descriptors to send before checking for any responses written
     * back (to confirm the transmission). Defaults to 32 if set to 0.
     */
        .tx_free_thresh = 0,

    /* The Report Status threshold, used by 10Gbit cards.
     * This signals the card to only write back status (such as
     * transmission successful) after this minimum number of transmit
     * descriptors are seen. The default is 32 (if set to 0); however if set
     * to greater than 1, TX wthresh must be set to zero, because this is
     * effectively a replacement for it. With the value 1 used here the
     * non-zero wthresh above is allowed. See the DPDK programmer's guide for
     * more restrictions.
     */
        .tx_rs_thresh = 1,
};
1009
1010/**
1011 * A callback for a link state change (LSC).
1012 *
1013 * Packets may be received before this notification. In fact the DPDK IGXBE
1014 * driver likes to put a delay upto 5sec before sending this.
1015 *
1016 * We use this to ensure the link speed is correct for our timestamp
1017 * calculations. Because packets might be received before the link up we still
1018 * update this when the packet is received.
1019 *
1020 * @param port The DPDK port
1021 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
1022 * @param cb_arg The dpdk_format_data_t structure associated with the format
1023 */
1024static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
1025                              void *cb_arg) {
1026        struct dpdk_format_data_t * format_data = cb_arg;
1027        struct rte_eth_link link_info;
1028        assert(event == RTE_ETH_EVENT_INTR_LSC);
1029        assert(port == format_data->port);
1030
1031        rte_eth_link_get_nowait(port, &link_info);
1032
1033        if (link_info.link_status)
1034                format_data->link_speed = link_info.link_speed;
1035        else
1036                format_data->link_speed = 0;
1037
1038#if DEBUG
1039        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
1040                link_info.link_status ? "up" : "down",
1041                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1042                                          "full-duplex" : "half-duplex",
1043                (int) link_info.link_speed);
1044#endif
1045
1046        /* Turns out DPDK drivers might not come back up if the link speed
1047         * changes. So we reset the autoneg procedure. This is very unsafe
1048         * we have have threads reading packets and we stop the port. */
1049#if 0
1050        if (!link_info.link_status) {
1051                int ret;
1052                rte_eth_dev_stop(port);
1053                ret = rte_eth_dev_start(port);
1054                if (ret < 0) {
1055                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
1056                                strerror(-ret));
1057                }
1058        }
1059#endif
1060}
1061
1062/* Attach memory to the port and start the port or restart the port.
1063 */
1064static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
1065    int ret; /* Check return values for errors */
1066    struct rte_eth_link link_info; /* Wait for link */
1067    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
1068
1069    /* Already started */
1070    if (format_data->paused == DPDK_RUNNING)
1071        return 0;
1072
1073    /* First time started we need to alloc our memory, doing this here
1074     * rather than in environment setup because we don't have snaplen then */
1075    if (format_data->paused == DPDK_NEVER_STARTED) {
1076        if (format_data->snaplen == 0) {
1077            format_data->snaplen = RX_MBUF_SIZE;
1078            port_conf.rxmode.jumbo_frame = 0;
1079            port_conf.rxmode.max_rx_pkt_len = 0;
1080        } else {
1081            /* Use jumbo frames */
1082            port_conf.rxmode.jumbo_frame = 1;
1083            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1084        }
1085
1086        /* This is additional overhead so make sure we allow space for this */
1087#if GET_MAC_CRC_CHECKSUM
1088        format_data->snaplen += ETHER_CRC_LEN;
1089#endif
1090#if HAS_HW_TIMESTAMPS_82580
1091        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1092#endif
1093
1094        /* Create the mbuf pool, which is the place our packets are allocated
1095         * from - TODO figure out if there is is a free function (I cannot see one)
1096         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1097         * allocate however that extra 1 packet is not used.
1098         * (I assume <= vs < error some where in DPDK code)
1099         * TX requires nb_tx_buffers + 1 in the case the queue is full
1100         * so that will fill the new buffer and wait until slots in the
1101         * ring become available.
1102         */
1103#if DEBUG
1104    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1105#endif
1106    format_data->pktmbuf_pool =
1107            rte_mempool_create(format_data->mempool_name,
1108                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
1109                       format_data->snaplen + sizeof(struct rte_mbuf)
1110                                        + RTE_PKTMBUF_HEADROOM,
1111                       128, sizeof(struct rte_pktmbuf_pool_private),
1112                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1113                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
1114
1115    if (format_data->pktmbuf_pool == NULL) {
1116            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
1117                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
1118            return -1;
1119        }
1120    }
1121
1122    /* ----------- Now do the setup for the port mapping ------------ */
1123    /* Order of calls must be
1124     * rte_eth_dev_configure()
1125     * rte_eth_tx_queue_setup()
1126     * rte_eth_rx_queue_setup()
1127     * rte_eth_dev_start()
1128     * other rte_eth calls
1129     */
1130
1131
1132    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
1133
1134    /* This must be called first before another *eth* function
1135     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1136    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
1137    if (ret < 0) {
1138        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1139                            " %"PRIu8" : %s", format_data->port,
1140                            strerror(-ret));
1141        return -1;
1142    }
1143    /* Initialise the TX queue a minimum value if using this port for
1144     * receiving. Otherwise a larger size if writing packets.
1145     */
1146    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1147                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1148    if (ret < 0) {
1149        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1150                            " %"PRIu8" : %s", format_data->port,
1151                            strerror(-ret));
1152        return -1;
1153    }
1154    /* Initialise the RX queue with some packets from memory */
1155    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
1156                                 format_data->nb_rx_buf, cpu_numa_node,
1157                                 &rx_conf, format_data->pktmbuf_pool);
1158    if (ret < 0) {
1159        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1160                    " %"PRIu8" : %s", format_data->port,
1161                    strerror(-ret));
1162        return -1;
1163    }
1164
1165    /* Start device */
1166    ret = rte_eth_dev_start(format_data->port);
1167    if (ret < 0) {
1168        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1169                    strerror(-ret));
1170        return -1;
1171    }
1172
1173    /* Default promiscuous to on */
1174    if (format_data->promisc == -1)
1175        format_data->promisc = 1;
1176
1177    if (format_data->promisc == 1)
1178        rte_eth_promiscuous_enable(format_data->port);
1179    else
1180        rte_eth_promiscuous_disable(format_data->port);
1181
1182        /* Register a callback for link state changes */
1183        ret = rte_eth_dev_callback_register(format_data->port,
1184                                            RTE_ETH_EVENT_INTR_LSC,
1185                                            dpdk_lsc_callback,
1186                                            format_data);
1187        /* If this fails it is not a show stopper */
1188#if DEBUG
1189        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1190                ret, strerror(-ret));
1191#endif
1192
1193    /* Get the current link status */
1194    rte_eth_link_get_nowait(format_data->port, &link_info);
1195    format_data->link_speed = link_info.link_speed;
1196#if DEBUG
1197    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1198            (int) link_info.link_duplex, (int) link_info.link_speed);
1199#endif
1200    /* We have now successfully started/unpaused */
1201    format_data->paused = DPDK_RUNNING;
1202
1203    return 0;
1204}
1205
1206/* Attach memory to the port and start (or restart) the port/s.
1207 */
1208static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
1209    int ret, i; /* Check return values for errors */
1210    struct rte_eth_link link_info; /* Wait for link */
1211
1212    /* Already started */
1213    if (format_data->paused == DPDK_RUNNING)
1214        return 0;
1215
1216    /* First time started we need to alloc our memory, doing this here
1217     * rather than in environment setup because we don't have snaplen then */
1218    if (format_data->paused == DPDK_NEVER_STARTED) {
1219        if (format_data->snaplen == 0) {
1220            format_data->snaplen = RX_MBUF_SIZE;
1221            port_conf.rxmode.jumbo_frame = 0;
1222            port_conf.rxmode.max_rx_pkt_len = 0;
1223        } else {
1224            /* Use jumbo frames */
1225            port_conf.rxmode.jumbo_frame = 1;
1226            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
1227        }
1228
1229        /* This is additional overhead so make sure we allow space for this */
1230#if GET_MAC_CRC_CHECKSUM
1231        format_data->snaplen += ETHER_CRC_LEN;
1232#endif
1233#if HAS_HW_TIMESTAMPS_82580
1234        format_data->snaplen += sizeof(struct hw_timestamp_82580);
1235#endif
1236
1237        /* Create the mbuf pool, which is the place our packets are allocated
1238         * from - TODO figure out if there is a free function (I cannot see one)
1239         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
1240         * allocate however that extra 1 packet is not used.
1241         * (I assume <= vs < error some where in DPDK code)
1242         * TX requires nb_tx_buffers + 1 in the case the queue is full
1243         * so that will fill the new buffer and wait until slots in the
1244         * ring become available.
1245         */
1246#if DEBUG
1247    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
1248#endif
1249    format_data->pktmbuf_pool =
1250            rte_mempool_create(format_data->mempool_name,
1251                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
1252                       format_data->snaplen + sizeof(struct rte_mbuf)
1253                                        + RTE_PKTMBUF_HEADROOM,
1254                       128, sizeof(struct rte_pktmbuf_pool_private),
1255                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
1256                       format_data->nic_numa_node, 0);
1257
1258        if (format_data->pktmbuf_pool == NULL) {
1259            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
1260                        "pool failed: %s", strerror(rte_errno));
1261            return -1;
1262        }
1263    }
1264
1265    /* ----------- Now do the setup for the port mapping ------------ */
1266    /* Order of calls must be
1267     * rte_eth_dev_configure()
1268     * rte_eth_tx_queue_setup()
1269     * rte_eth_rx_queue_setup()
1270     * rte_eth_dev_start()
1271     * other rte_eth calls
1272     */
1273
1274    /* This must be called first before another *eth* function
1275     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
1276    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
1277    if (ret < 0) {
1278        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
1279                            " %"PRIu8" : %s", format_data->port,
1280                            strerror(-ret));
1281        return -1;
1282    }
1283#if DEBUG
1284    fprintf(stderr, "Doing dev configure\n");
1285#endif
1286    /* Initialise the TX queue a minimum value if using this port for
1287     * receiving. Otherwise a larger size if writing packets.
1288     */
1289    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
1290                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
1291    if (ret < 0) {
1292        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
1293                            " %"PRIu8" : %s", format_data->port,
1294                            strerror(-ret));
1295        return -1;
1296    }
1297
1298    for (i=0; i < rx_queues; i++) {
1299#if DEBUG
1300    fprintf(stderr, "Doing queue configure\n");
1301#endif
1302
1303                /* Initialise the RX queue with some packets from memory */
1304                ret = rte_eth_rx_queue_setup(format_data->port, i,
1305                                             format_data->nb_rx_buf, format_data->nic_numa_node,
1306                                             &rx_conf, format_data->pktmbuf_pool);
1307        /* Init per_thread data structures */
1308        format_data->per_lcore[i].port = format_data->port;
1309        format_data->per_lcore[i].queue_id = i;
1310
1311                if (ret < 0) {
1312                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
1313                                                " %"PRIu8" : %s", format_data->port,
1314                                                strerror(-ret));
1315                        return -1;
1316                }
1317        }
1318
1319#if DEBUG
1320    fprintf(stderr, "Doing start device\n");
1321#endif
1322    /* Start device */
1323    ret = rte_eth_dev_start(format_data->port);
1324#if DEBUG
1325    fprintf(stderr, "Done start device\n");
1326#endif
1327    if (ret < 0) {
1328        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
1329                    strerror(-ret));
1330        return -1;
1331    }
1332
1333
1334    /* Default promiscuous to on */
1335    if (format_data->promisc == -1)
1336        format_data->promisc = 1;
1337
1338    if (format_data->promisc == 1)
1339        rte_eth_promiscuous_enable(format_data->port);
1340    else
1341        rte_eth_promiscuous_disable(format_data->port);
1342
1343
1344    /* We have now successfully started/unpased */
1345    format_data->paused = DPDK_RUNNING;
1346
1347    // Can use remote launch for all
1348    /*RTE_LCORE_FOREACH_SLAVE(i) {
1349                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
1350        }*/
1351
1352    /* Register a callback for link state changes */
1353    ret = rte_eth_dev_callback_register(format_data->port,
1354                                        RTE_ETH_EVENT_INTR_LSC,
1355                                        dpdk_lsc_callback,
1356                                        format_data);
1357    /* If this fails it is not a show stopper */
1358#if DEBUG
1359    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
1360            ret, strerror(-ret));
1361#endif
1362
1363    /* Get the current link status */
1364    rte_eth_link_get_nowait(format_data->port, &link_info);
1365    format_data->link_speed = link_info.link_speed;
1366#if DEBUG
1367    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
1368            (int) link_info.link_duplex, (int) link_info.link_speed);
1369#endif
1370
1371    return 0;
1372}
1373
1374static int dpdk_start_input (libtrace_t *libtrace) {
1375    char err[500];
1376    err[0] = 0;
1377
1378    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1379        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1380        free(libtrace->format_data);
1381        libtrace->format_data = NULL;
1382        return -1;
1383    }
1384    return 0;
1385}
1386
1387static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
1388    struct rte_eth_dev_info dev_info;
1389    rte_eth_dev_info_get(port_id, &dev_info);
1390    return dev_info.max_rx_queues;
1391}
1392
/* Number of processors currently online, as reported by sysconf().
 *
 * @return the online processor count, or 1 if sysconf() fails or reports a
 *         non-positive value.
 */
static inline size_t dpdk_processor_count (void) {
    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);

    /* sysconf() returns -1 on error; always report at least one CPU */
    return nb_cpu > 0 ? (size_t) nb_cpu : 1;
}
1400
1401static int dpdk_pstart_input (libtrace_t *libtrace) {
1402    char err[500];
1403    int i=0, phys_cores=0;
1404    int tot = libtrace->perpkt_thread_count;
1405    err[0] = 0;
1406
1407    if (rte_lcore_id() != rte_get_master_lcore())
1408        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
1409
1410    // If the master is not on the last thread we move it there
1411    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
1412        // Consider error handling here
1413        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
1414    }
1415
1416    // Don't exceed the number of cores in the system/detected by dpdk
1417    // We don't have to force this but performance wont be good if we don't
1418    for (i = 0; i < RTE_MAX_LCORE; ++i) {
1419        if (lcore_config[i].detected) {
1420            if (rte_lcore_is_enabled(i))
1421                fprintf(stderr, "Found core %d already in use!\n", i);
1422            else
1423                phys_cores++;
1424        }
1425    }
1426
1427        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
1428        tot = MIN(tot, phys_cores);
1429
1430        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
1431
1432    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
1433        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1434        free(libtrace->format_data);
1435        libtrace->format_data = NULL;
1436        return -1;
1437    }
1438
1439    // Make sure we only start the number that we should
1440    libtrace->perpkt_thread_count = tot;
1441    return 0;
1442}
1443
1444
1445/**
1446 * Register a thread with the DPDK system,
1447 * When we start DPDK in parallel libtrace we move the 'main thread' to the
1448 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
1449 * gives it.
1450 *
1451 * We then allow a mapper thread to be started on every real core as DPDK would,
1452 * we also bind these to the corresponding CPU cores.
1453 *
1454 * @param libtrace A pointer to the trace
1455 * @param reading True if the thread will be used to read packets, i.e. will
1456 *                call pread_packet(), false if thread used to process packet
1457 *                in any other manner including statistics functions.
1458 */
1459static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
1460{
1461    struct rte_config *cfg = rte_eal_get_configuration();
1462    int i;
1463    int new_id = -1;
1464
1465    // If 'reading packets' fill in cores from 0 up and bind affinity
1466    // otherwise start from the MAX core (which is also the master) and work backwards
1467    // in this case physical cores on the system will not exist so we don't bind
1468    // these to any particular physical core
1469    pthread_mutex_lock(&libtrace->libtrace_lock);
1470    if (reading) {
1471#if HAVE_LIBNUMA
1472        for (i = 0; i < RTE_MAX_LCORE; ++i) {
1473                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
1474                                new_id = i;
1475                        if (!lcore_config[i].detected)
1476                                new_id = -1;
1477                        break;
1478                }
1479        }
1480#endif
1481        /* Retry without the the numa restriction */
1482        if (new_id == -1) {
1483                for (i = 0; i < RTE_MAX_LCORE; ++i) {
1484                                if (!rte_lcore_is_enabled(i)) {
1485                                        new_id = i;
1486                                if (!lcore_config[i].detected)
1487                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
1488                                break;
1489                        }
1490                }
1491        }
1492    } else {
1493        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
1494            if (!rte_lcore_is_enabled(i)) {
1495                new_id = i;
1496                break;
1497            }
1498        }
1499    }
1500
1501    if (new_id == -1) {
1502        assert(cfg->lcore_count == RTE_MAX_LCORE);
1503        // TODO proper libtrace style error here!!
1504        fprintf(stderr, "Too many threads for DPDK!!\n");
1505        pthread_mutex_unlock(&libtrace->libtrace_lock);
1506        return -1;
1507    }
1508
1509    // Enable the core in global DPDK structs
1510    cfg->lcore_role[new_id] = ROLE_RTE;
1511    cfg->lcore_count++;
1512    // Set TLS to reflect our new number
1513    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
1514    fprintf(stderr, "original id%d", rte_lcore_id());
1515    RTE_PER_LCORE(_lcore_id) = new_id;
1516        char name[99];
1517        pthread_getname_np(pthread_self(),
1518                              name, sizeof(name));
1519
1520    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
1521
1522    if (reading) {
1523        // Set affinity bind to corresponding core
1524        cpu_set_t cpuset;
1525        CPU_ZERO(&cpuset);
1526        CPU_SET(rte_lcore_id(), &cpuset);
1527        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
1528        if (i != 0) {
1529            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
1530            pthread_mutex_unlock(&libtrace->libtrace_lock);
1531            return -1;
1532        }
1533    }
1534
1535    // Map our TLS to the thread data
1536    if (reading) {
1537        if(t->type == THREAD_PERPKT) {
1538            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
1539        } else {
1540            t->format_data = &FORMAT(libtrace)->per_lcore[0];
1541        }
1542    }
1543    pthread_mutex_unlock(&libtrace->libtrace_lock);
1544    return 0;
1545}
1546
1547
1548/**
1549 * Unregister a thread with the DPDK system.
1550 *
1551 * Only previously registered threads should be calling this just before
1552 * they are destroyed.
1553 */
1554static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
1555{
1556    struct rte_config *cfg = rte_eal_get_configuration();
1557
1558    assert(rte_lcore_id() < RTE_MAX_LCORE);
1559    pthread_mutex_lock(&libtrace->libtrace_lock);
1560    // Skip if master!!
1561    if (rte_lcore_id() == rte_get_master_lcore()) {
1562        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
1563        pthread_mutex_unlock(&libtrace->libtrace_lock);
1564        return;
1565    }
1566
1567    // Disable this core in global DPDK structs
1568    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
1569    cfg->lcore_count--;
1570    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
1571    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
1572    pthread_mutex_unlock(&libtrace->libtrace_lock);
1573    return;
1574}
1575
1576static int dpdk_start_output(libtrace_out_t *libtrace)
1577{
1578    char err[500];
1579    err[0] = 0;
1580
1581    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
1582        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
1583        free(libtrace->format_data);
1584        libtrace->format_data = NULL;
1585        return -1;
1586    }
1587    return 0;
1588}
1589
1590static int dpdk_pause_input(libtrace_t * libtrace) {
1591    /* This stops the device, but can be restarted using rte_eth_dev_start() */
1592    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
1593#if DEBUG
1594        fprintf(stderr, "Pausing DPDK port\n");
1595#endif
1596        rte_eth_dev_stop(FORMAT(libtrace)->port);
1597        FORMAT(libtrace)->paused = DPDK_PAUSED;
1598        /* Empty the queue of packets */
1599        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
1600                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
1601        }
1602        FORMAT(libtrace)->burst_offset = 0;
1603        FORMAT(libtrace)->burst_size = 0;
1604        /* If we pause it the driver will be reset and likely our counter */
1605
1606        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
1607#if HAS_HW_TIMESTAMPS_82580
1608        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
1609#endif
1610    }
1611    return 0;
1612}
1613
1614static int dpdk_write_packet(libtrace_out_t *trace,
1615                libtrace_packet_t *packet){
1616    struct rte_mbuf* m_buff[1];
1617
1618    int wirelen = trace_get_wire_length(packet);
1619    int caplen = trace_get_capture_length(packet);
1620
1621    /* Check for a checksum and remove it */
1622    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
1623                                            wirelen == caplen)
1624        caplen -= ETHER_CRC_LEN;
1625
1626    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
1627    if (m_buff[0] == NULL) {
1628        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
1629        return -1;
1630    } else {
1631        int ret;
1632        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
1633        do {
1634            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
1635        } while (ret != 1);
1636    }
1637
1638    return 0;
1639}
1640
1641static int dpdk_fin_input(libtrace_t * libtrace) {
1642    /* Free our memory structures */
1643    if (libtrace->format_data != NULL) {
1644        /* Close the device completely, device cannot be restarted */
1645        if (FORMAT(libtrace)->port != 0xFF)
1646                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
1647                                                RTE_ETH_EVENT_INTR_LSC,
1648                                                dpdk_lsc_callback,
1649                                                FORMAT(libtrace));
1650                rte_eth_dev_close(FORMAT(libtrace)->port);
1651                /* filter here if we used it */
1652                free(libtrace->format_data);
1653        }
1654
1655    /* Revert to the original PCI drivers */
1656    /* No longer in DPDK
1657    rte_eal_pci_exit(); */
1658    return 0;
1659}
1660
1661
1662static int dpdk_fin_output(libtrace_out_t * libtrace) {
1663    /* Free our memory structures */
1664    if (libtrace->format_data != NULL) {
1665        /* Close the device completely, device cannot be restarted */
1666        if (FORMAT(libtrace)->port != 0xFF)
1667            rte_eth_dev_close(FORMAT(libtrace)->port);
1668        /* filter here if we used it */
1669                free(libtrace->format_data);
1670        }
1671
1672    /* Revert to the original PCI drivers */
1673    /* No longer in DPDK
1674    rte_eal_pci_exit(); */
1675    return 0;
1676}
1677
1678/**
1679 * Get the start of the additional header that we added to a packet.
1680 */
1681static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
1682    assert(packet);
1683    assert(packet->buffer);
1684    /* Our header sits straight after the mbuf header */
1685    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
1686}
1687
1688static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
1689    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1690    return hdr->cap_len;
1691}
1692
1693static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
1694    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1695    if (size > hdr->cap_len) {
1696        /* Cannot make a packet bigger */
1697                return trace_get_capture_length(packet);
1698        }
1699
1700    /* Reset the cached capture length first*/
1701    packet->capture_length = -1;
1702    hdr->cap_len = (uint32_t) size;
1703        return trace_get_capture_length(packet);
1704}
1705
1706static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
1707    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1708    int org_cap_size; /* The original capture size */
1709    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
1710        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
1711                            sizeof(struct hw_timestamp_82580);
1712    } else {
1713        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
1714    }
1715    if (hdr->flags & INCLUDES_CHECKSUM) {
1716        return org_cap_size;
1717    } else {
1718        /* DPDK packets are always TRACE_TYPE_ETH packets */
1719        return org_cap_size + ETHER_CRC_LEN;
1720    }
1721}
1722static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
1723    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
1724    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
1725        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
1726                sizeof(struct hw_timestamp_82580);
1727    else
1728        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1729}
1730
1731static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
1732                libtrace_packet_t *packet, void *buffer,
1733                libtrace_rt_types_t rt_type, uint32_t flags) {
1734    assert(packet);
1735    if (packet->buffer != buffer &&
1736        packet->buf_control == TRACE_CTRL_PACKET) {
1737        free(packet->buffer);
1738    }
1739
1740    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
1741        packet->buf_control = TRACE_CTRL_PACKET;
1742    } else
1743        packet->buf_control = TRACE_CTRL_EXTERNAL;
1744
1745    packet->buffer = buffer;
1746    packet->header = buffer;
1747
1748    /* Don't use pktmbuf_mtod will fail if the packet is a copy */
1749    packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
1750    packet->type = rt_type;
1751    return 0;
1752}
1753
1754
1755/**
1756 * Given a packet size and a link speed, computes the
1757 * time to transmit in nanoseconds.
1758 *
1759 * @param format_data The dpdk format data from which we get the link speed
1760 *        and if unset updates it in a thread safe manner
1761 * @param pkt_size The size of the packet in bytes
1762 * @return The wire time in nanoseconds
1763 */
1764static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
1765        uint32_t wire_time;
1766        /* 20 extra bytes of interframe gap and preamble */
1767# if GET_MAC_CRC_CHECKSUM
1768        wire_time = ((pkt_size + 20) * 8000);
1769# else
1770        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
1771# endif
1772
1773        /* Division is really slow and introduces a pipeline stall
1774         * The compiler will optimise this into magical multiplication and shifting
1775         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
1776         */
1777retry_calc_wiretime:
1778        switch (format_data->link_speed) {
1779        case ETH_LINK_SPEED_40G:
1780                wire_time /=  ETH_LINK_SPEED_40G;
1781                break;
1782        case ETH_LINK_SPEED_20G:
1783                wire_time /= ETH_LINK_SPEED_20G;
1784                break;
1785        case ETH_LINK_SPEED_10G:
1786                wire_time /= ETH_LINK_SPEED_10G;
1787                break;
1788        case ETH_LINK_SPEED_1000:
1789                wire_time /= ETH_LINK_SPEED_1000;
1790                break;
1791        case 0:
1792                {
1793                /* Maybe the link was down originally, but now it should be up */
1794                struct rte_eth_link link = {0};
1795                rte_eth_link_get_nowait(format_data->port, &link);
1796                if (link.link_status && link.link_speed) {
1797                        format_data->link_speed = link.link_speed;
1798#ifdef DEBUG
1799                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
1800#endif
1801                        goto retry_calc_wiretime;
1802                }
1803                /* We don't know the link speed, make sure numbers are counting up */
1804                wire_time = 1;
1805                break;
1806                }
1807        default:
1808                wire_time /= format_data->link_speed;
1809        }
1810        return wire_time;
1811}
1812
1813
1814
1815/*
1816 * Does any extra preperation to all captured packets
1817 * This includes adding our extra header to it with the timestamp,
1818 * and any snapping
1819 *
1820 * @param format_data The DPDK format data
1821 * @param plc The DPDK per lcore format data
1822 * @param pkts An array of size nb_pkts of DPDK packets
1823 * @param nb_pkts The number of packets in pkts and optionally packets
1824 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
1825 */
1826static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
1827                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
1828        struct dpdk_addt_hdr *hdr;
1829        size_t i;
1830        uint64_t cur_sys_time_ns;
1831#if HAS_HW_TIMESTAMPS_82580
1832        struct hw_timestamp_82580 *hw_ts;
1833        uint64_t estimated_wraps;
1834#else
1835
1836#endif
1837
1838#if USE_CLOCK_GETTIME
1839        struct timespec cur_sys_time = {0};
1840        /* This looks terrible and I feel bad doing it. But it's OK
1841         * on new kernels, because this is a fast vsyscall */
1842        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
1843        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
1844#else
1845        struct timeval cur_sys_time = {0};
1846        /* Also a fast vsyscall */
1847        gettimeofday(&cur_sys_time, NULL);
1848        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
1849#endif
1850
1851        /* The system clock is not perfect so when running
1852         * at linerate we could timestamp a packet in the past.
1853         * To avoid this we munge the timestamp to appear 1ns
1854         * after the previous packet. We should eventually catch up
1855         * to system time since a 64byte packet on a 10G link takes 67ns.
1856         *
1857         * Note with parallel readers timestamping packets
1858         * with duplicate stamps or out of order is unavoidable without
1859         * hardware timestamping from the NIC.
1860         */
1861#if !HAS_HW_TIMESTAMPS_82580
1862        if (plc->ts_last_sys >= cur_sys_time_ns) {
1863                cur_sys_time_ns = plc->ts_last_sys + 1;
1864        }
1865#endif
1866
1867        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
1868        for (i = 0 ; i < nb_pkts ; ++i) {
1869
1870                /* We put our header straight after the dpdk header */
1871                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
1872                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
1873
1874#if GET_MAC_CRC_CHECKSUM
1875<<<<<<< HEAD
1876                /* Add back in the CRC sum */
1877                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
1878                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
1879                hdr->flags |= INCLUDES_CHECKSUM;
1880=======
1881    /* Add back in the CRC sum */
1882    rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
1883    rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
1884    hdr->flags |= INCLUDES_CHECKSUM;
1885>>>>>>> master
1886#endif
1887
1888                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
1889
1890#if HAS_HW_TIMESTAMPS_82580
1891                /* The timestamp is sitting before our packet and is included in pkt_len */
1892                hdr->flags |= INCLUDES_HW_TIMESTAMP;
1893                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
1894                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
1895
1896                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
1897                 *
1898                 *        +----------+---+   +--------------+
1899                 *  82580 |    24    | 8 |   |      32      |
1900                 *        +----------+---+   +--------------+
1901                 *          reserved  \______ 40 bits _____/
1902                 *
1903                 * The 40 bit 82580 SYSTIM overflows every
1904                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
1905                 *
1906                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
1907                 * Endian (for the full 64 bits) i.e. picture is mirrored
1908                 */
1909
1910                /* Despite what the documentation says this is in Little
1911                 * Endian byteorder. Mask the reserved section out.
1912                 */
1913                hdr->timestamp = le64toh(hw_ts->timestamp) &
1914                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
1915
1916                if (unlikely(plc->ts_first_sys == 0)) {
1917                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
1918                        plc->ts_last_sys = plc->ts_first_sys;
1919                }
1920
1921                /* This will have serious problems if packets aren't read quickly
1922                 * that is within a couple of seconds because our clock cycles every
1923                 * 18 seconds */
1924                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
1925                                  / (1ull<<TS_NBITS_82580);
1926
1927                /* Estimated_wraps gives the number of times the counter should have
1928                 * wrapped (however depending on value last time it could have wrapped
1929                 * twice more (if hw clock is close to its max value) or once less (allowing
1930                 * for a bit of variance between hw and sys clock). But if the clock
1931                 * shouldn't have wrapped once then don't allow it to go backwards in time */
1932                if (unlikely(estimated_wraps >= 2)) {
1933                        /* 2 or more wrap arounds add all but the very last wrap */
1934                        plc->wrap_count += estimated_wraps - 1;
1935                }
1936
1937                /* Set the timestamp to the lowest possible value we're considering */
1938                hdr->timestamp += plc->ts_first_sys +
1939                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
1940
1941                /* In most runs only the first if() will need evaluating - i.e our
1942                 * estimate is correct. */
1943                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
1944                                              hdr->timestamp, MAXSKEW_82580))) {
1945                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
1946                        plc->wrap_count++;
1947                        hdr->timestamp += (1ull<<TS_NBITS_82580);
1948                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
1949                                             hdr->timestamp, MAXSKEW_82580)) {
1950                                /* Failed to match estimated_wraps */
1951                                plc->wrap_count++;
1952                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1953                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1954                                                     hdr->timestamp, MAXSKEW_82580)) {
1955                                        if (estimated_wraps == 0) {
1956                                                /* 0 case Failed to match estimated_wraps+2 */
1957                                                printf("WARNING - Hardware Timestamp failed to"
1958                                                       " match using systemtime!\n");
1959                                                hdr->timestamp = cur_sys_time_ns;
1960                                        } else {
1961                                                /* Failed to match estimated_wraps+1 */
1962                                                plc->wrap_count++;
1963                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
1964                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
1965                                                                     hdr->timestamp, MAXSKEW_82580)) {
1966                                                        /* Failed to match estimated_wraps+2 */
1967                                                        printf("WARNING - Hardware Timestamp failed to"
1968                                                               " match using systemtime!!\n");
1969                                                }
1970                                        }
1971                                }
1972                        }
1973                }
1974#else
1975
1976                hdr->timestamp = cur_sys_time_ns;
1977                /* Offset the next packet by the wire time of previous */
1978                calculate_wire_time(format_data, hdr->cap_len);
1979
1980#endif
1981                if(packets) {
1982                        packets[i]->buffer = pkts[i];
1983                        packets[i]->header = pkts[i];
1984#if HAS_HW_TIMESTAMPS_82580
1985                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1986                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
1987#else
1988                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
1989                                              RTE_PKTMBUF_HEADROOM;
1990#endif
1991                        packets[i]->error = 1;
1992                }
1993        }
1994
1995        plc->ts_last_sys = cur_sys_time_ns;
1996
1997        return;
1998}
1999
2000
2001static void dpdk_fin_packet(libtrace_packet_t *packet)
2002{
2003        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2004                rte_pktmbuf_free(packet->buffer);
2005                packet->buffer = NULL;
2006        }
2007}
2008
2009
2010static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
2011    int nb_rx; /* Number of rx packets we've received */
2012
2013    /* Free the last packet buffer */
2014    if (packet->buffer != NULL) {
2015        /* Buffer is owned by DPDK */
2016        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2017            rte_pktmbuf_free(packet->buffer);
2018            packet->buffer = NULL;
2019        } else
2020        /* Buffer is owned by packet i.e. has been malloc'd */
2021        if (packet->buf_control == TRACE_CTRL_PACKET) {
2022            free(packet->buffer);
2023            packet->buffer = NULL;
2024        }
2025    }
2026
2027    packet->buf_control = TRACE_CTRL_EXTERNAL;
2028    packet->type = TRACE_RT_DATA_DPDK;
2029
2030    /* Check if we already have some packets buffered */
2031    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
2032            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
2033            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2034            return 1; // TODO should be bytes read, which essentially useless anyway
2035    }
2036    /* Wait for a packet */
2037    while (1) {
2038        /* Poll for a single packet */
2039        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
2040                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
2041        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
2042                FORMAT(libtrace)->burst_size = nb_rx;
2043                FORMAT(libtrace)->burst_offset = 1;
2044                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
2045                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
2046                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
2047                return 1; // TODO should be bytes read, which essentially useless anyway
2048        }
2049        if (libtrace_halt) {
2050                return 0;
2051        }
2052        /* Wait a while, polling on memory degrades performance
2053         * This relieves the pressure on memory allowing the NIC to DMA */
2054        rte_delay_us(10);
2055    }
2056
2057    /* We'll never get here - but if we did it would be bad */
2058    return -1;
2059}
2060
2061static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
2062    size_t nb_rx; /* Number of rx packets we've recevied */
2063    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
2064    size_t i;
2065
2066    for (i = 0 ; i < nb_packets ; ++i) {
2067            /* Free the last packet buffer */
2068            if (packets[i]->buffer != NULL) {
2069                /* Buffer is owned by DPDK */
2070                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
2071                    rte_pktmbuf_free(packets[i]->buffer);
2072                    packets[i]->buffer = NULL;
2073                } else
2074                /* Buffer is owned by packet i.e. has been malloc'd */
2075                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
2076                    free(packets[i]->buffer);
2077                    packets[i]->buffer = NULL;
2078                }
2079            }
2080            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
2081            packets[i]->type = TRACE_RT_DATA_DPDK;
2082    }
2083
2084    /* Wait for a packet */
2085    while (1) {
2086        /* Poll for a single packet */
2087        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
2088                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
2089        if (nb_rx > 0) {
2090                /* Got some packets - otherwise we keep spining */
2091                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
2092                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
2093                return nb_rx;
2094        }
2095        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
2096        if (libtrace_message_queue_count(&t->messages) > 0) {
2097                printf("Extra message yay");
2098                return -2;
2099        }
2100        if (libtrace_halt) {
2101                return 0;
2102        }
2103        /* Wait a while, polling on memory degrades performance
2104         * This relieves the pressure on memory allowing the NIC to DMA */
2105        rte_delay_us(10);
2106    }
2107
2108    /* We'll never get here - but if we did it would be bad */
2109    return -1;
2110}
2111
2112static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
2113    struct timeval tv;
2114    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2115
2116    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2117    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
2118    return tv;
2119}
2120
2121static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
2122    struct timespec ts;
2123    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2124
2125    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
2126    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
2127    return ts;
2128}
2129
2130static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
2131    return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
2132}
2133
2134static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
2135    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2136    return (libtrace_direction_t) hdr->direction;
2137}
2138
2139static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
2140    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
2141    hdr->direction = (uint8_t) direction;
2142    return (libtrace_direction_t) hdr->direction;
2143}
2144
2145/*
2146 * NOTE: Drops could occur for other reasons than running out of buffer
2147 * space. Such as failed MAC checksums and oversized packets.
2148 */
2149static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
2150    struct rte_eth_stats stats = {0};
2151
2152    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2153        return UINT64_MAX;
2154    /* Grab the current stats */
2155    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2156
2157    /* Get the drop counter */
2158    return (uint64_t) stats.ierrors;
2159}
2160
2161/*
2162 * This is the number of packets filtered by the NIC
2163 * and maybe ahead of number read using libtrace.
2164 *
2165 * XXX we are yet to implement any filtering, but if it was this should
2166 * get the result. So this will just return 0 for now.
2167 */
2168static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
2169    struct rte_eth_stats stats = {0};
2170
2171    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
2172        return UINT64_MAX;
2173    /* Grab the current stats */
2174    rte_eth_stats_get(FORMAT(trace)->port, &stats);
2175
2176    /* Get the drop counter */
2177    return (uint64_t) stats.fdirmiss;
2178}
2179
2180/* Attempts to read a packet in a non-blocking fashion. If one is not
2181 * available a SLEEP event is returned. We do not have the ability to
2182 * create a select()able file descriptor in DPDK.
2183 */
2184static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
2185                                        libtrace_packet_t *packet) {
2186    libtrace_eventobj_t event = {0,0,0.0,0};
2187    int nb_rx; /* Number of receive packets we've read */
2188    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
2189
2190    do {
2191
2192        /* See if we already have a packet waiting */
2193        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
2194                        FORMAT(trace)->queue_id, pkts_burst, 1);
2195
2196        if (nb_rx > 0) {
2197            /* Free the last packet buffer */
2198            if (packet->buffer != NULL) {
2199                /* Buffer is owned by DPDK */
2200                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
2201                    rte_pktmbuf_free(packet->buffer);
2202                    packet->buffer = NULL;
2203                } else
2204                /* Buffer is owned by packet i.e. has been malloc'd */
2205                if (packet->buf_control == TRACE_CTRL_PACKET) {
2206                    free(packet->buffer);
2207                    packet->buffer = NULL;
2208                }
2209            }
2210
2211            packet->buf_control = TRACE_CTRL_EXTERNAL;
2212            packet->type = TRACE_RT_DATA_DPDK;
2213            event.type = TRACE_EVENT_PACKET;
2214            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
2215            event.size = 1; // TODO should be bytes read, which essentially useless anyway
2216
2217            /* XXX - Check this passes the filter trace_read_packet normally
2218             * does this for us but this wont */
2219            if (trace->filter) {
2220                if (!trace_apply_filter(trace->filter, packet)) {
2221                    /* Failed the filter so we loop for another packet */
2222                    trace->filtered_packets ++;
2223                    continue;
2224                }
2225            }
2226            trace->accepted_packets ++;
2227        } else {
2228            /* We only want to sleep for a very short time - we are non-blocking */
2229            event.type = TRACE_EVENT_SLEEP;
2230            event.seconds = 0.0001;
2231            event.size = 0;
2232        }
2233
2234        /* If we get here we have our event */
2235        break;
2236    } while (1);
2237
2238    return event;
2239}
2240
2241
2242static void dpdk_help(void) {
2243    printf("dpdk format module: $Revision: 1752 $\n");
2244    printf("Supported input URIs:\n");
2245    printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
2246    printf("\tThe -<coreid> is optional \n");
2247    printf("\t e.g. dpdk:0000:01:00.1\n");
2248    printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
2249    printf("\t By default the last CPU core is used if not otherwise specified.\n");
2250    printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
2251    printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
2252    printf("\n");
2253    printf("Supported output URIs:\n");
2254    printf("\tSame format as the input URI.\n");
2255    printf("\t e.g. dpdk:0000:01:00.1\n");
2256    printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
2257    printf("\n");
2258}
2259
2260static struct libtrace_format_t dpdk = {
2261        "dpdk",
2262        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
2263        TRACE_FORMAT_DPDK,
2264        NULL,                   /* probe filename */
2265        NULL,                               /* probe magic */
2266        dpdk_init_input,            /* init_input */
2267        dpdk_config_input,          /* config_input */
2268        dpdk_start_input,           /* start_input */
2269        dpdk_pause_input,           /* pause_input */
2270        dpdk_init_output,           /* init_output */
2271        NULL,                               /* config_output */
2272        dpdk_start_output,          /* start_ouput */
2273        dpdk_fin_input,             /* fin_input */
2274        dpdk_fin_output,        /* fin_output */
2275        dpdk_read_packet,           /* read_packet */
2276        dpdk_prepare_packet,    /* prepare_packet */
2277        dpdk_fin_packet,                                    /* fin_packet */
2278        dpdk_write_packet,          /* write_packet */
2279        dpdk_get_link_type,         /* get_link_type */
2280        dpdk_get_direction,         /* get_direction */
2281        dpdk_set_direction,         /* set_direction */
2282        NULL,                               /* get_erf_timestamp */
2283        dpdk_get_timeval,           /* get_timeval */
2284        dpdk_get_timespec,          /* get_timespec */
2285        NULL,                               /* get_seconds */
2286        NULL,                               /* seek_erf */
2287        NULL,                               /* seek_timeval */
2288        NULL,                               /* seek_seconds */
2289        dpdk_get_capture_length,/* get_capture_length */
2290        dpdk_get_wire_length,   /* get_wire_length */
2291        dpdk_get_framing_length,/* get_framing_length */
2292        dpdk_set_capture_length,/* set_capture_length */
2293        NULL,                               /* get_received_packets */
2294        dpdk_get_filtered_packets,/* get_filtered_packets */
2295        dpdk_get_dropped_packets,/* get_dropped_packets */
2296        NULL,                   /* get_statistics */
2297        NULL,                       /* get_fd */
2298        dpdk_trace_event,               /* trace_event */
2299        dpdk_help,              /* help */
2300        NULL,                   /* next pointer */
2301        {true, 8},              /* Live, NICs typically have 8 threads */
2302        dpdk_pstart_input, /* pstart_input */
2303        dpdk_pread_packets, /* pread_packets */
2304        dpdk_pause_input, /* ppause */
2305        dpdk_fin_input, /* p_fin */
2306        dpdk_pconfig_input, /* pconfig_input */
2307        dpdk_pregister_thread, /* pregister_thread */
2308        dpdk_punregister_thread, /* punregister_thread */
2309        NULL                            /* get thread stats */
2310};
2311
2312void dpdk_constructor(void) {
2313        register_format(&dpdk);
2314}
Note: See TracBrowser for help on using the repository browser.