Changes in / [2b0eae9:a984307]


Ignore:
Files:
54 added
8 deleted
62 edited

Legend:

Unmodified
Added
Removed
  • INSTALL

    rb6cbdaf rb6cbdaf  
    77Optional:
    88 * DAG libraries (both 2.4 and 2.5 versions are supported)
     9 * libcrypto (required for CryptoPAN anonymisation in traceanon)
     10 * libncurses (required for tracetop)
     11 
    912----------------------------------
    1013
     
    1720
    1821The above series of commands will install libtrace into /usr/local/lib. If
    19 you wish to install to a non-standard location, append the 
     22you wish to install to a non-standard location, append the
    2023--prefix=DIR option to the ./configure command. ./configure also takes
    2124a number of other options - run ./configure --help to view a comprehensive
    2225list.
    2326
    24 You may need to add the library location (e.g. /usr/local/lib) to your 
     27You may need to add the library location (e.g. /usr/local/lib) to your
    2528/etc/ld.so.conf and run 'ldconfig' as root.
    2629
     
    2932Installation FAQ:
    3033
    31 Q. I've installed libpcap but I'm still getting the following error: 
    32 "libpcap0.8 or greater is required to compile libtrace. If you have installed 
    33 it in a non-standard location please use LDFLAGS to specify the location of 
     34Q. I've installed libpcap but I'm still getting the following error:
     35"libpcap0.8 or greater is required to compile libtrace. If you have installed
     36it in a non-standard location please use LDFLAGS to specify the location of
    3437the library"?
    3538
    36 A. You need to install the development version of the pcap library. 
    37 For example, Ubuntu/Debian users will need to install the libpcap0.8-dev 
     39A. You need to install the development version of the pcap library.
     40For example, Ubuntu/Debian users will need to install the libpcap0.8-dev
    3841package in addition to the libpcap0.8 package.
    3942
     
    4750
    4851The best source of information on how to use libtrace and the tools that come
    49 with it is the libtrace wiki located at
    50 http://www.github.com/wanduow/libtrace/wiki
     52with it is the libtrace wiki: https://github.com/wanduow/libtrace/wiki
    5153
    5254
  • README

    rb6cbdaf rb6cbdaf  
    1 libtrace 3.0.22
     1libtrace 4.0.0
    22
    33---------------------------------------------------------------------------
     
    1919and has been since extended to a range of other trace and interface formats.
    2020
    21 Libtrace should build and run on Linux, Mac OS X, FreeBSD and OpenBSD systems.
     21In version 4.0, we have introduced an API for processing packets in parallel
     22using multiple threads. See libtrace_parallel.h for a detailed description
     23of the API.
    2224
    23 Further information about libtrace see
     25Further information about libtrace, see
    2426http://research.wand.net.nz/software/libtrace.php
    2527
     
    3032see the included file GPL for details of this license.
    3133
    32 A detailed ChangeLog can be found on the libtrace wiki: 
     34A detailed ChangeLog can be found on the libtrace wiki:
    3335https://github.com/wanduow/libtrace/wiki/ChangeLog
    3436
     
    3638on the libtrace wiki.
    3739
    38 For further information, please contact the WAND group. See 
    39 http://www.wand.net.nz/ for details. 
     40For further information, please contact the WAND group. See
     41http://www.wand.net.nz/ for details.
  • configure.in

    rc24de65 r32a3ec5  
    44# and in the README
    55
    6 AC_INIT([libtrace],[3.0.22],[contact@wand.net.nz],[libtrace])
    7 
    8 LIBTRACE_MAJOR=3
     6AC_INIT([libtrace],[4.0.0],[contact@wand.net.nz],[libtrace])
     7
     8LIBTRACE_MAJOR=4
    99LIBTRACE_MID=0
    10 LIBTRACE_MINOR=22
     10LIBTRACE_MINOR=0
    1111
    1212# OpenSolaris hides libraries like libncurses in /usr/gnu/lib, which is not
     
    3939        tools/traceends/Makefile
    4040        examples/Makefile examples/skeleton/Makefile examples/rate/Makefile
    41         examples/stats/Makefile examples/tutorial/Makefile
     41        examples/stats/Makefile examples/tutorial/Makefile examples/parallel/Makefile
    4242        docs/libtrace.doxygen
    4343        lib/libtrace.h
     
    115115gcc_PURE
    116116gcc_FORMAT
     117
     118# Check for gcc style TLS (__thread)
     119gcc_TLS
    117120       
    118121# Check for libtool
     
    208211fi
    209212
     213AC_CHECK_LIB(crypto, EVP_EncryptInit_ex, cryptofound=1, cryptofound=0)
     214
    210215# Check for libpcap
    211216AC_CHECK_LIB(pcap,pcap_next_ex,pcapfound=1,pcapfound=0)
    212217AC_CHECK_LIB(pcap,pcap_create,pcapcreate=1,pcapcreate=0)
     218AC_CHECK_LIB(pcap,pcap_set_immediate_mode,pcapimmediate=1,pcapimmediate=0)
    213219AC_CHECK_DECLS([BIOCSETIF],,,[
    214220#include <sys/types.h>
     
    217223#include <net/bpf.h>
    218224])
     225
     226AC_ARG_ENABLE(memory-debugging,
     227                AS_HELP_STRING(--enable-memory-debugging, prints internal memory statistics),[
     228                if test "$HAVE_TLS" = 1
     229                then
     230                    AC_DEFINE([ENABLE_MEM_STATS], 1, [print debug memory statistics])
     231                fi
     232],[])
    219233
    220234# Configure options for man pages
     
    247261        AC_DEFINE([HAVE_PCAP_CREATE],1,[compile with libpcap 1.0 support])
    248262fi
     263
     264if test "$pcapimmediate" = 1; then
     265        AC_DEFINE([HAVE_PCAP_IMMEDIATE],1,[able to use pcap_set_immediate_mode])
     266fi       
    249267
    250268# Configure options for use of DAG cards
     
    296314                AC_DEFINE(DAG_VERSION,25,[defines the DAG driver version])
    297315                libtrace_dag_version=25
    298         fi
     316
     317                AC_CHECK_HEADERS(dag_config_api.h,[csapi_found=1],,)
     318                if test "$csapi_found" = 1; then
     319                        LIBTRACE_LIBS="$LIBTRACE_LIBS -ldagconf"
     320                fi
     321        fi     
    299322fi
    300323
     
    401424fi
    402425
     426# Check for PACKET_FANOUT (borrowed from Suricata)
     427AC_CHECK_DECL([PACKET_FANOUT],
     428        AC_DEFINE([HAVE_PACKET_FANOUT],[1],
     429        [Recent packet fanout support is available]),
     430        [],
     431        [[#include <linux/if_packet.h>]])
     432
     433# If we use DPDK we might be able to use libnuma
     434AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
     435
    403436# Checks for various "optional" libraries
    404437AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
     438
     439AC_CHECK_LIB(pthread, pthread_setname_np, have_pthread_setname_np=1, have_pthread_setname_np=0)
    405440
    406441# Check for ncurses
     
    420455AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
    421456LIBS=
     457
     458if test "$have_numa" = 1; then
     459        LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
     460        AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is supported])
     461        with_numa=yes
     462else
     463        AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is supported])
     464        with_numa=no
     465fi
    422466
    423467if test "$dlfound" = 0; then
     
    436480fi
    437481
     482if test "$have_pthread_setname_np" = 1; then
     483        AC_DEFINE(HAVE_PTHREAD_SETNAME_NP, 1, [Set to 1 if pthread_setname_np is found])
     484fi
     485
     486if test "$cryptofound" = 1; then
     487        AC_DEFINE(HAVE_LIBCRYPTO, 1, [Set to 1 if libcrypto is available])
     488        TOOLS_LIBS="$TOOLS_LIBS -lcrypto"
     489        have_crypto=yes
     490else
     491        have_crypto=no
     492fi
    438493
    439494if test "$have_nsl" = 1; then
     
    453508
    454509if test "$have_clock_gettime" = 1; then
    455     LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     510        LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     511        AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Set to 1 if clock_gettime is supported])
     512        with_clock_gettime=yes
     513else
     514        AC_DEFINE(HAVE_CLOCK_GETTIME, 0, [Set to 1 if clock_gettime is supported])
     515        with_clock_gettime=no
    456516fi
    457517
     
    599659
    600660if test x"$libtrace_dpdk" = xtrue; then
    601     AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     661        AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     662        reportopt "Compiled with DPDK trace NUMA support" $with_numa
     663        reportopt "Compiled with clock_gettime support" $with_clock_gettime
    602664elif test x"$want_dpdk" != "xno"; then
    603665#   We don't officially support DPDK so only report failure if the user
    604666#   explicitly asked for DPDK. That way, we can hopefully keep it hidden
    605667#   from most users for now...
    606        
    607     AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
    608     AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
     668
     669        AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
     670        AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
    609671fi
    610672reportopt "Compiled with LLVM BPF JIT support" $JIT
    611673reportopt "Building man pages/documentation" $libtrace_doxygen
    612674reportopt "Building tracetop (requires libncurses)" $with_ncurses
     675reportopt "Building traceanon with CryptoPan (requires libcrypto)" $have_crypto
    613676
    614677# Report any errors relating to missing bison, flex, etc.
  • examples/Makefile.am

    r8835f5a r16cb2a2  
    11
    2 SUBDIRS=skeleton rate stats tutorial
     2SUBDIRS=skeleton rate stats tutorial parallel
  • examples/skeleton/Makefile.am

    rfdb5e98 rc42a2d6  
    11
    2 noinst_PROGRAMS=trivial complete event output
     2noinst_PROGRAMS=trivial complete event output parallel
    33
    44include ../Makefile.examples
     
    77event_SOURCES=event.c
    88output_SOURCES=output.c
     9parallel_SOURCES=parallel.c
    910
  • lib/Makefile.am

    r3a333e2 re63d80d  
    11lib_LTLIBRARIES = libtrace.la
    2 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
     2include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h \
     3        rt_protocol.h erftypes.h libtrace_parallel.h \
     4        data-struct/ring_buffer.h data-struct/object_cache.h \
     5        data-struct/vector.h data-struct/message_queue.h \
     6        data-struct/deque.h data-struct/linked_list.h \
     7        data-struct/sliding_window.h hash_toeplitz.h \
     8        data-struct/buckets.h
    39
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     10AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     11AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    612
    713extra_DIST = format_template.c
    8 NATIVEFORMATS=format_linux.c
     14NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
    915BPFFORMATS=format_bpf.c
    1016
     
    2935NATIVEFORMATS+= format_dpdk.c
    3036# So we also make libtrace.mk in dpdk otherwise automake tries to expand
    31 # it to early which I cannot seem to stop unless we use a path that
     37# it too early which I cannot seem to stop unless we use a path that
    3238# doesn't exist currently
    3339export RTE_SDK=@RTE_SDK@
     
    4450endif
    4551
    46 libtrace_la_SOURCES = trace.c common.h \
     52libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4753                format_erf.c format_pcap.c format_legacy.c \
    4854                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5763                $(DAGSOURCE) format_erf.h \
    5864                $(BPFJITSOURCE) \
    59                 libtrace_arphrd.h
     65                libtrace_arphrd.h \
     66                data-struct/ring_buffer.c data-struct/vector.c \
     67                data-struct/message_queue.c data-struct/deque.c \
     68                data-struct/sliding_window.c data-struct/object_cache.c \
     69                data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
     70                data-struct/buckets.c \
     71                combiner_sorted.c combiner_unordered.c \
     72                pthread_spinlock.c pthread_spinlock.h
    6073
    6174if DAG2_4
  • lib/byteswap.c

    r8b49230 rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/checksum.c

    rc909fad rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007-2013 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/checksum.h

    rc909fad rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/dagformat.h

    r143eaba rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/daglegacy.h

    re56be6d rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/erftypes.h

    r204872da rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
  • lib/format_atmhdr.c

    r5952ff0 rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
     
    227227        NULL,                           /* get_filtered_packets */
    228228        NULL,                           /* get_dropped_packets */
    229         NULL,                           /* get_captured_packets */
     229        NULL,                           /* get_statistics */
    230230        NULL,                           /* get_fd */
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r13bcf9e r13bcf9e  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
     
    117117        /* A boolean flag indicating whether the statistics are up-to-date */
    118118        int stats_valid;
     119        /* Bucket data structure for safely storing buffers of packets */
     120        libtrace_bucket_t *bucket;
    119121};
    120122
     
    139141        FORMATIN(libtrace)->snaplen = 65536;
    140142        FORMATIN(libtrace)->stats_valid = 0;
     143        FORMATIN(libtrace)->bucket = libtrace_bucket_init();
    141144
    142145        return 0;
     
    203206        }
    204207
    205         FORMATIN(libtrace)->buffer = malloc(FORMATIN(libtrace)->buffersize);
    206         FORMATIN(libtrace)->bufptr = FORMATIN(libtrace)->buffer;
     208        FORMATIN(libtrace)->buffer = NULL;
     209        FORMATIN(libtrace)->bufptr = NULL;
    207210        FORMATIN(libtrace)->remaining = 0;
    208211
     
    316319}
    317320
     321static void bpf_get_statistics(libtrace_t *trace, libtrace_stat_t *stat) {
     322        uint64_t dropped = bpf_get_dropped_packets(trace);
     323        uint64_t received = bpf_get_received_packets(trace);
     324
     325        if (dropped != (uint64_t)-1) {
     326                stat->dropped_valid = 1;
     327                stat->dropped = dropped;
     328        }
     329
     330        if (received != (uint64_t) -1) {
     331                stat->received_valid = 1;
     332                stat->received = received;
     333        }
     334
     335}
     336
    318337/* Pauses a BPF input trace */
    319338static int bpf_pause_input(libtrace_t *libtrace)
     
    328347static int bpf_fin_input(libtrace_t *libtrace)
    329348{
    330         free(libtrace->format_data);
     349        libtrace_bucket_destroy(FORMATIN(libtrace)->bucket);
     350        free(libtrace->format_data);
    331351        return 0;
    332352}
     
    356376                        /* Captures are always realtime */
    357377                        break;
     378                case TRACE_OPTION_HASHER:
     379                        /* TODO investigate hashing in BSD? */
     380                        break;
    358381
    359382                /* Avoid default: so that future options will cause a warning
     
    438461        packet->type = bpf_linktype_to_rt(FORMATIN(libtrace)->linktype);
    439462
     463        if (FORMATIN(libtrace)->remaining <= 0) {
     464                FORMATIN(libtrace)->buffer = malloc(FORMATIN(libtrace)->buffersize);
     465                libtrace_create_new_bucket(FORMATIN(libtrace)->bucket, FORMATIN(libtrace)->buffer);
     466        }
     467
    440468        while (FORMATIN(libtrace)->remaining <= 0) {
     469
    441470                tout.tv_sec = 0;
    442471                tout.tv_usec = 500000;
     
    484513        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
    485514
    486         if (packet->buf_control == TRACE_CTRL_PACKET)
     515        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
    487516                free(packet->buffer);
    488517
     
    493522                return -1;
    494523        }
    495        
    496 
    497         /* Skip past the packet record we're going to return, making sure
     524
     525        packet->internalid = libtrace_push_into_bucket(FORMATIN(libtrace)->bucket);
     526        packet->srcbucket = FORMATIN(libtrace)->bucket;
     527
     528        /* Skip past the packet record we're going to return, making sure
    498529         * that we deal with padding correctly */
    499530        FORMATIN(libtrace)->bufptr+=
     
    607638        bpf_get_framing_length, /* get_framing_length */
    608639        NULL,                   /* set_capture_length */
    609         bpf_get_received_packets,/* get_received_packets */
     640        NULL,                   /* get_received_packets */
    610641        NULL,                   /* get_filtered_packets */
    611         bpf_get_dropped_packets,/* get_dropped_packets */
    612         NULL,                   /* get_captured_packets */
     642        NULL,                   /* get_dropped_packets */
     643        bpf_get_statistics,     /* get_statistics */
    613644        bpf_get_fd,             /* get_fd */
    614645        trace_event_device,     /* trace_event */
    615646        bpf_help,               /* help */
    616         NULL
     647        NULL,                   /* next pointer */
     648        NON_PARALLEL(true)
    617649};
    618650#else   /* HAVE_DECL_BIOCSETIF */
     
    656688        bpf_get_framing_length, /* get_framing_length */
    657689        NULL,                   /* set_capture_length */
    658         NULL,/* get_received_packets */
     690        NULL,                   /* get_received_packets */
    659691        NULL,                   /* get_filtered_packets */
    660         NULL,/* get_dropped_packets */
    661         NULL,                   /* get_captured_packets */
     692        NULL,                   /* get_dropped_packets */
     693        NULL,                   /* get_statistics */
    662694        NULL,                   /* get_fd */
    663695        NULL,                   /* trace_event */
    664696        bpf_help,               /* help */
    665         NULL
     697        NULL,                   /* next pointer */
     698        NON_PARALLEL(true)
    666699};
    667700#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
     
    554554        NULL,                           /* get_filtered_packets */
    555555        dag_get_dropped_packets,        /* get_dropped_packets */
    556         NULL,                           /* get_captured_packets */
     556        NULL,                           /* get_statistics */
    557557        NULL,                           /* get_fd */
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    r0054c50 rd8b05b7  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
     
    5454#include <pthread.h>
    5555
     56
     57#ifdef HAVE_DAG_CONFIG_API_H
     58#include <dag_config_api.h>
     59#endif
    5660
    5761#ifdef WIN32
     
    7983 */
    8084
    81 
    8285#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8386#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     87#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8488
    8589#define FORMAT_DATA DATA(libtrace)
     
    8791
    8892#define DUCK FORMAT_DATA->duck
     93
     94#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     95#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     96
    8997static struct libtrace_format_t dag;
    9098
     
    104112/* "Global" data that is stored for each DAG output trace */
    105113struct dag_format_data_out_t {
    106         /* String containing the DAG device name */
    107         char *device_name;
    108114        /* The DAG device being used for writing */
    109115        struct dag_dev_t *device;
     
    118124};
    119125
     126/* Data that is stored against each input stream */
     127struct dag_per_stream_t {
     128        /* DAG stream number */
     129        uint16_t dagstream;
     130        /* Pointer to the last unread byte in the DAG memory */
     131        uint8_t *top;
     132        /* Pointer to the first unread byte in the DAG memory */
     133        uint8_t *bottom;
     134        /* Amount of data processed from the bottom pointer */
     135        uint32_t processed;
     136        /* Number of packets seen by the stream */
     137        uint64_t pkt_count;
     138        /* Drop count for this particular stream */
     139        uint64_t drops;
     140        /* Boolean values to indicate if a particular interface has been seen
     141         * or not. This is limited to four interfaces, which is enough to
     142         * support all current DAG cards */
     143        uint8_t seeninterface[4];
     144};
     145
    120146/* "Global" data that is stored for each DAG input trace */
    121147struct dag_format_data_t {
    122 
    123         /* Data required for regular DUCK reporting */
     148        /* DAG device */
     149        struct dag_dev_t *device;
     150        /* Boolean flag indicating whether the trace is currently attached */
     151        int stream_attached;
     152        /* Data stored against each DAG input stream */
     153        libtrace_list_t *per_stream;
     154
     155        /* Data required for regular DUCK reporting.
     156         * We put this on a new cache line otherwise we have a lot of false
     157         * sharing caused by updating the last_pkt.
     158         * This should only ever be accessed by the first thread stream,
     159         * that includes both read and write operations.
     160         */
    124161        struct {
    125162                /* Timestamp of the last DUCK report */
    126163                uint32_t last_duck;
    127164                /* The number of seconds between each DUCK report */
    128                 uint32_t duck_freq;
     165                uint32_t duck_freq;
    129166                /* Timestamp of the last packet read from the DAG card */
    130                 uint32_t last_pkt;
     167                uint32_t last_pkt;
    131168                /* Dummy trace to ensure DUCK packets are dealt with using the
    132169                 * DUCK format functions */
    133                 libtrace_t *dummy_duck;
    134         } duck;
    135 
    136         /* String containing the DAG device name */
    137         char *device_name;
    138         /* The DAG device that we are reading from */
    139         struct dag_dev_t *device;
    140         /* The DAG stream that we are reading from */
    141         unsigned int dagstream;
    142         /* Boolean flag indicating whether the stream is currently attached */
    143         int stream_attached;
    144         /* Pointer to the first unread byte in the DAG memory hole */
    145         uint8_t *bottom;
    146         /* Pointer to the last unread byte in the DAG memory hole */
    147         uint8_t *top;
    148         /* The amount of data processed thus far from the bottom pointer */
    149         uint32_t processed;
    150         /* The number of packets that have been dropped */
    151         uint64_t drops;
    152 
    153         uint8_t seeninterface[4];
     170                libtrace_t *dummy_duck;
     171        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
    154172};
    155173
     
    207225
    208226/* Initialises the DAG output data structure */
    209 static void dag_init_format_out_data(libtrace_out_t *libtrace) {
    210         libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     227static void dag_init_format_out_data(libtrace_out_t *libtrace)
     228{
     229        libtrace->format_data = (struct dag_format_data_out_t *)
     230                malloc(sizeof(struct dag_format_data_out_t));
    211231        // no DUCK on output
    212232        FORMAT_DATA_OUT->stream_attached = 0;
    213233        FORMAT_DATA_OUT->device = NULL;
    214         FORMAT_DATA_OUT->device_name = NULL;
    215234        FORMAT_DATA_OUT->dagstream = 0;
    216235        FORMAT_DATA_OUT->waiting = 0;
     
    219238
    220239/* Initialises the DAG input data structure */
    221 static void dag_init_format_data(libtrace_t *libtrace) {
     240static void dag_init_format_data(libtrace_t *libtrace)
     241{
     242        struct dag_per_stream_t stream_data;
     243
    222244        libtrace->format_data = (struct dag_format_data_t *)
    223245                malloc(sizeof(struct dag_format_data_t));
    224246        DUCK.last_duck = 0;
    225         DUCK.duck_freq = 0;
    226         DUCK.last_pkt = 0;
    227         DUCK.dummy_duck = NULL;
    228         FORMAT_DATA->stream_attached = 0;
    229         FORMAT_DATA->drops = 0;
    230         FORMAT_DATA->device_name = NULL;
    231         FORMAT_DATA->device = NULL;
    232         FORMAT_DATA->dagstream = 0;
    233         FORMAT_DATA->processed = 0;
    234         FORMAT_DATA->bottom = NULL;
    235         FORMAT_DATA->top = NULL;
    236         memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
     247        DUCK.duck_freq = 0;
     248        DUCK.last_pkt = 0;
     249        DUCK.dummy_duck = NULL;
     250
     251        FORMAT_DATA->per_stream =
     252                libtrace_list_init(sizeof(stream_data));
     253        assert(FORMAT_DATA->per_stream != NULL);
     254
     255        /* We'll start with just one instance of stream_data, and we'll
     256         * add more later if we need them */
     257        memset(&stream_data, 0, sizeof(stream_data));
     258        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    237259}
    238260
     
    241263 *
    242264 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    243 static struct dag_dev_t *dag_find_open_device(char *dev_name) {
     265static struct dag_dev_t *dag_find_open_device(char *dev_name)
     266{
    244267        struct dag_dev_t *dag_dev;
    245268
     
    252275                        dag_dev->ref_count ++;
    253276                        return dag_dev;
    254 
    255277                }
    256278                dag_dev = dag_dev->next;
    257279        }
    258280        return NULL;
    259 
    260 
    261281}
    262282
     
    267287 *
    268288 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    269 static void dag_close_device(struct dag_dev_t *dev) {
     289static void dag_close_device(struct dag_dev_t *dev)
     290{
    270291        /* Need to remove from the device list */
    271 
    272292        assert(dev->ref_count == 0);
    273293
     
    290310 *
    291311 * NOTE: this function should only be called when opening a DAG device for
    292  * writing - there is little practical difference between this and the 
     312 * writing - there is little practical difference between this and the
    293313 * function below that covers the reading case, but we need the output trace
    294  * object to report errors properly so the two functions take slightly 
     314 * object to report errors properly so the two functions take slightly
    295315 * different arguments. This is really lame and there should be a much better
    296316 * way of doing this.
    297317 *
    298  * NOTE: This function assumes the open_dag_mutex is held by the caller 
     318 * NOTE: This function assumes the open_dag_mutex is held by the caller
    299319 */
    300 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     320static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
     321                                                char *dev_name)
     322{
    301323        struct stat buf;
    302324        int fd;
     
    307329                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
    308330                return NULL;
    309 }
     331        }
    310332
    311333        /* Make sure it is the appropriate type of device */
     
    344366 *
    345367 * NOTE: this function should only be called when opening a DAG device for
    346  * reading - there is little practical difference between this and the 
     368 * reading - there is little practical difference between this and the
    347369 * function above that covers the writing case, but we need the input trace
    348  * object to report errors properly so the two functions take slightly 
     370 * object to report errors properly so the two functions take slightly
    349371 * different arguments. This is really lame and there should be a much better
    350372 * way of doing this.
     
    357379
    358380        /* Make sure the device exists */
    359         if (stat(dev_name, &buf) == -1) {
    360                 trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    361                 return NULL;
    362         }
     381        if (stat(dev_name, &buf) == -1) {
     382                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
     383                return NULL;
     384        }
    363385
    364386        /* Make sure it is the appropriate type of device */
     
    366388                /* Try opening the DAG device */
    367389                if((fd = dag_open(dev_name)) < 0) {
    368                         trace_set_err(libtrace,errno,"Cannot open DAG %s",
    369                                         dev_name);
    370                         return NULL;
    371                 }
     390                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
     391                                      dev_name);
     392                        return NULL;
     393                }
    372394        } else {
    373395                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
    374                                 dev_name);
    375                 return NULL;
    376         }
     396                              dev_name);
     397                return NULL;
     398        }
    377399
    378400        /* Add the device to our device list - it is just a doubly linked
     
    395417
    396418/* Creates and initialises a DAG output trace */
    397 static int dag_init_output(libtrace_out_t *libtrace) {
     419static int dag_init_output(libtrace_out_t *libtrace)
     420{
     421        /* Upon successful creation, the device name is stored against the
     422         * device and free when it is free()d */
     423        char *dag_dev_name = NULL;
    398424        char *scan = NULL;
    399425        struct dag_dev_t *dag_device = NULL;
    400426        int stream = 1;
    401        
     427
    402428        /* XXX I don't know if this is important or not, but this function
    403429         * isn't present in all of the driver releases that this code is
     
    409435
    410436        dag_init_format_out_data(libtrace);
    411         /* Grab the mutex while we're likely to be messing with the device 
     437        /* Grab the mutex while we're likely to be messing with the device
    412438         * list */
    413439        pthread_mutex_lock(&open_dag_mutex);
    414        
     440
    415441        /* Specific streams are signified using a comma in the libtrace URI,
    416442         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
     
    418444         * If no stream is specified, we will write using stream 1 */
    419445        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    420                 FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
     446                dag_dev_name = strdup(libtrace->uridata);
    421447        } else {
    422                 FORMAT_DATA_OUT->device_name =
    423                                 (char *)strndup(libtrace->uridata,
     448                dag_dev_name = (char *)strndup(libtrace->uridata,
    424449                                (size_t)(scan - libtrace->uridata));
    425450                stream = atoi(++scan);
     
    428453
    429454        /* See if our DAG device is already open */
    430         dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
     455        dag_device = dag_find_open_device(dag_dev_name);
    431456
    432457        if (dag_device == NULL) {
    433458                /* Device not yet opened - open it ourselves */
    434                 dag_device = dag_open_output_device(libtrace,
    435                                 FORMAT_DATA_OUT->device_name);
     459                dag_device = dag_open_output_device(libtrace, dag_dev_name);
     460        } else {
     461                /* Otherwise, just use the existing one */
     462                free(dag_dev_name);
     463                dag_dev_name = NULL;
    436464        }
    437465
    438466        /* Make sure we have successfully opened a DAG device */
    439467        if (dag_device == NULL) {
    440                 if (FORMAT_DATA_OUT->device_name) {
    441                         free(FORMAT_DATA_OUT->device_name);
    442                         FORMAT_DATA_OUT->device_name = NULL;
     468                if (dag_dev_name) {
     469                        free(dag_dev_name);
    443470                }
    444471                pthread_mutex_unlock(&open_dag_mutex);
     
    453480/* Creates and initialises a DAG input trace */
    454481static int dag_init_input(libtrace_t *libtrace) {
     482        /* Upon successful creation, the device name is stored against the
     483         * device and free when it is free()d */
     484        char *dag_dev_name = NULL;
    455485        char *scan = NULL;
    456486        int stream = 0;
     
    458488
    459489        dag_init_format_data(libtrace);
    460         /* Grab the mutex while we're likely to be messing with the device 
     490        /* Grab the mutex while we're likely to be messing with the device
    461491         * list */
    462492        pthread_mutex_lock(&open_dag_mutex);
    463        
    464        
    465         /* Specific streams are signified using a comma in the libtrace URI,
     493
     494
     495        /* DAG cards support multiple streams. In a single threaded capture,
     496         * these are specified using a comma in the libtrace URI,
    466497         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    467498         *
    468          * If no stream is specified, we will read from stream 0 */
     499         * If no stream is specified, we will read from stream 0 with
     500         * one thread
     501         */
    469502        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    470                 FORMAT_DATA->device_name = strdup(libtrace->uridata);
     503                dag_dev_name = strdup(libtrace->uridata);
    471504        } else {
    472                 FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
     505                dag_dev_name = (char *)strndup(libtrace->uridata,
    473506                                (size_t)(scan - libtrace->uridata));
    474507                stream = atoi(++scan);
    475508        }
    476509
    477         FORMAT_DATA->dagstream = stream;
     510        FORMAT_DATA_FIRST->dagstream = stream;
    478511
    479512        /* See if our DAG device is already open */
    480         dag_device = dag_find_open_device(FORMAT_DATA->device_name);
     513        dag_device = dag_find_open_device(dag_dev_name);
    481514
    482515        if (dag_device == NULL) {
    483516                /* Device not yet opened - open it ourselves */
    484                 dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
     517                dag_device = dag_open_device(libtrace, dag_dev_name);
     518        } else {
     519                /* Otherwise, just use the existing one */
     520                free(dag_dev_name);
     521                dag_dev_name = NULL;
    485522        }
    486523
    487524        /* Make sure we have successfully opened a DAG device */
    488525        if (dag_device == NULL) {
    489                 if (FORMAT_DATA->device_name)
    490                         free(FORMAT_DATA->device_name);
    491                 FORMAT_DATA->device_name = NULL;
     526                if (dag_dev_name)
     527                        free(dag_dev_name);
     528                dag_dev_name = NULL;
    492529                pthread_mutex_unlock(&open_dag_mutex);
    493530                return -1;
     
    496533        FORMAT_DATA->device = dag_device;
    497534
    498         /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
    499         /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
    500  
    501         *  The symptom of the port being disabled is that libtrace will appear to hang.
    502         */
     535        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     536           Dag Documentation */
     537        /* Check kBooleanAttributeActive is true -- no point capturing
     538         * on an interface that's disabled
     539         *
     540         * The symptom of the port being disabled is that libtrace
     541         * will appear to hang. */
    503542        /* Check kBooleanAttributeFault is false */
    504543        /* Check kBooleanAttributeLocalFault is false */
     
    506545        /* Check kBooleanAttributePeerLink ? */
    507546
    508         /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
     547        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
     548           on libtrace promisc attribute?*/
    509549        /* Set kUint32AttributeSnapLength to the snaplength */
    510550
    511551        pthread_mutex_unlock(&open_dag_mutex);
    512         return 0;
    513 }
     552        return 0;
     553}
     554
     555#ifdef HAVE_DAG_CONFIG_API_H
     556static int dag_csapi_set_snaplen(libtrace_t *libtrace, int slen) {
     557        dag_card_ref_t card_ref = NULL;
     558        dag_component_t root = NULL;
     559        attr_uuid_t uuid = 0;
     560
     561        if (slen < 0)
     562                slen = 0;
     563
     564        card_ref = dag_config_init(FORMAT_DATA->device->dev_name);
     565        root = dag_config_get_root_component(card_ref);
     566       
     567        uuid = dag_component_get_config_attribute_uuid(root, kBooleanAttributeVarlen);
     568        dag_config_set_boolean_attribute(card_ref, uuid, true);
     569
     570        uuid = dag_component_get_config_attribute_uuid(root, kUint32AttributeSnaplength);
     571        dag_config_set_uint32_attribute(card_ref, uuid, (uint32_t)slen);
     572
     573        return 0;
     574       
     575
     576}
     577#endif /* HAVE_DAG_CONFIG_API_H */
    514578
    515579/* Configures a DAG input trace */
    516580static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    517                                 void *data) {
    518         char conf_str[4096];
     581                            void *data)
     582{
    519583        switch(option) {
    520                 case TRACE_OPTION_META_FREQ:
    521                         /* This option is used to specify the frequency of DUCK
    522                          * updates */
    523                         DUCK.duck_freq = *(int *)data;
    524                         return 0;
    525                 case TRACE_OPTION_SNAPLEN:
    526                         /* Tell the card our new snap length */
    527                         snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    528                         if (dag_configure(FORMAT_DATA->device->fd,
    529                                                 conf_str) != 0) {
    530                                 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
    531                                 return -1;
    532                         }
    533                         return 0;
    534                 case TRACE_OPTION_PROMISC:
    535                         /* DAG already operates in a promisc fashion */
    536                         return -1;
    537                 case TRACE_OPTION_FILTER:
    538                         /* We don't yet support pushing filters into DAG
    539                          * cards */
    540                         return -1;
    541                 case TRACE_OPTION_EVENT_REALTIME:
    542                         /* Live capture is always going to be realtime */
     584        case TRACE_OPTION_META_FREQ:
     585                /* This option is used to specify the frequency of DUCK
     586                 * updates */
     587                DUCK.duck_freq = *(int *)data;
     588                return 0;
     589        case TRACE_OPTION_SNAPLEN:
     590#ifdef HAVE_DAG_CONFIG_API_H
     591                return dag_csapi_set_snaplen(libtrace, *(int *)data);
     592#else
     593                /* Tell the card our new snap length */
     594        {
     595                char conf_str[4096];
     596                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
     597                if (dag_configure(FORMAT_DATA->device->fd,
     598                                  conf_str) != 0) {
     599                        trace_set_err(libtrace, errno, "Failed to configure "
     600                                      "snaplen on DAG card: %s",
     601                                      libtrace->uridata);
    543602                        return -1;
    544         }
     603                }
     604        }
     605#endif /* HAVE_DAG_CONFIG_API_H */
     606
     607                return 0;
     608        case TRACE_OPTION_PROMISC:
     609                /* DAG already operates in a promisc fashion */
     610                return -1;
     611        case TRACE_OPTION_FILTER:
     612                /* We don't yet support pushing filters into DAG
     613                 * cards */
     614                return -1;
     615        case TRACE_OPTION_EVENT_REALTIME:
     616                /* Live capture is always going to be realtime */
     617                return -1;
     618        case TRACE_OPTION_HASHER:
     619                /* Lets just say we did this, it's currently still up to
     620                 * the user to configure this correctly. */
     621                return 0;
     622        }
    545623        return -1;
    546624}
    547625
    548626/* Starts a DAG output trace */
    549 static int dag_start_output(libtrace_out_t *libtrace) {
     627static int dag_start_output(libtrace_out_t *libtrace)
     628{
    550629        struct timeval zero, nopoll;
    551630
     
    555634
    556635        /* Attach and start the DAG stream */
    557 
    558636        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
    559637                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
     
    570648
    571649        /* We don't want the dag card to do any sleeping */
    572 
    573650        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
    574651                        FORMAT_DATA_OUT->dagstream, 0, &zero,
     
    578655}
    579656
    580 /* Starts a DAG input trace */
    581 static int dag_start_input(libtrace_t *libtrace) {
    582         struct timeval zero, nopoll;
    583         uint8_t *top, *bottom, *starttop;
     657static int dag_start_input_stream(libtrace_t *libtrace,
     658                                  struct dag_per_stream_t * stream) {
     659        struct timeval zero, nopoll;
     660        uint8_t *top, *bottom, *starttop;
    584661        top = bottom = NULL;
    585662
    586663        zero.tv_sec = 0;
    587         zero.tv_usec = 10000;
    588         nopoll = zero;
     664        zero.tv_usec = 10000;
     665        nopoll = zero;
    589666
    590667        /* Attach and start the DAG stream */
    591668        if (dag_attach_stream(FORMAT_DATA->device->fd,
    592                                 FORMAT_DATA->dagstream, 0, 0) < 0) {
    593                 trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    594                 return -1;
    595         }
     669                              stream->dagstream, 0, 0) < 0) {
     670                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
     671                              stream->dagstream);
     672                return -1;
     673        }
    596674
    597675        if (dag_start_stream(FORMAT_DATA->device->fd,
    598                                 FORMAT_DATA->dagstream) < 0) {
    599                 trace_set_err(libtrace, errno, "Cannot start DAG stream");
    600                 return -1;
    601         }
     676                             stream->dagstream) < 0) {
     677                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
     678                              stream->dagstream);
     679                return -1;
     680        }
    602681        FORMAT_DATA->stream_attached = 1;
    603        
     682
    604683        /* We don't want the dag card to do any sleeping */
    605         dag_set_stream_poll(FORMAT_DATA->device->fd,
    606                                 FORMAT_DATA->dagstream, 0, &zero,
    607                                 &nopoll);
     684        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     685                            stream->dagstream, 0, &zero,
     686                            &nopoll) < 0) {
     687                trace_set_err(libtrace, errno,
     688                              "dag_set_stream_poll failed!");
     689                return -1;
     690        }
    608691
    609692        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    610                                         FORMAT_DATA->dagstream,
    611                                         &bottom);
     693                                      stream->dagstream,
     694                                      &bottom);
    612695
    613696        /* Should probably flush the memory hole now */
     
    616699                bottom += (starttop - bottom);
    617700                top = dag_advance_stream(FORMAT_DATA->device->fd,
    618                                         FORMAT_DATA->dagstream,
    619                                         &bottom);
    620         }
    621         FORMAT_DATA->top = top;
    622         FORMAT_DATA->bottom = bottom;
    623         FORMAT_DATA->processed = 0;
    624         FORMAT_DATA->drops = 0;
     701                                         stream->dagstream,
     702                                         &bottom);
     703        }
     704        stream->top = top;
     705        stream->bottom = bottom;
     706        stream->processed = 0;
     707        stream->drops = 0;
    625708
    626709        return 0;
     710
     711}
     712
     713/* Starts a DAG input trace */
     714static int dag_start_input(libtrace_t *libtrace)
     715{
     716        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     717}
     718
     719static int dag_pstart_input(libtrace_t *libtrace)
     720{
     721        char *scan, *tok;
     722        uint16_t stream_count = 0, max_streams;
     723        int iserror = 0;
     724        struct dag_per_stream_t stream_data;
     725
     726        /* Check we aren't trying to create more threads than the DAG card can
     727         * handle */
     728        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     729        if (libtrace->perpkt_thread_count > max_streams) {
     730                fprintf(stderr,
     731                              "WARNING: DAG has only %u streams available, "
     732                              "capping total number of threads at this value.",
     733                              max_streams);
     734                libtrace->perpkt_thread_count = max_streams;
     735        }
     736
     737        /* Get the stream names from the uri */
     738        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     739                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     740                              "Format uri doesn't specify the DAG streams");
     741                iserror = 1;
     742                goto cleanup;
     743        }
     744
     745        scan++;
     746
     747        tok = strtok(scan, ",");
     748        while (tok != NULL) {
     749                /* Ensure we haven't specified too many streams */
     750                if (stream_count >= libtrace->perpkt_thread_count) {
     751                        fprintf(stderr,
     752                                      "WARNING: Format uri specifies too many "
     753                                      "streams. Maximum is %u, so only using "
     754                                      "the first %u from the uri.",
     755                                      libtrace->perpkt_thread_count,
     756                                      libtrace->perpkt_thread_count);
     757                        break;
     758                }
     759
     760                /* Save the stream details */
     761                if (stream_count == 0) {
     762                        /* Special case where we update the existing stream
     763                         * data structure */
     764                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     765                } else {
     766                        memset(&stream_data, 0, sizeof(stream_data));
     767                        stream_data.dagstream = (uint16_t)atoi(tok);
     768                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     769                                                &stream_data);
     770                }
     771
     772                stream_count++;
     773                tok = strtok(NULL, ",");
     774        }
     775
     776        if (stream_count < libtrace->perpkt_thread_count) {
     777                libtrace->perpkt_thread_count = stream_count;
     778        }
     779       
     780        FORMAT_DATA->stream_attached = 1;
     781
     782 cleanup:
     783        if (iserror) {
     784                return -1;
     785        } else {
     786                return 0;
     787        }
    627788}
    628789
    629790/* Pauses a DAG output trace */
    630 static int dag_pause_output(libtrace_out_t *libtrace) {
    631 
     791static int dag_pause_output(libtrace_out_t *libtrace)
     792{
    632793        /* Stop and detach the stream */
    633794        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
    634                         FORMAT_DATA_OUT->dagstream) < 0) {
     795                            FORMAT_DATA_OUT->dagstream) < 0) {
    635796                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
    636797                return -1;
    637798        }
    638799        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
    639                         FORMAT_DATA_OUT->dagstream) < 0) {
    640                 trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     800                              FORMAT_DATA_OUT->dagstream) < 0) {
     801                trace_set_err_out(libtrace, errno,
     802                                  "Could not detach DAG stream");
    641803                return -1;
    642804        }
     
    646808
    647809/* Pauses a DAG input trace */
    648 static int dag_pause_input(libtrace_t *libtrace) {
    649 
    650         /* Stop and detach the stream */
    651         if (dag_stop_stream(FORMAT_DATA->device->fd,
    652                                 FORMAT_DATA->dagstream) < 0) {
    653                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    654                 return -1;
    655         }
    656         if (dag_detach_stream(FORMAT_DATA->device->fd,
    657                                 FORMAT_DATA->dagstream) < 0) {
    658                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    659                 return -1;
    660         }
     810static int dag_pause_input(libtrace_t *libtrace)
     811{
     812        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     813
     814        /* Stop and detach each stream */
     815        while (tmp != NULL) {
     816                if (dag_stop_stream(FORMAT_DATA->device->fd,
     817                                    STREAM_DATA(tmp)->dagstream) < 0) {
     818                        trace_set_err(libtrace, errno,
     819                                      "Could not stop DAG stream");
     820                        return -1;
     821                }
     822                if (dag_detach_stream(FORMAT_DATA->device->fd,
     823                                      STREAM_DATA(tmp)->dagstream) < 0) {
     824                        trace_set_err(libtrace, errno,
     825                                      "Could not detach DAG stream");
     826                        return -1;
     827                }
     828
     829                tmp = tmp->next;
     830        }
     831
    661832        FORMAT_DATA->stream_attached = 0;
    662833        return 0;
    663834}
    664835
     836
     837
    665838/* Closes a DAG input trace */
    666 static int dag_fin_input(libtrace_t *libtrace) {
     839static int dag_fin_input(libtrace_t *libtrace)
     840{
    667841        /* Need the lock, since we're going to be handling the device list */
    668842        pthread_mutex_lock(&open_dag_mutex);
    669        
     843
    670844        /* Detach the stream if we are not paused */
    671845        if (FORMAT_DATA->stream_attached)
    672846                dag_pause_input(libtrace);
    673         FORMAT_DATA->device->ref_count --;
     847        FORMAT_DATA->device->ref_count--;
    674848
    675849        /* Close the DAG device if there are no more references to it */
    676850        if (FORMAT_DATA->device->ref_count == 0)
    677851                dag_close_device(FORMAT_DATA->device);
     852
    678853        if (DUCK.dummy_duck)
    679854                trace_destroy_dead(DUCK.dummy_duck);
    680         if (FORMAT_DATA->device_name)
    681                 free(FORMAT_DATA->device_name);
     855
     856        /* Clear the list */
     857        libtrace_list_deinit(FORMAT_DATA->per_stream);
    682858        free(libtrace->format_data);
    683859        pthread_mutex_unlock(&open_dag_mutex);
    684         return 0; /* success */
     860        return 0; /* success */
    685861}
    686862
    687863/* Closes a DAG output trace */
    688 static int dag_fin_output(libtrace_out_t *libtrace) {
    689        
     864static int dag_fin_output(libtrace_out_t *libtrace)
     865{
     866
    690867        /* Commit any outstanding traffic in the txbuffer */
    691868        if (FORMAT_DATA_OUT->waiting) {
    692                 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    693                                 FORMAT_DATA_OUT->waiting );
    694         }
    695 
    696         /* Wait until the buffer is nearly clear before exiting the program,
     869                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     870                                           FORMAT_DATA_OUT->dagstream,
     871                                           FORMAT_DATA_OUT->waiting );
     872        }
     873
     874        /* Wait until the buffer is nearly clear before exiting the program,
    697875         * as we will lose packets otherwise */
    698         dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    699                         FORMAT_DATA_OUT->dagstream,
    700                         dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
    701                                         FORMAT_DATA_OUT->dagstream) - 8
    702                         );
     876        dag_tx_get_stream_space
     877                (FORMAT_DATA_OUT->device->fd,
     878                 FORMAT_DATA_OUT->dagstream,
     879                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     880                                            FORMAT_DATA_OUT->dagstream) - 8);
    703881
    704882        /* Need the lock, since we're going to be handling the device list */
     
    713891        if (FORMAT_DATA_OUT->device->ref_count == 0)
    714892                dag_close_device(FORMAT_DATA_OUT->device);
    715         if (FORMAT_DATA_OUT->device_name)
    716                 free(FORMAT_DATA_OUT->device_name);
    717893        free(libtrace->format_data);
    718894        pthread_mutex_unlock(&open_dag_mutex);
     
    750926
    751927        /* Allocate memory for the DUCK data */
    752         if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
    753                         !packet->buffer) {
    754                 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
    755                 packet->buf_control = TRACE_CTRL_PACKET;
    756                 if (!packet->buffer) {
    757                         trace_set_err(libtrace, errno,
    758                                         "Cannot allocate packet buffer");
    759                         return -1;
    760                 }
    761         }
     928        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
     929            !packet->buffer) {
     930                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
     931                packet->buf_control = TRACE_CTRL_PACKET;
     932                if (!packet->buffer) {
     933                        trace_set_err(libtrace, errno,
     934                                      "Cannot allocate packet buffer");
     935                        return -1;
     936                }
     937        }
    762938
    763939        /* DUCK doesn't have a format header */
    764         packet->header = 0;
    765         packet->payload = packet->buffer;
    766 
    767         /* No need to check if we can get DUCK or not - we're modern
    768         * enough so just grab the DUCK info */
    769         if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
    770                                         (duckinf_t *)packet->payload) < 0)) {
    771                 trace_set_err(libtrace, errno, "Error using DUCK ioctl");
     940        packet->header = 0;
     941        packet->payload = packet->buffer;
     942
     943        /* No need to check if we can get DUCK or not - we're modern
     944        * enough so just grab the DUCK info */
     945        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
     946                   (duckinf_t *)packet->payload) < 0)) {
     947                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
    772948                DUCK.duck_freq = 0;
    773                 return -1;
    774         }
    775 
    776         packet->type = LIBTRACE_DUCK_VERSION;
     949                return -1;
     950        }
     951
     952        packet->type = LIBTRACE_DUCK_VERSION;
    777953
    778954        /* Set the packet's trace to point at a DUCK trace, so that the
    779955         * DUCK format functions will be called on the packet rather than the
    780956         * DAG ones */
    781         if (!DUCK.dummy_duck)
    782                 DUCK.dummy_duck = trace_create_dead("duck:dummy");
    783         packet->trace = DUCK.dummy_duck;
    784         DUCK.last_duck = DUCK.last_pkt;
    785         return sizeof(duckinf_t);
     957        if (!DUCK.dummy_duck)
     958                DUCK.dummy_duck = trace_create_dead("duck:dummy");
     959        packet->trace = DUCK.dummy_duck;
     960        DUCK.last_duck = DUCK.last_pkt;
     961        packet->error = sizeof(duckinf_t);
     962        return sizeof(duckinf_t);
    786963}
    787964
    788965/* Determines the amount of data available to read from the DAG card */
    789 static int dag_available(libtrace_t *libtrace) {
    790         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     966static int dag_available(libtrace_t *libtrace,
     967                         struct dag_per_stream_t *stream_data)
     968{
     969        uint32_t diff = stream_data->top - stream_data->bottom;
    791970
    792971        /* If we've processed more than 4MB of data since we last called
    793972         * dag_advance_stream, then we should call it again to allow the
    794973         * space occupied by that 4MB to be released */
    795         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     974        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    796975                return diff;
    797        
     976
    798977        /* Update the top and bottom pointers */
    799         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    800                         FORMAT_DATA->dagstream,
    801                         &(FORMAT_DATA->bottom));
    802        
    803         if (FORMAT_DATA->top == NULL) {
     978        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
     979                                              stream_data->dagstream,
     980                                              &(stream_data->bottom));
     981
     982        if (stream_data->top == NULL) {
    804983                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    805984                return -1;
    806985        }
    807         FORMAT_DATA->processed = 0;
    808         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     986        stream_data->processed = 0;
     987        diff = stream_data->top - stream_data->bottom;
    809988        return diff;
    810989}
    811990
    812991/* Returns a pointer to the start of the next complete ERF record */
    813 static dag_record_t *dag_get_record(libtrace_t *libtrace) {
    814         dag_record_t *erfptr = NULL;
    815         uint16_t size;
    816         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     992static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
     993{
     994        dag_record_t *erfptr = NULL;
     995        uint16_t size;
     996
     997        erfptr = (dag_record_t *)stream_data->bottom;
    817998        if (!erfptr)
    818                 return NULL;
    819         size = ntohs(erfptr->rlen);
    820         assert( size >= dag_record_size );
     999                return NULL;
     1000
     1001        size = ntohs(erfptr->rlen);
     1002        assert( size >= dag_record_size );
     1003
    8211004        /* Make certain we have the full packet available */
    822         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     1005        if (size > (stream_data->top - stream_data->bottom))
    8231006                return NULL;
    824         FORMAT_DATA->bottom += size;
    825         FORMAT_DATA->processed += size;
     1007
     1008        stream_data->bottom += size;
     1009        stream_data->processed += size;
    8261010        return erfptr;
    8271011}
     
    8291013/* Converts a buffer containing a recently read DAG packet record into a
    8301014 * libtrace packet */
    831 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    832                 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    833 
     1015static int dag_prepare_packet_stream(libtrace_t *libtrace,
     1016                                     struct dag_per_stream_t *stream_data,
     1017                                     libtrace_packet_t *packet,
     1018                                     void *buffer, libtrace_rt_types_t rt_type,
     1019                                     uint32_t flags)
     1020{
    8341021        dag_record_t *erfptr;
    835        
     1022
    8361023        /* If the packet previously owned a buffer that is not the buffer
    837         * that contains the new packet data, we're going to need to free the
    838         * old one to avoid memory leaks */
     1024        * that contains the new packet data, we're going to need to free the
     1025        * old one to avoid memory leaks */
    8391026        if (packet->buffer != buffer &&
    840                         packet->buf_control == TRACE_CTRL_PACKET) {
     1027            packet->buf_control == TRACE_CTRL_PACKET) {
    8411028                free(packet->buffer);
    8421029        }
     
    8511038        erfptr = (dag_record_t *)buffer;
    8521039        packet->buffer = erfptr;
    853         packet->header = erfptr;
    854         packet->type = rt_type;
     1040        packet->header = erfptr;
     1041        packet->type = rt_type;
    8551042
    8561043        if (erfptr->flags.rxerror == 1) {
    857                 /* rxerror means the payload is corrupt - drop the payload
    858                 * by tweaking rlen */
    859                 packet->payload = NULL;
    860                 erfptr->rlen = htons(erf_get_framing_length(packet));
    861         } else {
    862                 packet->payload = (char*)packet->buffer
    863                         + erf_get_framing_length(packet);
    864         }
     1044                /* rxerror means the payload is corrupt - drop the payload
     1045                * by tweaking rlen */
     1046                packet->payload = NULL;
     1047                erfptr->rlen = htons(erf_get_framing_length(packet));
     1048        } else {
     1049                packet->payload = (char*)packet->buffer
     1050                        + erf_get_framing_length(packet);
     1051        }
    8651052
    8661053        if (libtrace->format_data == NULL) {
     
    8691056
    8701057        /* Update the dropped packets counter */
    871 
    872         /* No loss counter for DSM coloured records - have to use
    873          * some other API */
     1058        /* No loss counter for DSM coloured records - have to use some
     1059         * other API */
    8741060        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    8751061                /* TODO */
    8761062        } else {
    8771063                /* Use the ERF loss counter */
    878                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    879                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     1064                if (stream_data->seeninterface[erfptr->flags.iface]
     1065                    == 0) {
     1066                        stream_data->seeninterface[erfptr->flags.iface]
     1067                                = 1;
    8801068                } else {
    881                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
     1069                        stream_data->drops += ntohs(erfptr->lctr);
    8821070                }
    8831071        }
    8841072
    8851073        return 0;
     1074}
     1075
     1076static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1077                              void *buffer, libtrace_rt_types_t rt_type,
     1078                              uint32_t flags)
     1079{
     1080        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1081                                       buffer, rt_type, flags);
    8861082}
    8871083
     
    8961092/* Pushes an ERF record onto the transmit stream */
    8971093static int dag_dump_packet(libtrace_out_t *libtrace,
    898                 dag_record_t *erfptr, unsigned int pad, void *buffer) {
     1094                           dag_record_t *erfptr, unsigned int pad,
     1095                           void *buffer)
     1096{
    8991097        int size;
    9001098
    9011099        /*
    902          * If we've got 0 bytes waiting in the txqueue, assume that we haven't
    903          * requested any space yet, and request some, storing the pointer at
    904          * FORMAT_DATA_OUT->txbuffer.
     1100         * If we've got 0 bytes waiting in the txqueue, assume that we
     1101         * haven't requested any space yet, and request some, storing
     1102         * the pointer at FORMAT_DATA_OUT->txbuffer.
    9051103         *
    9061104         * The amount to request is slightly magical at the moment - it's
     
    9091107         */
    9101108        if (FORMAT_DATA_OUT->waiting == 0) {
    911                 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    912                                 FORMAT_DATA_OUT->dagstream, 16908288);
     1109                FORMAT_DATA_OUT->txbuffer =
     1110                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     1111                                                FORMAT_DATA_OUT->dagstream,
     1112                                                16908288);
    9131113        }
    9141114
     
    9171117         * are in contiguous memory
    9181118         */
    919         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     1119        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
     1120               (dag_record_size + pad));
    9201121        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
    921 
    922 
    9231122
    9241123        /*
     
    9271126         */
    9281127        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
    929         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     1128        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
     1129               size);
    9301130        FORMAT_DATA_OUT->waiting += size;
    9311131
     
    9361136         * case there is still data in the buffer at program exit.
    9371137         */
    938 
    9391138        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
    940                 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    941                         FORMAT_DATA_OUT->waiting );
     1139                FORMAT_DATA_OUT->txbuffer =
     1140                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     1141                                                   FORMAT_DATA_OUT->dagstream,
     1142                                                   FORMAT_DATA_OUT->waiting);
    9421143                FORMAT_DATA_OUT->waiting = 0;
    9431144        }
    9441145
    9451146        return size + pad + dag_record_size;
    946 
    9471147}
    9481148
     
    9501150 * if one is found, false otherwise */
    9511151static bool find_compatible_linktype(libtrace_out_t *libtrace,
    952                                 libtrace_packet_t *packet, char *type)
    953 {
    954          // Keep trying to simplify the packet until we can find
    955          //something we can do with it
     1152                                     libtrace_packet_t *packet, char *type)
     1153{
     1154        /* Keep trying to simplify the packet until we can find
     1155         * something we can do with it */
    9561156
    9571157        do {
    958                 *type=libtrace_to_erf_type(trace_get_link_type(packet));
    959 
    960                 // Success
     1158                *type = libtrace_to_erf_type(trace_get_link_type(packet));
     1159
     1160                /* Success */
    9611161                if (*type != (char)-1)
    9621162                        return true;
     
    9641164                if (!demote_packet(packet)) {
    9651165                        trace_set_err_out(libtrace,
    966                                         TRACE_ERR_NO_CONVERSION,
    967                                         "No erf type for packet (%i)",
    968                                         trace_get_link_type(packet));
     1166                                          TRACE_ERR_NO_CONVERSION,
     1167                                          "No erf type for packet (%i)",
     1168                                          trace_get_link_type(packet));
    9691169                        return false;
    9701170                }
     
    9761176
    9771177/* Writes a packet to the provided DAG output trace */
    978 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    979         /*
    980          * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
    981          * sucks, sorry about that.
     1178static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
     1179{
     1180        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
     1181         * coding sucks, sorry about that.
    9821182         */
    9831183        unsigned int pad = 0;
     
    9881188
    9891189        if(!packet->header) {
    990                 /* No header, probably an RT packet. Lifted from 
     1190                /* No header, probably an RT packet. Lifted from
    9911191                 * erf_write_packet(). */
    9921192                return -1;
     
    10071207
    10081208        if (packet->type == TRACE_RT_DATA_ERF) {
    1009                 numbytes = dag_dump_packet(libtrace,
    1010                                 header,
    1011                                 pad,
    1012                                 payload
    1013                                 );
    1014 
     1209                numbytes = dag_dump_packet(libtrace, header, pad, payload);
    10151210        } else {
    10161211                /* Build up a new packet header from the existing header */
    10171212
    1018                 /* Simplify the packet first - if we can't do this, break 
     1213                /* Simplify the packet first - if we can't do this, break
    10191214                 * early */
    10201215                if (!find_compatible_linktype(libtrace,packet,&erf_type))
     
    10351230
    10361231                /* Packet length (rlen includes format overhead) */
    1037                 assert(trace_get_capture_length(packet)>0
    1038                                 && trace_get_capture_length(packet)<=65536);
    1039                 assert(erf_get_framing_length(packet)>0
    1040                                 && trace_get_framing_length(packet)<=65536);
    1041                 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
    1042                       &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     1232                assert(trace_get_capture_length(packet) > 0
     1233                       && trace_get_capture_length(packet) <= 65536);
     1234                assert(erf_get_framing_length(packet) > 0
     1235                       && trace_get_framing_length(packet) <= 65536);
     1236                assert(trace_get_capture_length(packet) +
     1237                       erf_get_framing_length(packet) > 0
     1238                       && trace_get_capture_length(packet) +
     1239                       erf_get_framing_length(packet) <= 65536);
    10431240
    10441241                erfhdr.rlen = htons(trace_get_capture_length(packet)
    1045                         + erf_get_framing_length(packet));
     1242                                    + erf_get_framing_length(packet));
    10461243
    10471244
     
    10521249
    10531250                /* Write it out */
    1054                 numbytes = dag_dump_packet(libtrace,
    1055                                 &erfhdr,
    1056                                 pad,
    1057                                 payload);
    1058 
     1251                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
    10591252        }
    10601253
     
    10661259 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10671260 */
    1068 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1069         int size = 0;
    1070         struct timeval tv;
    1071         dag_record_t *erfptr = NULL;
     1261static int dag_read_packet_stream(libtrace_t *libtrace,
     1262                                struct dag_per_stream_t *stream_data,
     1263                                libtrace_thread_t *t, /* Optional */
     1264                                libtrace_packet_t *packet)
     1265{
     1266        int size = 0;
     1267        dag_record_t *erfptr = NULL;
     1268        struct timeval tv;
    10721269        int numbytes = 0;
    10731270        uint32_t flags = 0;
    1074         struct timeval maxwait;
    1075         struct timeval pollwait;
     1271        struct timeval maxwait, pollwait;
    10761272
    10771273        pollwait.tv_sec = 0;
     
    10801276        maxwait.tv_usec = 250000;
    10811277
    1082         /* Check if we're due for a DUCK report */
    1083         size = dag_get_duckinfo(libtrace, packet);
    1084 
    1085         if (size != 0)
    1086                 return size;
     1278        /* Check if we're due for a DUCK report - only report on the first thread */
     1279        if (stream_data == FORMAT_DATA_FIRST) {
     1280                size = dag_get_duckinfo(libtrace, packet);
     1281                if (size != 0)
     1282                        return size;
     1283        }
    10871284
    10881285
     
    10921289        /* If the packet buffer is currently owned by libtrace, free it so
    10931290         * that we can set the packet to point into the DAG memory hole */
    1094         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1095                 free(packet->buffer);
    1096                 packet->buffer = 0;
    1097         }
    1098        
    1099         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1100                         FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait,
    1101                         &pollwait) == -1)
    1102         {
     1291        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1292                free(packet->buffer);
     1293                packet->buffer = 0;
     1294        }
     1295
     1296        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
     1297                                sizeof(dag_record_t), &maxwait,
     1298                                &pollwait) == -1) {
    11031299                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11041300                return -1;
    11051301        }
    1106 
    11071302
    11081303        /* Grab a full ERF record */
    11091304        do {
    1110                 numbytes = dag_available(libtrace);
     1305                numbytes = dag_available(libtrace, stream_data);
    11111306                if (numbytes < 0)
    11121307                        return numbytes;
    11131308                if (numbytes < dag_record_size) {
     1309                        /* Check the message queue if we have one to check */
     1310                        if (t != NULL &&
     1311                            libtrace_message_queue_count(&t->messages) > 0)
     1312                                return -2;
     1313
    11141314                        if (libtrace_halt)
    11151315                                return 0;
     
    11171317                        continue;
    11181318                }
    1119                 erfptr = dag_get_record(libtrace);
     1319                erfptr = dag_get_record(stream_data);
    11201320        } while (erfptr == NULL);
    11211321
     1322        packet->trace = libtrace;
    11221323        /* Prepare the libtrace packet */
    1123         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1124                                 flags))
    1125                 return -1;
    1126 
    1127         /* Update the DUCK timer */
    1128         tv = trace_get_timeval(packet);
    1129         DUCK.last_pkt = tv.tv_sec;
    1130 
    1131         return packet->payload ? htons(erfptr->rlen) :
    1132                                 erf_get_framing_length(packet);
     1324        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
     1325                                    TRACE_RT_DATA_ERF, flags))
     1326                return -1;
     1327
     1328        /* Update the DUCK timer - don't re-order this check (false-sharing) */
     1329        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
     1330                tv = trace_get_timeval(packet);
     1331                DUCK.last_pkt = tv.tv_sec;
     1332        }
     1333
     1334        packet->error = packet->payload ? htons(erfptr->rlen) :
     1335                                          erf_get_framing_length(packet);
     1336
     1337        return packet->error;
     1338}
     1339
     1340static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1341{
     1342        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1343}
     1344
     1345static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1346                             libtrace_packet_t **packets, size_t nb_packets)
     1347{
     1348        int ret;
     1349        size_t read_packets = 0;
     1350        int numbytes = 0;
     1351
     1352        struct dag_per_stream_t *stream_data =
     1353                (struct dag_per_stream_t *)t->format_data;
     1354
     1355        /* Read as many packets as we can, but read atleast one packet */
     1356        do {
     1357                ret = dag_read_packet_stream(libtrace, stream_data, t,
     1358                                           packets[read_packets]);
     1359                if (ret < 0)
     1360                        return ret;
     1361
     1362                read_packets++;
     1363
     1364                /* Make sure we don't read too many packets..! */
     1365                if (read_packets >= nb_packets)
     1366                        break;
     1367
     1368                numbytes = dag_available(libtrace, stream_data);
     1369        } while (numbytes >= dag_record_size);
     1370
     1371        return read_packets;
    11331372}
    11341373
     
    11381377 */
    11391378static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
    1140                                         libtrace_packet_t *packet) {
    1141         libtrace_eventobj_t event = {0,0,0.0,0};
     1379                                           libtrace_packet_t *packet)
     1380{
     1381        libtrace_eventobj_t event = {0,0,0.0,0};
    11421382        dag_record_t *erfptr = NULL;
    11431383        int numbytes;
     
    11581398        }
    11591399       
    1160         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1161                         FORMAT_DATA->dagstream, 0, &minwait,
    1162                         &minwait) == -1)
    1163         {
     1400        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     1401                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
     1402                                &minwait) == -1) {
    11641403                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11651404                event.type = TRACE_EVENT_TERMINATE;
     
    11701409                erfptr = NULL;
    11711410                numbytes = 0;
    1172        
     1411
    11731412                /* Need to call dag_available so that the top pointer will get
    11741413                 * updated, otherwise we'll never see any data! */
    1175                 numbytes = dag_available(libtrace);
    1176 
    1177                 /* May as well not bother calling dag_get_record if 
     1414                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
     1415
     1416                /* May as well not bother calling dag_get_record if
    11781417                 * dag_available suggests that there's no data */
    11791418                if (numbytes != 0)
    1180                         erfptr = dag_get_record(libtrace);
     1419                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    11811420                if (erfptr == NULL) {
    11821421                        /* No packet available - sleep for a very short time */
    11831422                        if (libtrace_halt) {
    11841423                                event.type = TRACE_EVENT_TERMINATE;
    1185                         } else {                       
     1424                        } else {
    11861425                                event.type = TRACE_EVENT_SLEEP;
    11871426                                event.seconds = 0.0001;
     
    11891428                        break;
    11901429                }
    1191                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1192                                         TRACE_RT_DATA_ERF, flags)) {
     1430                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1431                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    11931432                        event.type = TRACE_EVENT_TERMINATE;
    11941433                        break;
     
    11961435
    11971436
    1198                 event.size = trace_get_capture_length(packet) + 
    1199                                 trace_get_framing_length(packet);
    1200                
     1437                event.size = trace_get_capture_length(packet) +
     1438                        trace_get_framing_length(packet);
     1439
    12011440                /* XXX trace_read_packet() normally applies the following
    12021441                 * config options for us, but this function is called via
     
    12041443
    12051444                if (libtrace->filter) {
    1206                         int filtret = trace_apply_filter(libtrace->filter, 
    1207                                         packet);
     1445                        int filtret = trace_apply_filter(libtrace->filter,
     1446                                                         packet);
    12081447                        if (filtret == -1) {
    12091448                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
    1210                                                 "Bad BPF Filter");
     1449                                              "Bad BPF Filter");
    12111450                                event.type = TRACE_EVENT_TERMINATE;
    12121451                                break;
     
    12191458                                 * a sleep event in this case, like we used to
    12201459                                 * do! */
    1221                                 libtrace->filtered_packets ++;
     1460                                libtrace->filtered_packets ++;
    12221461                                trace_clear_cache(packet);
    12231462                                continue;
    12241463                        }
    1225                                
     1464
    12261465                        event.type = TRACE_EVENT_PACKET;
    12271466                } else {
     
    12361475                        trace_set_capture_length(packet, libtrace->snaplen);
    12371476                }
    1238                 libtrace->accepted_packets ++;
     1477                libtrace->accepted_packets ++;
    12391478                break;
    1240         } while (1);
     1479        } while(1);
    12411480
    12421481        return event;
    12431482}
    12441483
    1245 /* Gets the number of dropped packets */
    1246 static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1247         if (trace->format_data == NULL)
    1248                 return (uint64_t)-1;
    1249         return DATA(trace)->drops;
     1484static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
     1485{
     1486        libtrace_list_node_t *tmp;
     1487        assert(stat && libtrace);
     1488        tmp = FORMAT_DATA_HEAD;
     1489
     1490        /* Dropped packets */
     1491        stat->dropped_valid = 1;
     1492        stat->dropped = 0;
     1493        while (tmp != NULL) {
     1494                stat->dropped += STREAM_DATA(tmp)->drops;
     1495                tmp = tmp->next;
     1496        }
     1497
     1498}
     1499
     1500static void dag_get_thread_statistics(libtrace_t *libtrace, libtrace_thread_t *t,
     1501                                       libtrace_stat_t *stat) {
     1502        struct dag_per_stream_t *stream_data = t->format_data;
     1503        assert(stat && libtrace);
     1504
     1505        stat->dropped_valid = 1;
     1506        stat->dropped = stream_data->drops;
     1507
     1508        stat->filtered_valid = 1;
     1509        stat->filtered = 0;
    12501510}
    12511511
    12521512/* Prints some semi-useful help text about the DAG format module */
    12531513static void dag_help(void) {
    1254         printf("dag format module: $Revision: 1755 $\n");
    1255         printf("Supported input URIs:\n");
    1256         printf("\tdag:/dev/dagn\n");
    1257         printf("\n");
    1258         printf("\te.g.: dag:/dev/dag0\n");
    1259         printf("\n");
    1260         printf("Supported output URIs:\n");
    1261         printf("\tnone\n");
    1262         printf("\n");
     1514        printf("dag format module: $Revision: 1755 $\n");
     1515        printf("Supported input URIs:\n");
     1516        printf("\tdag:/dev/dagn\n");
     1517        printf("\n");
     1518        printf("\te.g.: dag:/dev/dag0\n");
     1519        printf("\n");
     1520        printf("Supported output URIs:\n");
     1521        printf("\tnone\n");
     1522        printf("\n");
     1523}
     1524
     1525static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1526                                bool reader)
     1527{
     1528        struct dag_per_stream_t *stream_data;
     1529        libtrace_list_node_t *node;
     1530
     1531        if (reader && t->type == THREAD_PERPKT) {
     1532                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
     1533                                                t->perpkt_num);
     1534                if (node == NULL) {
     1535                        return -1;
     1536                }
     1537                stream_data = node->data;
     1538
     1539                /* Pass the per thread data to the thread */
     1540                t->format_data = stream_data;
     1541
     1542                /* Attach and start the DAG stream */
     1543                if (dag_start_input_stream(libtrace, stream_data) < 0)
     1544                        return -1;
     1545        }
     1546
     1547        return 0;
    12631548}
    12641549
    12651550static struct libtrace_format_t dag = {
    1266         "dag",
    1267         "$Id$",
    1268         TRACE_FORMAT_ERF,
     1551        "dag",
     1552        "$Id$",
     1553        TRACE_FORMAT_ERF,
    12691554        dag_probe_filename,             /* probe filename */
    12701555        NULL,                           /* probe magic */
    1271         dag_init_input,                 /* init_input */
    1272         dag_config_input,               /* config_input */
    1273         dag_start_input,                /* start_input */
    1274         dag_pause_input,                /* pause_input */
     1556        dag_init_input,                 /* init_input */
     1557        dag_config_input,               /* config_input */
     1558        dag_start_input,                /* start_input */
     1559        dag_pause_input,                /* pause_input */
    12751560        dag_init_output,                /* init_output */
    1276         NULL,                           /* config_output */
     1561        NULL,                           /* config_output */
    12771562        dag_start_output,               /* start_output */
    1278         dag_fin_input,                  /* fin_input */
     1563        dag_fin_input,                  /* fin_input */
    12791564        dag_fin_output,                 /* fin_output */
    1280         dag_read_packet,                /* read_packet */
    1281         dag_prepare_packet,             /* prepare_packet */
     1565        dag_read_packet,                /* read_packet */
     1566        dag_prepare_packet,             /* prepare_packet */
    12821567        NULL,                           /* fin_packet */
    12831568        dag_write_packet,               /* write_packet */
    1284         erf_get_link_type,              /* get_link_type */
    1285         erf_get_direction,              /* get_direction */
    1286         erf_set_direction,              /* set_direction */
    1287         erf_get_erf_timestamp,          /* get_erf_timestamp */
    1288         NULL,                           /* get_timeval */
    1289         NULL,                           /* get_seconds */
     1569        erf_get_link_type,              /* get_link_type */
     1570        erf_get_direction,              /* get_direction */
     1571        erf_set_direction,              /* set_direction */
     1572        erf_get_erf_timestamp,          /* get_erf_timestamp */
     1573        NULL,                           /* get_timeval */
     1574        NULL,                           /* get_seconds */
    12901575        NULL,                           /* get_timespec */
    1291         NULL,                           /* seek_erf */
    1292         NULL,                           /* seek_timeval */
    1293         NULL,                           /* seek_seconds */
    1294         erf_get_capture_length,         /* get_capture_length */
    1295         erf_get_wire_length,            /* get_wire_length */
    1296         erf_get_framing_length,         /* get_framing_length */
    1297         erf_set_capture_length,         /* set_capture_length */
     1576        NULL,                           /* seek_erf */
     1577        NULL,                           /* seek_timeval */
     1578        NULL,                           /* seek_seconds */
     1579        erf_get_capture_length,         /* get_capture_length */
     1580        erf_get_wire_length,            /* get_wire_length */
     1581        erf_get_framing_length,         /* get_framing_length */
     1582        erf_set_capture_length,         /* set_capture_length */
    12981583        NULL,                           /* get_received_packets */
    12991584        NULL,                           /* get_filtered_packets */
    1300         dag_get_dropped_packets,        /* get_dropped_packets */
    1301         NULL,                           /* get_captured_packets */
    1302         NULL,                           /* get_fd */
    1303         trace_event_dag,                /* trace_event */
    1304         dag_help,                       /* help */
    1305         NULL                            /* next pointer */
     1585        NULL,                           /* get_dropped_packets */
     1586        dag_get_statistics,             /* get_statistics */
     1587        NULL,                           /* get_fd */
     1588        trace_event_dag,                /* trace_event */
     1589        dag_help,                       /* help */
     1590        NULL,                            /* next pointer */
     1591        {true, 0}, /* live packet capture, thread limit TBD */
     1592        dag_pstart_input,
     1593        dag_pread_packets,
     1594        dag_pause_input,
     1595        NULL,
     1596        dag_pregister_thread,
     1597        NULL,
     1598        dag_get_thread_statistics       /* get thread stats */
    13061599};
    13071600
    1308 void dag_constructor(void) {
     1601void dag_constructor(void)
     1602{
    13091603        register_format(&dag);
    13101604}
  • lib/format_dpdk.c

    rbb0a1f4 rbb0a1f4  
    33 * This file is part of libtrace
    44 *
    5  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
     5 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
    66 * New Zealand.
    77 *
    8  * Author: Richard Sanger 
    9  *         
     8 * Author: Richard Sanger
     9 *
    1010 * All rights reserved.
    1111 *
    12  * This code has been developed by the University of Waikato WAND 
     12 * This code has been developed by the University of Waikato WAND
    1313 * research group. For further information please see http://www.wand.net.nz/
    1414 *
     
    3636 * Intel Data Plane Development Kit is a LIVE capture format.
    3737 *
    38  * This format also supports writing which will write packets out to the 
    39  * network as a form of packet replay. This should not be confused with the 
    40  * RT protocol which is intended to transfer captured packet records between 
     38 * This format also supports writing which will write packets out to the
     39 * network as a form of packet replay. This should not be confused with the
     40 * RT protocol which is intended to transfer captured packet records between
    4141 * RT-speaking programs.
    4242 */
     43
     44#define _GNU_SOURCE
    4345
    4446#include "config.h"
     
    4749#include "format_helper.h"
    4850#include "libtrace_arphrd.h"
     51#include "hash_toeplitz.h"
    4952
    5053#ifdef HAVE_INTTYPES_H
     
    5962#include <endian.h>
    6063#include <string.h>
     64
     65#if HAVE_LIBNUMA
     66#include <numa.h>
     67#endif
    6168
    6269/* We can deal with any minor differences by checking the RTE VERSION
     
    162169#include <rte_mempool.h>
    163170#include <rte_mbuf.h>
    164 
    165 /* The default size of memory buffers to use - This is the max size of standard
     171#include <rte_launch.h>
     172#include <rte_lcore.h>
     173#include <rte_per_lcore.h>
     174#include <rte_cycles.h>
     175#include <pthread.h>
     176#ifdef __FreeBSD__
     177#include <pthread_np.h>
     178#endif
     179
     180
     181/* The default size of memory buffers to use - This is the max size of standard
    166182 * ethernet packet less the size of the MAC CHECKSUM */
    167183#define RX_MBUF_SIZE 1514
    168184
    169 /* The minimum number of memory buffers per queue tx or rx. Search for 
     185/* The minimum number of memory buffers per queue tx or rx. Search for
    170186 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    171187 */
     
    185201#define NB_TX_MBUF 1024
    186202
    187 /* The size of the PCI blacklist needs to be big enough to contain 
     203/* The size of the PCI blacklist needs to be big enough to contain
    188204 * every PCI device address (listed by lspci every bus:device.function tuple).
    189205 */
     
    192208/* The maximum number of characters the mempool name can be */
    193209#define MEMPOOL_NAME_LEN 20
     210
     211/* For single threaded libtrace we read packets as a batch/burst
     212 * this is the maximum size of said burst */
     213#define BURST_SIZE 50
    194214
    195215#define MBUF(x) ((struct rte_mbuf *) x)
     
    197217#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    198218#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     219#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     220
     221#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
     222#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
     223
    199224#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    200                         (uint64_t) tv.tv_usec*1000ull)
     225                        (uint64_t) tv.tv_usec*1000ull)
    201226#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    202                         (uint64_t) ts.tv_nsec)
     227                        (uint64_t) ts.tv_nsec)
    203228
    204229#if RTE_PKTMBUF_HEADROOM != 128
    205230#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    206         "any libtrace instance processing these packet must be have the" \
    207         "same RTE_PKTMBUF_HEADROOM set"
     231        "any libtrace instance processing these packet must be have the" \
     232        "same RTE_PKTMBUF_HEADROOM set"
    208233#endif
    209234
    210235/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    211  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    212  * 
     236 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     237 *
    213238 * Make sure you understand what these are doing before enabling them.
    214239 * They might make traces incompatable with other builds etc.
    215  * 
     240 *
    216241 * These are also included to show how to do somethings which aren't
    217242 * obvious in the DPDK documentation.
    218243 */
    219244
    220 /* Print verbose messages to stdout */
     245/* Print verbose messages to stderr */
    221246#define DEBUG 0
    222247
    223 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    224  * only turn on if you know clock_gettime is a vsyscall on your system 
     248/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     249 * only turn on if you know clock_gettime is a vsyscall on your system
    225250 * overwise could be a large overhead. Again gettimeofday() should be
    226251 * vsyscall also if it's not you should seriously consider updating your
    227252 * kernel.
    228253 */
    229 #ifdef HAVE_LIBRT
     254#ifdef HAVE_CLOCK_GETTIME
    230255/* You can turn this on (set to 1) to prefer clock_gettime */
    231 #define USE_CLOCK_GETTIME 0
     256#define USE_CLOCK_GETTIME 1
    232257#else
    233 /* DONT CHANGE THIS !!! */
     258/* DON'T CHANGE THIS !!! */
    234259#define USE_CLOCK_GETTIME 0
    235260#endif
     
    240265 * hence writing out a port such as int: ring: and dpdk: assumes there
    241266 * is no checksum and will attempt to write the checksum as part of the
    242  * packet 
     267 * packet
    243268 */
    244269#define GET_MAC_CRC_CHECKSUM 0
    245270
    246271/* This requires a modification of the pmd drivers (inside Intel DPDK)
     272 * TODO this requires updating (packet sizes are wrong TS most likely also)
    247273 */
    248274#define HAS_HW_TIMESTAMPS_82580 0
     
    255281#endif
    256282
     283static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
     284/* Memory pools Per NUMA node */
     285static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
     286
    257287/* As per Intel 82580 specification - mismatch in 82580 datasheet
    258288 * it states ts is stored in Big Endian, however its actually Little */
    259289struct hw_timestamp_82580 {
    260     uint64_t reserved;
    261     uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
     290        uint64_t reserved;
     291        uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
    262292};
    263293
    264294enum paused_state {
    265     DPDK_NEVER_STARTED,
    266     DPDK_RUNNING,
    267     DPDK_PAUSED,
     295        DPDK_NEVER_STARTED,
     296        DPDK_RUNNING,
     297        DPDK_PAUSED,
    268298};
     299
     300struct dpdk_per_stream_t
     301{
     302        uint16_t queue_id;
     303        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     304        struct rte_mempool *mempool;
     305        int lcore;
     306#if HAS_HW_TIMESTAMPS_82580
     307        /* Timestamping only relevent to RX */
     308        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     309        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     310#endif
     311} ALIGN_STRUCT(CACHE_LINE_SIZE);
     312
     313#if HAS_HW_TIMESTAMPS_82580
     314#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
     315#else
     316#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
     317#endif
     318
     319typedef struct dpdk_per_stream_t dpdk_per_stream_t;
    269320
    270321/* Used by both input and output however some fields are not used
    271322 * for output */
    272323struct dpdk_format_data_t {
    273     int8_t promisc; /* promiscuous mode - RX only */
    274     uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    275     uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    276     uint8_t paused; /* See paused_state */
    277     uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    278     int snaplen; /* The snap length for the capture - RX only */
    279     /* We always have to setup both rx and tx queues even if we don't want them */
    280     int nb_rx_buf; /* The number of packet buffers in the rx ring */
    281     int nb_tx_buf; /* The number of packet buffers in the tx ring */
    282     struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
     324        int8_t promisc; /* promiscuous mode - RX only */
     325        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
     326        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
     327        uint8_t paused; /* See paused_state */
     328        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
     329        int snaplen; /* The snap length for the capture - RX only */
     330        /* We always have to setup both rx and tx queues even if we don't want them */
     331        int nb_rx_buf; /* The number of packet buffers in the rx ring */
     332        int nb_tx_buf; /* The number of packet buffers in the tx ring */
     333        int nic_numa_node; /* The NUMA node that the NIC is attached to */
     334        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    283335#if DPDK_USE_BLACKLIST
    284     struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
     336        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    285337        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
    286338#endif
    287     char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    288 #if HAS_HW_TIMESTAMPS_82580
    289     /* Timestamping only relevent to RX */
    290     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    291     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    292     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    293 #endif
     339        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
     340        uint8_t rss_key[40]; // This is the RSS KEY
     341        /* To improve single-threaded performance we always batch reading
     342         * packets, in a burst, otherwise the parallel library does this for us */
     343        struct rte_mbuf* burst_pkts[BURST_SIZE];
     344        int burst_size; /* The total number read in the burst */
     345        int burst_offset; /* The offset we are into the burst */
     346
     347        /* Our parallel streams */
     348        libtrace_list_t *per_stream;
    294349};
    295350
    296351enum dpdk_addt_hdr_flags {
    297     INCLUDES_CHECKSUM = 0x1,
    298     INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
     352        INCLUDES_CHECKSUM = 0x1,
     353        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
    299354};
    300355
    301 /** 
     356/**
    302357 * A structure placed in front of the packet where we can store
    303358 * additional information about the given packet.
     
    305360 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    306361 * +--------------------------+
    307  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    308  * +--------------------------+
    309362 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    310363 * +--------------------------+
    311  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    312  * +--------------------------+ 
     364 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     365 * +--------------------------+
    313366 * *   hw_timestamp_82580     * 16 bytes Optional
    314367 * +--------------------------+
     
    317370 */
    318371struct dpdk_addt_hdr {
    319     uint64_t timestamp;
    320     uint8_t flags;
    321     uint8_t direction;
    322     uint8_t reserved1;
    323     uint8_t reserved2;
    324     uint32_t cap_len; /* The size to say the capture is */
     372        uint64_t timestamp;
     373        uint8_t flags;
     374        uint8_t direction;
     375        uint8_t reserved1;
     376        uint8_t reserved2;
     377        uint32_t cap_len; /* The size to say the capture is */
    325378};
    326379
     
    328381 * We want to blacklist all devices except those on the whitelist
    329382 * (I say list, but yes it is only the one).
    330  * 
     383 *
    331384 * The default behaviour of rte_pci_probe() will map every possible device
    332385 * to its DPDK driver. The DPDK driver will take the ethernet device
    333386 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    334  * 
    335  * So blacklist all devices except the one that we wish to use so that 
     387 *
     388 * So blacklist all devices except the one that we wish to use so that
    336389 * the others can still be used as standard ethernet ports.
    337390 *
     
    347400
    348401        TAILQ_FOREACH(dev, &device_list, next) {
    349         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    350             && whitelist->bus == dev->addr.bus
    351             && whitelist->devid == dev->addr.devid
    352             && whitelist->function == dev->addr.function)
    353             continue;
     402        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     403            && whitelist->bus == dev->addr.bus
     404            && whitelist->devid == dev->addr.devid
     405            && whitelist->function == dev->addr.function)
     406            continue;
    354407                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    355                                 / sizeof (format_data->blacklist[0])) {
    356                         printf("Warning: too many devices to blacklist consider"
    357                                         " increasing BLACK_LIST_SIZE");
     408                                / sizeof (format_data->blacklist[0])) {
     409                        fprintf(stderr, "Warning: too many devices to blacklist consider"
     410                                        " increasing BLACK_LIST_SIZE");
    358411                        break;
    359412                }
     
    371424        char pci_str[20] = {0};
    372425        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    373                 whitelist->domain,
    374                 whitelist->bus,
    375                 whitelist->devid,
    376                 whitelist->function);
     426                whitelist->domain,
     427                whitelist->bus,
     428                whitelist->devid,
     429                whitelist->function);
    377430        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    378431                return -1;
     
    386439 * Fills in addr, note core is optional and is unchanged if
    387440 * a value for it is not provided.
    388  * 
     441 *
    389442 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    390  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     443 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    391444 */
    392445static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    393     int matches;
    394     assert(str);
    395     matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
    396                      &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
    397     if (matches >= 4) {
    398         return 0;
    399     } else {
    400         return -1;
    401     }
     446        int matches;
     447        assert(str);
     448        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
     449                         &addr->domain, &addr->bus, &addr->devid,
     450                         &addr->function, core);
     451        if (matches >= 4) {
     452                return 0;
     453        } else {
     454                return -1;
     455        }
     456}
     457
     458/**
     459 * Convert a pci address to the numa node it is
     460 * connected to.
     461 *
     462 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     463 * so we can call it before DPDK
     464 *
     465 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     466 */
     467static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     468        char path[50] = {0};
     469        FILE *file;
     470
     471        /* Read from the system */
     472        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     473                 dev_addr->domain,
     474                 dev_addr->bus,
     475                 dev_addr->devid,
     476                 dev_addr->function);
     477
     478        if((file = fopen(path, "r")) != NULL) {
     479                int numa_node = -1;
     480                fscanf(file, "%d", &numa_node);
     481                fclose(file);
     482                return numa_node;
     483        }
     484        return -1;
    402485}
    403486
     
    406489static inline void dump_configuration()
    407490{
    408     struct rte_config * global_config;
    409     long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    410 
    411     if (nb_cpu <= 0) {
    412         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    413         nb_cpu = 1; /* fallback to just 1 core */
    414     }
    415     if (nb_cpu > RTE_MAX_LCORE)
    416         nb_cpu = RTE_MAX_LCORE;
    417 
    418     global_config = rte_eal_get_configuration();
    419 
    420     if (global_config != NULL) {
    421         int i;
    422         fprintf(stderr, "Intel DPDK setup\n"
    423                "---Version      : %s\n"
    424                "---Master LCore : %"PRIu32"\n"
    425                "---LCore Count  : %"PRIu32"\n",
    426                rte_version(),
    427                global_config->master_lcore, global_config->lcore_count);
    428 
    429         for (i = 0 ; i < nb_cpu; i++) {
    430             fprintf(stderr, "   ---Core %d : %s\n", i,
    431                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    432         }
    433 
    434         const char * proc_type;
    435         switch (global_config->process_type) {
    436             case RTE_PROC_AUTO:
    437                 proc_type = "auto";
    438                 break;
    439             case RTE_PROC_PRIMARY:
    440                 proc_type = "primary";
    441                 break;
    442             case RTE_PROC_SECONDARY:
    443                 proc_type = "secondary";
    444                 break;
    445             case RTE_PROC_INVALID:
    446                 proc_type = "invalid";
    447                 break;
    448             default:
    449                 proc_type = "something worse than invalid!!";
    450         }
    451         fprintf(stderr, "---Process Type : %s\n", proc_type);
    452     }
    453 
    454 }
    455 #endif
     491        struct rte_config * global_config;
     492        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     493
     494        if (nb_cpu <= 0) {
     495                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     496                       " Falling back to the first core.");
     497                nb_cpu = 1; /* fallback to just 1 core */
     498        }
     499        if (nb_cpu > RTE_MAX_LCORE)
     500                nb_cpu = RTE_MAX_LCORE;
     501
     502        global_config = rte_eal_get_configuration();
     503
     504        if (global_config != NULL) {
     505                int i;
     506                fprintf(stderr, "Intel DPDK setup\n"
     507                        "---Version      : %s\n"
     508                        "---Master LCore : %"PRIu32"\n"
     509                        "---LCore Count  : %"PRIu32"\n",
     510                        rte_version(),
     511                        global_config->master_lcore, global_config->lcore_count);
     512
     513                for (i = 0 ; i < nb_cpu; i++) {
     514                        fprintf(stderr, "   ---Core %d : %s\n", i,
     515                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     516                }
     517
     518                const char * proc_type;
     519                switch (global_config->process_type) {
     520                case RTE_PROC_AUTO:
     521                        proc_type = "auto";
     522                        break;
     523                case RTE_PROC_PRIMARY:
     524                        proc_type = "primary";
     525                        break;
     526                case RTE_PROC_SECONDARY:
     527                        proc_type = "secondary";
     528                        break;
     529                case RTE_PROC_INVALID:
     530                        proc_type = "invalid";
     531                        break;
     532                default:
     533                        proc_type = "something worse than invalid!!";
     534                }
     535                fprintf(stderr, "---Process Type : %s\n", proc_type);
     536        }
     537
     538}
     539#endif
     540
     541/**
     542 * Expects to be called from the master lcore and moves it to the given dpdk id
     543 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     544 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     545 *               and not already in use.
     546 * @return 0 is successful otherwise -1 on error.
     547 */
     548static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
     549        struct rte_config *cfg = rte_eal_get_configuration();
     550        cpu_set_t cpuset;
     551        int i;
     552
     553        assert (core < RTE_MAX_LCORE);
     554        assert (rte_get_master_lcore() == rte_lcore_id());
     555
     556        if (core == rte_lcore_id())
     557                return 0;
     558
     559        /* Make sure we are not overwriting someone else */
     560        assert(!rte_lcore_is_enabled(core));
     561
     562        /* Move the core */
     563        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     564        cfg->lcore_role[core] = ROLE_RTE;
     565        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     566        rte_eal_get_configuration()->master_lcore = core;
     567        RTE_PER_LCORE(_lcore_id) = core;
     568
     569        /* Now change the affinity, either mapped to a single core or all accepted */
     570        CPU_ZERO(&cpuset);
     571
     572        if (lcore_config[core].detected) {
     573                CPU_SET(core, &cpuset);
     574        } else {
     575                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     576                        if (lcore_config[i].detected)
     577                                CPU_SET(i, &cpuset);
     578                }
     579        }
     580
     581        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     582        if (i != 0) {
     583                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
     584                return -1;
     585        }
     586        return 0;
     587}
    456588
    457589/**
     
    485617static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    486618                                        char * err, int errlen) {
    487     int ret; /* Returned error codes */
    488     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
    489     char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    490     char mem_map[20] = {0}; /* The memory name */
    491     long nb_cpu; /* The number of CPUs in the system */
    492     long my_cpu; /* The CPU number we want to bind to */
     619        int ret; /* Returned error codes */
     620        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     621        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
     622        char mem_map[20] = {0}; /* The memory name */
     623        long nb_cpu; /* The number of CPUs in the system */
     624        long my_cpu; /* The CPU number we want to bind to */
     625        int i;
     626        struct rte_config *cfg = rte_eal_get_configuration();
    493627        struct saved_getopts save_opts;
    494    
    495 #if DEBUG
    496     rte_set_log_level(RTE_LOG_DEBUG);
    497 #else
    498     rte_set_log_level(RTE_LOG_WARNING);
    499 #endif
    500     /*
    501      * Using unique file prefixes mean separate memory is used, unlinking
    502      * the two processes. However be careful we still cannot access a
    503      * port that already in use.
    504      */
    505     char* argv[] = {"libtrace",
    506                     "-c", cpu_number,
    507                     "-n", "1",
    508                     "--proc-type", "auto",
    509                     "--file-prefix", mem_map,
    510                     "-m", "256",
     628
     629        /* This initialises the Environment Abstraction Layer (EAL)
     630         * If we had slave workers these are put into WAITING state
     631         *
     632         * Basically binds this thread to a fixed core, which we choose as
     633         * the last core on the machine (assuming fewer interrupts mapped here).
     634         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
     635         * "-n" the number of memory channels into the CPU (hardware specific)
     636         *      - Most likely to be half the number of ram slots in your machine.
     637         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
     638         * Controls where in memory packets are stored such that they are spread
     639         * across the channels. We just use 1 to be safe.
     640         *
     641         * Using unique file prefixes mean separate memory is used, unlinking
     642         * the two processes. However be careful we still cannot access a
     643         * port that already in use.
     644         */
     645        char* argv[] = {"libtrace",
     646                        "-c", cpu_number,
     647                        "-n", "1",
     648                        "--proc-type", "auto",
     649                        "--file-prefix", mem_map,
     650                        "-m", "512",
    511651#if DPDK_USE_LOG_LEVEL
    512652#       if DEBUG
    513                     "--log-level", "8", /* RTE_LOG_DEBUG */
     653                        "--log-level", "8", /* RTE_LOG_DEBUG */
    514654#       else
    515                     "--log-level", "5", /* RTE_LOG_WARNING */
     655                        "--log-level", "5", /* RTE_LOG_WARNING */
    516656#       endif
    517657#endif
    518                     NULL};
    519     int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    520 
    521     /* This initialises the Environment Abstraction Layer (EAL)
    522      * If we had slave workers these are put into WAITING state
    523      *
    524      * Basically binds this thread to a fixed core, which we choose as
    525      * the last core on the machine (assuming fewer interrupts mapped here).
    526      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    527      * "-n" the number of memory channels into the CPU (hardware specific)
    528      *      - Most likely to be half the number of ram slots in your machine.
    529      *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
    530      * Controls where in memory packets are stored and should spread across
    531      * the channels. We just use 1 to be safe.
    532      */
    533 
    534     /* Get the number of cpu cores in the system and use the last core */
    535     nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    536     if (nb_cpu <= 0) {
    537         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    538         nb_cpu = 1; /* fallback to the first core */
    539     }
    540     if (nb_cpu > RTE_MAX_LCORE)
    541         nb_cpu = RTE_MAX_LCORE;
    542 
    543     my_cpu = nb_cpu;
    544     /* This allows the user to specify the core - we would try to do this
    545      * automatically but it's hard to tell that this is secondary
    546      * before running rte_eal_init(...). Currently we are limited to 1
    547      * instance per core due to the way memory is allocated. */
    548     if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    549         snprintf(err, errlen, "Failed to parse URI");
    550         return -1;
    551     }
    552 
    553     snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    554                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    555 
    556     if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    557         snprintf(err, errlen,
    558           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    559           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    560         return -1;
    561     }
    562 
    563     /* Make our mask */
    564     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     658                        NULL};
     659        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
     660
     661#if DEBUG
     662        rte_set_log_level(RTE_LOG_DEBUG);
     663#else
     664        rte_set_log_level(RTE_LOG_WARNING);
     665#endif
     666
     667        /* Get the number of cpu cores in the system and use the last core
     668         * on the correct numa node */
     669        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     670        if (nb_cpu <= 0) {
     671                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     672                       " Falling back to the first core.");
     673                nb_cpu = 1; /* fallback to the first core */
     674        }
     675        if (nb_cpu > RTE_MAX_LCORE)
     676                nb_cpu = RTE_MAX_LCORE;
     677
     678        my_cpu = -1;
     679        /* This allows the user to specify the core - we would try to do this
     680         * automatically but it's hard to tell that this is secondary
     681         * before running rte_eal_init(...). Currently we are limited to 1
     682         * instance per core due to the way memory is allocated. */
     683        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
     684                snprintf(err, errlen, "Failed to parse URI");
     685                return -1;
     686        }
     687
     688#if HAVE_LIBNUMA
     689        format_data->nic_numa_node = pci_to_numa(&use_addr);
     690        if (my_cpu < 0) {
     691#if DEBUG
     692                /* If we can assign to a core on the same numa node */
     693                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
     694#endif
     695                if(format_data->nic_numa_node >= 0) {
     696                        int max_node_cpu = -1;
     697                        struct bitmask *mask = numa_allocate_cpumask();
     698                        assert(mask);
     699                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     700                        for (i = 0 ; i < nb_cpu; ++i) {
     701                                if (numa_bitmask_isbitset(mask,i))
     702                                        max_node_cpu = i+1;
     703                        }
     704                        my_cpu = max_node_cpu;
     705                }
     706        }
     707#endif
     708        if (my_cpu < 0) {
     709                my_cpu = nb_cpu;
     710        }
     711
     712
     713        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
     714                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     715
     716        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
     717                snprintf(err, errlen,
     718                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     719                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     720                return -1;
     721        }
     722
     723        /* Make our mask with all cores turned on this is so that DPDK
     724         * gets all CPU info in older versions */
     725        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     726        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    565727
    566728#if !DPDK_USE_BLACKLIST
    567     /* Black list all ports besides the one that we want to use */
    568     if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    569         snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    570                  " are you sure the address is correct?: %s", strerror(-ret));
    571         return -1;
    572     }
     729        /* Black list all ports besides the one that we want to use */
     730        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
     731                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     732                         " are you sure the address is correct?: %s", strerror(-ret));
     733                return -1;
     734        }
    573735#endif
    574736
    575737        /* Give the memory map a unique name */
    576738        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    577     /* rte_eal_init it makes a call to getopt so we need to reset the
    578     * global optind variable of getopt otherwise this fails */
     739        /* rte_eal_init it makes a call to getopt so we need to reset the
     740        * global optind variable of getopt otherwise this fails */
    579741        save_getopts(&save_opts);
    580     optind = 1;
    581     if ((ret = rte_eal_init(argc, argv)) < 0) {
    582         snprintf(err, errlen,
    583           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    584         return -1;
    585     }
     742        optind = 1;
     743        if ((ret = rte_eal_init(argc, argv)) < 0) {
     744                snprintf(err, errlen,
     745                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     746                return -1;
     747        }
    586748        restore_getopts(&save_opts);
     749        // These are still running but will never do anything with DPDK v1.7 we
     750        // should remove this XXX in the future
     751        for(i = 0; i < RTE_MAX_LCORE; ++i) {
     752                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     753                        cfg->lcore_role[i] = ROLE_OFF;
     754                        cfg->lcore_count--;
     755                }
     756        }
     757        // Only the master should be running
     758        assert(cfg->lcore_count == 1);
     759
     760        // TODO XXX TODO
     761        dpdk_move_master_lcore(NULL, my_cpu-1);
    587762
    588763#if DEBUG
    589     dump_configuration();
     764        dump_configuration();
    590765#endif
    591766
    592767#if DPDK_USE_PMD_INIT
    593     /* This registers all available NICs with Intel DPDK
    594     * These are not loaded until rte_eal_pci_probe() is called.
    595     */
    596     if ((ret = rte_pmd_init_all()) < 0) {
    597         snprintf(err, errlen,
    598           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    599         return -1;
    600     }
     768        /* This registers all available NICs with Intel DPDK
     769        * These are not loaded until rte_eal_pci_probe() is called.
     770        */
     771        if ((ret = rte_pmd_init_all()) < 0) {
     772                snprintf(err, errlen,
     773                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     774                return -1;
     775        }
    601776#endif
    602777
    603778#if DPDK_USE_BLACKLIST
    604     /* Blacklist all ports besides the one that we want to use */
     779        /* Blacklist all ports besides the one that we want to use */
    605780        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    606781                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     
    611786
    612787#if DPDK_USE_PCI_PROBE
    613     /* This loads DPDK drivers against all ports that are not blacklisted */
     788        /* This loads DPDK drivers against all ports that are not blacklisted */
    614789        if ((ret = rte_eal_pci_probe()) < 0) {
    615         snprintf(err, errlen,
    616             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    617         return -1;
    618     }
    619 #endif
    620 
    621     format_data->nb_ports = rte_eth_dev_count();
    622 
    623     if (format_data->nb_ports != 1) {
    624         snprintf(err, errlen,
    625             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    626             format_data->nb_ports);
    627         return -1;
    628     }
    629 
    630     return 0;
     790                snprintf(err, errlen,
     791                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     792                return -1;
     793        }
     794#endif
     795
     796        format_data->nb_ports = rte_eth_dev_count();
     797
     798        if (format_data->nb_ports != 1) {
     799                snprintf(err, errlen,
     800                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     801                         format_data->nb_ports);
     802                return -1;
     803        }
     804
     805        return 0;
    631806}
    632807
    633808static int dpdk_init_input (libtrace_t *libtrace) {
    634     char err[500];
    635     err[0] = 0;
    636    
    637     libtrace->format_data = (struct dpdk_format_data_t *)
    638                             malloc(sizeof(struct dpdk_format_data_t));
    639     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    640     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    641     FORMAT(libtrace)->nb_ports = 0;
    642     FORMAT(libtrace)->snaplen = 0; /* Use default */
    643     FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    644     FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
    645     FORMAT(libtrace)->promisc = -1;
    646     FORMAT(libtrace)->pktmbuf_pool = NULL;
     809        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
     810        char err[500];
     811        err[0] = 0;
     812
     813        libtrace->format_data = (struct dpdk_format_data_t *)
     814                                malloc(sizeof(struct dpdk_format_data_t));
     815        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     816        FORMAT(libtrace)->nb_ports = 0;
     817        FORMAT(libtrace)->snaplen = 0; /* Use default */
     818        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
     819        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     820        FORMAT(libtrace)->nic_numa_node = -1;
     821        FORMAT(libtrace)->promisc = -1;
     822        FORMAT(libtrace)->pktmbuf_pool = NULL;
    647823#if DPDK_USE_BLACKLIST
    648     FORMAT(libtrace)->nb_blacklist = 0;
    649 #endif
    650     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    651     FORMAT(libtrace)->mempool_name[0] = 0;
    652 #if HAS_HW_TIMESTAMPS_82580
    653     FORMAT(libtrace)->ts_first_sys = 0;
    654     FORMAT(libtrace)->ts_last_sys = 0;
    655     FORMAT(libtrace)->wrap_count = 0;
    656 #endif
    657 
    658     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    659         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    660         free(libtrace->format_data);
    661         libtrace->format_data = NULL;
    662         return -1;
    663     }
    664     return 0;
    665 };
     824        FORMAT(libtrace)->nb_blacklist = 0;
     825#endif
     826        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     827        FORMAT(libtrace)->mempool_name[0] = 0;
     828        memset(FORMAT(libtrace)->burst_pkts, 0,
     829               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     830        FORMAT(libtrace)->burst_size = 0;
     831        FORMAT(libtrace)->burst_offset = 0;
     832
     833        /* Make our first stream */
     834        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
     835        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
     836
     837        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     838                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     839                free(libtrace->format_data);
     840                libtrace->format_data = NULL;
     841                return -1;
     842        }
     843        return 0;
     844}
    666845
    667846static int dpdk_init_output(libtrace_out_t *libtrace)
    668847{
    669     char err[500];
    670     err[0] = 0;
    671    
    672     libtrace->format_data = (struct dpdk_format_data_t *)
    673                             malloc(sizeof(struct dpdk_format_data_t));
    674     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    675     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    676     FORMAT(libtrace)->nb_ports = 0;
    677     FORMAT(libtrace)->snaplen = 0; /* Use default */
    678     FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    679     FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
    680     FORMAT(libtrace)->promisc = -1;
    681     FORMAT(libtrace)->pktmbuf_pool = NULL;
     848        char err[500];
     849        err[0] = 0;
     850
     851        libtrace->format_data = (struct dpdk_format_data_t *)
     852                                malloc(sizeof(struct dpdk_format_data_t));
     853        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     854        FORMAT(libtrace)->nb_ports = 0;
     855        FORMAT(libtrace)->snaplen = 0; /* Use default */
     856        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
     857        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     858        FORMAT(libtrace)->nic_numa_node = -1;
     859        FORMAT(libtrace)->promisc = -1;
     860        FORMAT(libtrace)->pktmbuf_pool = NULL;
    682861#if DPDK_USE_BLACKLIST
    683     FORMAT(libtrace)->nb_blacklist = 0;
    684 #endif
    685     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    686     FORMAT(libtrace)->mempool_name[0] = 0;
    687 #if HAS_HW_TIMESTAMPS_82580
    688     FORMAT(libtrace)->ts_first_sys = 0;
    689     FORMAT(libtrace)->ts_last_sys = 0;
    690     FORMAT(libtrace)->wrap_count = 0;
    691 #endif
    692 
    693     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    694         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    695         free(libtrace->format_data);
    696         libtrace->format_data = NULL;
    697         return -1;
    698     }
    699     return 0;
    700 };
     862        FORMAT(libtrace)->nb_blacklist = 0;
     863#endif
     864        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     865        FORMAT(libtrace)->mempool_name[0] = 0;
     866        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     867        FORMAT(libtrace)->burst_size = 0;
     868        FORMAT(libtrace)->burst_offset = 0;
     869
     870        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     871                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     872                free(libtrace->format_data);
     873                libtrace->format_data = NULL;
     874                return -1;
     875        }
     876        return 0;
     877}
    701878
    702879/**
    703880 * Note here snaplen excludes the MAC checksum. Packets over
    704881 * the requested snaplen will be dropped. (Excluding MAC checksum)
    705  * 
     882 *
    706883 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    707884 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    708885 * is set the maximum size of the returned packet would be 1518 otherwise
    709886 * 1514 would be the largest size possibly returned.
    710  * 
     887 *
    711888 */
    712889static int dpdk_config_input (libtrace_t *libtrace,
    713                                         trace_option_t option,
    714                                         void *data) {
    715     switch (option) {
    716         case TRACE_OPTION_SNAPLEN:
    717             /* Only support changing snaplen before a call to start is
    718              * made */
    719             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    720                 FORMAT(libtrace)->snaplen=*(int*)data;
    721             else
    722                 return -1;
    723             return 0;
    724                 case TRACE_OPTION_PROMISC:
    725                         FORMAT(libtrace)->promisc=*(int*)data;
    726             return 0;
    727         case TRACE_OPTION_FILTER:
    728             /* TODO filtering */
    729             break;
    730         case TRACE_OPTION_META_FREQ:
    731             break;
    732         case TRACE_OPTION_EVENT_REALTIME:
    733             break;
    734         /* Avoid default: so that future options will cause a warning
    735          * here to remind us to implement it, or flag it as
    736          * unimplementable
    737          */
    738     }
     890                              trace_option_t option,
     891                              void *data) {
     892        switch (option) {
     893        case TRACE_OPTION_SNAPLEN:
     894                /* Only support changing snaplen before a call to start is
     895                 * made */
     896                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     897                        FORMAT(libtrace)->snaplen=*(int*)data;
     898                else
     899                        return -1;
     900                return 0;
     901        case TRACE_OPTION_PROMISC:
     902                FORMAT(libtrace)->promisc=*(int*)data;
     903                return 0;
     904        case TRACE_OPTION_HASHER:
     905                switch (*((enum hasher_types *) data))
     906                {
     907                case HASHER_BALANCE:
     908                case HASHER_UNIDIRECTIONAL:
     909                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     910                        return 0;
     911                case HASHER_BIDIRECTIONAL:
     912                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     913                        return 0;
     914                case HASHER_CUSTOM:
     915                        // We don't support these
     916                        return -1;
     917                }
     918                break;
     919        case TRACE_OPTION_FILTER:
     920                /* TODO filtering */
     921        case TRACE_OPTION_META_FREQ:
     922        case TRACE_OPTION_EVENT_REALTIME:
     923                break;
     924        /* Avoid default: so that future options will cause a warning
     925         * here to remind us to implement it, or flag it as
     926         * unimplementable
     927         */
     928        }
    739929
    740930        /* Don't set an error - trace_config will try to deal with the
    741931         * option and will set an error if it fails */
    742     return -1;
     932        return -1;
    743933}
    744934
    745935/* Can set jumbo frames/ or limit the size of a frame by setting both
    746936 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    747  * 
     937 *
    748938 */
    749939static struct rte_eth_conf port_conf = {
    750940        .rxmode = {
     941                .mq_mode = ETH_RSS,
    751942                .split_hdr_size = 0,
    752943                .header_split   = 0, /**< Header Split disabled */
     
    754945                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    755946                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    756         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     947                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    757948#if GET_MAC_CRC_CHECKSUM
    758949/* So it appears that if hw_strip_crc is turned off the driver will still
     
    767958 * always cut off the checksum in the future
    768959 */
    769         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     960                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    770961#endif
    771962        },
     
    773964                .mq_mode = ETH_DCB_NONE,
    774965        },
     966        .rx_adv_conf = {
     967                .rss_conf = {
     968                        // .rss_key = &rss_key, // We set this per format
     969                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     970                },
     971        },
     972        .intr_conf = {
     973                .lsc = 1
     974        }
    775975};
    776976
     
    781981                .wthresh = 4,/* RX_WTHRESH writeback */
    782982        },
    783     .rx_free_thresh = 0,
    784     .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
     983        .rx_free_thresh = 0,
     984        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
    785985};
    786986
    787987static const struct rte_eth_txconf tx_conf = {
    788988        .tx_thresh = {
    789         /**
    790         * TX_PTHRESH prefetch
    791         * Set on the NIC, if the number of unprocessed descriptors to queued on
    792         * the card fall below this try grab at least hthresh more unprocessed
    793         * descriptors.
    794         */
     989                /*
     990                * TX_PTHRESH prefetch
     991                * Set on the NIC, if the number of unprocessed descriptors to queued on
     992                * the card fall below this try grab at least hthresh more unprocessed
     993                * descriptors.
     994                */
    795995                .pthresh = 36,
    796996
    797         /* TX_HTHRESH host
    798         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    799         */
     997                /* TX_HTHRESH host
     998                * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     999                */
    8001000                .hthresh = 0,
    801        
    802         /* TX_WTHRESH writeback
    803         * Set on the NIC, the number of sent descriptors before writing back
    804         * status to confirm the transmission. This is done more efficiently as
    805         * a bulk DMA-transfer rather than writing one at a time.
    806         * Similar to tx_free_thresh however this is applied to the NIC, where
    807         * as tx_free_thresh is when DPDK will check these. This is extended
    808         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    809         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    810         */
     1001
     1002                /* TX_WTHRESH writeback
     1003                * Set on the NIC, the number of sent descriptors before writing back
     1004                * status to confirm the transmission. This is done more efficiently as
     1005                * a bulk DMA-transfer rather than writing one at a time.
     1006                * Similar to tx_free_thresh however this is applied to the NIC, where
     1007                * as tx_free_thresh is when DPDK will check these. This is extended
     1008                * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     1009                * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     1010                */
    8111011                .wthresh = 4,
    8121012        },
    8131013
    814     /* Used internally by DPDK rather than passed to the NIC. The number of
    815     * packet descriptors to send before checking for any responses written
    816     * back (to confirm the transmission). Default = 32 if set to 0)
    817     */
     1014        /* Used internally by DPDK rather than passed to the NIC. The number of
     1015        * packet descriptors to send before checking for any responses written
     1016        * back (to confirm the transmission). Default = 32 if set to 0)
     1017        */
    8181018        .tx_free_thresh = 0,
    8191019
    820     /* This is the Report Status threshold, used by 10Gbit cards,
    821      * This signals the card to only write back status (such as
    822     * transmission successful) after this minimum number of transmit
    823     * descriptors are seen. The default is 32 (if set to 0) however if set
    824     * to greater than 1 TX wthresh must be set to zero, because this is kindof
    825     * a replacement. See the dpdk programmers guide for more restrictions.
    826     */
     1020        /* This is the Report Status threshold, used by 10Gbit cards,
     1021         * This signals the card to only write back status (such as
     1022        * transmission successful) after this minimum number of transmit
     1023        * descriptors are seen. The default is 32 (if set to 0) however if set
     1024        * to greater than 1 TX wthresh must be set to zero, because this is kindof
     1025        * a replacement. See the dpdk programmers guide for more restrictions.
     1026        */
    8271027        .tx_rs_thresh = 1,
    8281028};
    8291029
    830 /* Attach memory to the port and start the port or restart the port.
    831  */
    832 static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
    833     int ret; /* Check return values for errors */
    834     struct rte_eth_link link_info; /* Wait for link */
    835    
    836     /* Already started */
    837     if (format_data->paused == DPDK_RUNNING)
    838         return 0;
    839 
    840     /* First time started we need to alloc our memory, doing this here
    841      * rather than in environment setup because we don't have snaplen then */
    842     if (format_data->paused == DPDK_NEVER_STARTED) {
    843         if (format_data->snaplen == 0) {
    844             format_data->snaplen = RX_MBUF_SIZE;
    845             port_conf.rxmode.jumbo_frame = 0;
    846             port_conf.rxmode.max_rx_pkt_len = 0;
    847         } else {
    848             /* Use jumbo frames */
    849             port_conf.rxmode.jumbo_frame = 1;
    850             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    851         }
    852 
    853         /* This is additional overhead so make sure we allow space for this */
     1030/**
     1031 * A callback for a link state change (LSC).
     1032 *
     1033 * Packets may be received before this notification. In fact the DPDK IGXBE
     1034 * driver likes to put a delay upto 5sec before sending this.
     1035 *
     1036 * We use this to ensure the link speed is correct for our timestamp
     1037 * calculations. Because packets might be received before the link up we still
     1038 * update this when the packet is received.
     1039 *
     1040 * @param port The DPDK port
     1041 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
     1042 * @param cb_arg The dpdk_format_data_t structure associated with the format
     1043 */
     1044static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
     1045                              void *cb_arg) {
     1046        struct dpdk_format_data_t * format_data = cb_arg;
     1047        struct rte_eth_link link_info;
     1048        assert(event == RTE_ETH_EVENT_INTR_LSC);
     1049        assert(port == format_data->port);
     1050
     1051        rte_eth_link_get_nowait(port, &link_info);
     1052
     1053        if (link_info.link_status)
     1054                format_data->link_speed = link_info.link_speed;
     1055        else
     1056                format_data->link_speed = 0;
     1057
     1058#if DEBUG
     1059        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
     1060                link_info.link_status ? "up" : "down",
     1061                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
     1062                                          "full-duplex" : "half-duplex",
     1063                (int) link_info.link_speed);
     1064#endif
     1065
     1066        /* Turns out DPDK drivers might not come back up if the link speed
     1067         * changes. So we reset the autoneg procedure. This is very unsafe
     1068         * we have have threads reading packets and we stop the port. */
     1069#if 0
     1070        if (!link_info.link_status) {
     1071                int ret;
     1072                rte_eth_dev_stop(port);
     1073                ret = rte_eth_dev_start(port);
     1074                if (ret < 0) {
     1075                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
     1076                                strerror(-ret));
     1077                }
     1078        }
     1079#endif
     1080}
     1081
     1082/** Reserve a DPDK lcore ID for a thread globally.
     1083 *
     1084 * @param real If true allocate a real lcore, otherwise allocate a core which
     1085 * does not exist on the local machine.
     1086 * @param socket the prefered NUMA socket - only used if a real core is requested
     1087 * @return a valid core, which can later be used with dpdk_register_lcore() or a
     1088 * -1 if have run out of cores.
     1089 *
     1090 * If any thread is reading or freeing packets we need to register it here
     1091 * due to TLS caches in the memory pools.
     1092 */
     1093static int dpdk_reserve_lcore(bool real, int socket) {
     1094        int new_id = -1;
     1095        int i;
     1096        struct rte_config *cfg = rte_eal_get_configuration();
     1097
     1098        pthread_mutex_lock(&dpdk_lock);
     1099        /* If 'reading packets' fill in cores from 0 up and bind affinity
     1100         * otherwise start from the MAX core (which is also the master) and work backwards
     1101         * in this case physical cores on the system will not exist so we don't bind
     1102         * these to any particular physical core */
     1103        if (real) {
     1104#if HAVE_LIBNUMA
     1105                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1106                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
     1107                                new_id = i;
     1108                                if (!lcore_config[i].detected)
     1109                                        new_id = -1;
     1110                                break;
     1111                        }
     1112                }
     1113#endif
     1114                /* Retry without the the numa restriction */
     1115                if (new_id == -1) {
     1116                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1117                                if (!rte_lcore_is_enabled(i)) {
     1118                                        new_id = i;
     1119                                        if (!lcore_config[i].detected)
     1120                                                fprintf(stderr, "Warning the"
     1121                                                        " number of 'reading' "
     1122                                                        "threads exceed cores\n");
     1123                                        break;
     1124                                }
     1125                        }
     1126                }
     1127        } else {
     1128                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1129                        if (!rte_lcore_is_enabled(i)) {
     1130                                new_id = i;
     1131                                break;
     1132                        }
     1133                }
     1134        }
     1135
     1136        if (new_id != -1) {
     1137                /* Enable the core in global DPDK structs */
     1138                cfg->lcore_role[new_id] = ROLE_RTE;
     1139                cfg->lcore_count++;
     1140        }
     1141
     1142        pthread_mutex_unlock(&dpdk_lock);
     1143        return new_id;
     1144}
     1145
     1146/** Register a thread as a lcore
     1147 * @param libtrace any error is set against libtrace on exit
     1148 * @param real If this is a true lcore we will bind its affinty to the
     1149 * requested core.
     1150 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
     1151 * @return 0, if successful otherwise -1 if an error occured (details are stored
     1152 * in libtrace)
     1153 *
     1154 * @note This must be called from the thread being registered.
     1155 */
     1156static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
     1157        int ret;
     1158        RTE_PER_LCORE(_lcore_id) = lcore;
     1159
     1160        /* Set affinity bind to corresponding core */
     1161        if (real) {
     1162                cpu_set_t cpuset;
     1163                CPU_ZERO(&cpuset);
     1164                CPU_SET(rte_lcore_id(), &cpuset);
     1165                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1166                if (ret != 0) {
     1167                        trace_set_err(libtrace, errno, "Warning "
     1168                                      "pthread_setaffinity_np failed");
     1169                        return -1;
     1170                }
     1171        }
     1172
     1173        return 0;
     1174}
     1175
     1176/** Allocates a new dpdk packet buffer memory pool.
     1177 *
     1178 * @param n The number of threads
     1179 * @param pkt_size The packet size we need ot store
     1180 * @param socket_id The NUMA socket id
     1181 * @param A new mempool, if NULL query the DPDK library for the error code
     1182 * see rte_mempool_create() documentation.
     1183 *
     1184 * This allocates a new pool or recycles an existing memory pool.
     1185 * Call dpdk_free_memory() to free the memory.
     1186 * We cannot delete memory so instead we store the pools, allowing them to be
     1187 * re-used.
     1188 */
     1189static struct rte_mempool *dpdk_alloc_memory(unsigned n,
     1190                                             unsigned pkt_size,
     1191                                             int socket_id) {
     1192        struct rte_mempool *ret;
     1193        size_t j,k;
     1194        char name[MEMPOOL_NAME_LEN];
     1195
     1196        /* Add on packet size overheads */
     1197        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1198
     1199        pthread_mutex_lock(&dpdk_lock);
     1200
     1201        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
     1202                /* Best guess go for zero */
     1203                socket_id = 0;
     1204        }
     1205
     1206        /* Find a valid pool */
     1207        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
     1208                if (mem_pools[socket_id][j]->size >= n &&
     1209                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
     1210                        break;
     1211                }
     1212        }
     1213
     1214        /* Find the end (+1) of the list */
     1215        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
     1216
     1217        if (mem_pools[socket_id][j]) {
     1218                ret = mem_pools[socket_id][j];
     1219                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
     1220                mem_pools[socket_id][k-1] = NULL;
     1221                mem_pools[socket_id][j] = NULL;
     1222        } else {
     1223                static uint32_t test = 10;
     1224                test++;
     1225                snprintf(name, MEMPOOL_NAME_LEN,
     1226                         "libtrace_pool_%"PRIu32, test);
     1227
     1228                ret = rte_mempool_create(name, n, pkt_size,
     1229                                         128, sizeof(struct rte_pktmbuf_pool_private),
     1230                                         rte_pktmbuf_pool_init, NULL,
     1231                                         rte_pktmbuf_init, NULL,
     1232                                         socket_id, 0);
     1233        }
     1234
     1235        pthread_mutex_unlock(&dpdk_lock);
     1236        return ret;
     1237}
     1238
     1239/** Stores the memory against the DPDK library.
     1240 *
     1241 * @param mempool The mempool to free
     1242 * @param socket_id The NUMA socket this mempool was allocated upon.
     1243 *
     1244 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
     1245 * store the memory shared globally against the format.
     1246 */
     1247static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
     1248        size_t i;
     1249        pthread_mutex_lock(&dpdk_lock);
     1250
     1251        /* We should have all entries back in the mempool */
     1252        rte_mempool_audit(mempool);
     1253        if (!rte_mempool_full(mempool)) {
     1254                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
     1255                        "free all packets before finishing a trace\n",
     1256                        rte_mempool_count(mempool), mempool->size);
     1257        }
     1258
     1259        /* Find the end (+1) of the list */
     1260        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
     1261
     1262        if (i >= RTE_MAX_LCORE) {
     1263                fprintf(stderr, "Too many memory pools, dropping this one\n");
     1264        } else {
     1265                mem_pools[socket_id][i] = mempool;
     1266        }
     1267
     1268        pthread_mutex_unlock(&dpdk_lock);
     1269}
     1270
     1271/* Attach memory to the port and start (or restart) the port/s.
     1272 */
     1273static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
     1274                              char *err, int errlen, uint16_t rx_queues) {
     1275        int ret, i;
     1276        struct rte_eth_link link_info; /* Wait for link */
     1277        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
     1278
     1279        /* Already started */
     1280        if (format_data->paused == DPDK_RUNNING)
     1281                return 0;
     1282
     1283        /* First time started we need to alloc our memory, doing this here
     1284         * rather than in environment setup because we don't have snaplen then */
     1285        if (format_data->paused == DPDK_NEVER_STARTED) {
     1286                if (format_data->snaplen == 0) {
     1287                        format_data->snaplen = RX_MBUF_SIZE;
     1288                        port_conf.rxmode.jumbo_frame = 0;
     1289                        port_conf.rxmode.max_rx_pkt_len = 0;
     1290                } else {
     1291                        /* Use jumbo frames */
     1292                        port_conf.rxmode.jumbo_frame = 1;
     1293                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1294                }
     1295
    8541296#if GET_MAC_CRC_CHECKSUM
    855         format_data->snaplen += ETHER_CRC_LEN;
     1297                /* This is additional overhead so make sure we allow space for this */
     1298                format_data->snaplen += ETHER_CRC_LEN;
    8561299#endif
    8571300#if HAS_HW_TIMESTAMPS_82580
    858         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    859 #endif
    860 
    861         /* Create the mbuf pool, which is the place our packets are allocated
    862          * from - TODO figure out if there is is a free function (I cannot see one)
    863          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    864          * allocate however that extra 1 packet is not used.
    865         * (I assume <= vs < error some where in DPDK code)
    866          * TX requires nb_tx_buffers + 1 in the case the queue is full
    867         * so that will fill the new buffer and wait until slots in the
    868         * ring become available.
    869         */
     1301                format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1302#endif
     1303
     1304                /* Create the mbuf pool, which is the place packets are allocated
     1305                 * from - There is no free function (I cannot see one).
     1306                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1307                 * allocate however that extra 1 packet is not used.
     1308                * (I assume <= vs < error some where in DPDK code)
     1309                 * TX requires nb_tx_buffers + 1 in the case the queue is full
     1310                * so that will fill the new buffer and wait until slots in the
     1311                * ring become available.
     1312                */
    8701313#if DEBUG
    871     fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    872 #endif
    873         format_data->pktmbuf_pool =
    874             rte_mempool_create(format_data->mempool_name,
    875                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    876                        format_data->snaplen + sizeof(struct rte_mbuf)
    877                                         + RTE_PKTMBUF_HEADROOM,
    878                        8, sizeof(struct rte_pktmbuf_pool_private),
    879                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    880                        rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    881 
    882         if (format_data->pktmbuf_pool == NULL) {
    883             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    884                         "pool failed: %s", strerror(rte_errno));
    885             return -1;
    886         }
    887     }
    888    
    889     /* ----------- Now do the setup for the port mapping ------------ */
    890     /* Order of calls must be
    891      * rte_eth_dev_configure()
    892      * rte_eth_tx_queue_setup()
    893      * rte_eth_rx_queue_setup()
    894      * rte_eth_dev_start()
    895      * other rte_eth calls
    896      */
    897    
    898     /* This must be called first before another *eth* function
    899      * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    900     ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    901     if (ret < 0) {
    902         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    903                             " %"PRIu8" : %s", format_data->port,
    904                             strerror(-ret));
    905         return -1;
    906     }
    907     /* Initialise the TX queue a minimum value if using this port for
    908      * receiving. Otherwise a larger size if writing packets.
    909      */
    910     ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    911                         format_data->nb_tx_buf, rte_socket_id(),
    912                         DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
    913     if (ret < 0) {
    914         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    915                             " %"PRIu8" : %s", format_data->port,
    916                             strerror(-ret));
    917         return -1;
    918     }
    919     /* Initialise the RX queue with some packets from memory */
    920     ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    921                             format_data->nb_rx_buf, rte_socket_id(),
    922                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &rx_conf,
    923                             format_data->pktmbuf_pool);
    924     if (ret < 0) {
    925         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    926                     " %"PRIu8" : %s", format_data->port,
    927                     strerror(-ret));
    928         return -1;
    929     }
    930    
    931     /* Start device */
    932     ret = rte_eth_dev_start(format_data->port);
    933     if (ret < 0) {
    934         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    935                     strerror(-ret));
    936         return -1;
    937     }
    938 
    939     /* Default promiscuous to on */
    940     if (format_data->promisc == -1)
    941         format_data->promisc = 1;
    942    
    943     if (format_data->promisc == 1)
    944         rte_eth_promiscuous_enable(format_data->port);
    945     else
    946         rte_eth_promiscuous_disable(format_data->port);
    947    
    948     /* Wait for the link to come up */
    949     rte_eth_link_get(format_data->port, &link_info);
     1314                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
     1315#endif
     1316                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
     1317                                                              format_data->snaplen,
     1318                                                              format_data->nic_numa_node);
     1319
     1320                if (format_data->pktmbuf_pool == NULL) {
     1321                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1322                                 "pool failed: %s", strerror(rte_errno));
     1323                        return -1;
     1324                }
     1325        }
     1326
     1327        /* ----------- Now do the setup for the port mapping ------------ */
     1328        /* Order of calls must be
     1329         * rte_eth_dev_configure()
     1330         * rte_eth_tx_queue_setup()
     1331         * rte_eth_rx_queue_setup()
     1332         * rte_eth_dev_start()
     1333         * other rte_eth calls
     1334         */
     1335
     1336        /* This must be called first before another *eth* function
     1337         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
     1338        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1339        if (ret < 0) {
     1340                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1341                         " %"PRIu8" : %s", format_data->port,
     1342                         strerror(-ret));
     1343                return -1;
     1344        }
    9501345#if DEBUG
    951     fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    952             (int) link_info.link_duplex, (int) link_info.link_speed);
    953 #endif
    954 
    955     /* We have now successfully started/unpaused */
    956     format_data->paused = DPDK_RUNNING;
    957    
    958     return 0;
     1346        fprintf(stderr, "Doing dev configure\n");
     1347#endif
     1348        /* Initialise the TX queue a minimum value if using this port for
     1349         * receiving. Otherwise a larger size if writing packets.
     1350         */
     1351        ret = rte_eth_tx_queue_setup(format_data->port,
     1352                                     0 /* queue XXX */,
     1353                                     format_data->nb_tx_buf,
     1354                                     SOCKET_ID_ANY,
     1355                                     DPDK_USE_NULL_QUEUE_CONFIG ? NULL : &tx_conf);
     1356        if (ret < 0) {
     1357                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
     1358                         " on port %"PRIu8" : %s", format_data->port,
     1359                         strerror(-ret));
     1360                return -1;
     1361        }
     1362
     1363        /* Attach memory to our RX queues */
     1364        for (i=0; i < rx_queues; i++) {
     1365                dpdk_per_stream_t *stream;
     1366#if DEBUG
     1367                fprintf(stderr, "Configuring queue %d\n", i);
     1368#endif
     1369
     1370                /* Add storage for the stream */
     1371                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
     1372                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
     1373                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
     1374                stream->queue_id = i;
     1375
     1376                if (stream->lcore == -1)
     1377                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
     1378
     1379                if (stream->lcore == -1) {
     1380                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
     1381                                 ". Too many threads?");
     1382                        return -1;
     1383                }
     1384
     1385                if (stream->mempool == NULL) {
     1386                        stream->mempool = dpdk_alloc_memory(
     1387                                                  format_data->nb_rx_buf*2,
     1388                                                  format_data->snaplen,
     1389                                                  rte_lcore_to_socket_id(stream->lcore));
     1390
     1391                        if (stream->mempool == NULL) {
     1392                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1393                                         "pool failed: %s", strerror(rte_errno));
     1394                                return -1;
     1395                        }
     1396                }
     1397
     1398                /* Initialise the RX queue with some packets from memory */
     1399                ret = rte_eth_rx_queue_setup(format_data->port,
     1400                                             stream->queue_id,
     1401                                             format_data->nb_rx_buf,
     1402                                             format_data->nic_numa_node,
     1403                                             DPDK_USE_NULL_QUEUE_CONFIG ? NULL: &rx_conf,
     1404                                             stream->mempool);
     1405                if (ret < 0) {
     1406                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
     1407                                 " RX queue on port %"PRIu8" : %s",
     1408                                 format_data->port,
     1409                                 strerror(-ret));
     1410                        return -1;
     1411                }
     1412        }
     1413
     1414#if DEBUG
     1415        fprintf(stderr, "Doing start device\n");
     1416#endif
     1417        rte_eth_stats_reset(format_data->port);
     1418        /* Start device */
     1419        ret = rte_eth_dev_start(format_data->port);
     1420        if (ret < 0) {
     1421                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1422                         strerror(-ret));
     1423                return -1;
     1424        }
     1425
     1426        /* Default promiscuous to on */
     1427        if (format_data->promisc == -1)
     1428                format_data->promisc = 1;
     1429
     1430        if (format_data->promisc == 1)
     1431                rte_eth_promiscuous_enable(format_data->port);
     1432        else
     1433                rte_eth_promiscuous_disable(format_data->port);
     1434
     1435        /* We have now successfully started/unpased */
     1436        format_data->paused = DPDK_RUNNING;
     1437
     1438
     1439        /* Register a callback for link state changes */
     1440        ret = rte_eth_dev_callback_register(format_data->port,
     1441                                            RTE_ETH_EVENT_INTR_LSC,
     1442                                            dpdk_lsc_callback,
     1443                                            format_data);
     1444#if DEBUG
     1445        if (ret)
     1446                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1447                        ret, strerror(-ret));
     1448#endif
     1449
     1450        /* Get the current link status */
     1451        rte_eth_link_get_nowait(format_data->port, &link_info);
     1452        format_data->link_speed = link_info.link_speed;
     1453#if DEBUG
     1454        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1455                (int) link_info.link_duplex, (int) link_info.link_speed);
     1456#endif
     1457
     1458        return 0;
    9591459}
    9601460
    9611461static int dpdk_start_input (libtrace_t *libtrace) {
    962     char err[500];
    963     err[0] = 0;
    964 
    965     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    966         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    967         free(libtrace->format_data);
    968         libtrace->format_data = NULL;
    969         return -1;
    970     }
    971     return 0;
     1462        char err[500];
     1463        err[0] = 0;
     1464
     1465        /* Make sure we don't reserve an extra thread for this */
     1466        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
     1467
     1468        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1469                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1470                free(libtrace->format_data);
     1471                libtrace->format_data = NULL;
     1472                return -1;
     1473        }
     1474        return 0;
     1475}
     1476
     1477static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1478        struct rte_eth_dev_info dev_info;
     1479        rte_eth_dev_info_get(port_id, &dev_info);
     1480        return dev_info.max_rx_queues;
     1481}
     1482
     1483static inline size_t dpdk_processor_count () {
     1484        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1485        if (nb_cpu <= 0)
     1486                return 1;
     1487        else
     1488                return (size_t) nb_cpu;
     1489}
     1490
     1491static int dpdk_pstart_input (libtrace_t *libtrace) {
     1492        char err[500];
     1493        int i=0, phys_cores=0;
     1494        int tot = libtrace->perpkt_thread_count;
     1495        libtrace_list_node_t *n;
     1496        err[0] = 0;
     1497
     1498        if (rte_lcore_id() != rte_get_master_lcore())
     1499                fprintf(stderr, "Warning dpdk_pstart_input should be called"
     1500                        " from the master DPDK thread!\n");
     1501
     1502        /* If the master is not on the last thread we move it there */
     1503        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1504                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
     1505                        return -1;
     1506        }
     1507
     1508        /* Don't exceed the number of cores in the system/detected by dpdk
     1509         * We don't have to force this but performance wont be good if we don't */
     1510        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1511                if (lcore_config[i].detected) {
     1512                        if (rte_lcore_is_enabled(i)) {
     1513#if DEBUG
     1514                                fprintf(stderr, "Found core %d already in use!\n", i);
     1515#endif
     1516                        } else {
     1517                                phys_cores++;
     1518                        }
     1519                }
     1520        }
     1521        /* If we are restarting we have already allocated some threads as such
     1522         * we add these back to the count for this calculation */
     1523        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
     1524                dpdk_per_stream_t * stream = n->data;
     1525                if (stream->lcore != -1)
     1526                        phys_cores++;
     1527        }
     1528
     1529        tot = MIN(libtrace->perpkt_thread_count,
     1530                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1531        tot = MIN(tot, phys_cores);
     1532
     1533#if DEBUG
     1534        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
     1535                libtrace->perpkt_thread_count, phys_cores);
     1536#endif
     1537
     1538        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1539                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1540                free(libtrace->format_data);
     1541                libtrace->format_data = NULL;
     1542                return -1;
     1543        }
     1544
     1545        /* Make sure we only start the number that we should */
     1546        libtrace->perpkt_thread_count = tot;
     1547        return 0;
     1548}
     1549
     1550/**
     1551 * Register a thread with the DPDK system,
     1552 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1553 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1554 * gives it.
     1555 *
     1556 * We then allow a mapper thread to be started on every real core as DPDK would,
     1557 * we also bind these to the corresponding CPU cores.
     1558 *
     1559 * @param libtrace A pointer to the trace
     1560 * @param reading True if the thread will be used to read packets, i.e. will
     1561 *                call pread_packet(), false if thread used to process packet
     1562 *                in any other manner including statistics functions.
     1563 */
     1564static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1565{
     1566#if DEBUG
     1567        char name[99];
     1568        name[0] = 0;
     1569#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
     1570        pthread_getname_np(pthread_self(),
     1571                           name, sizeof(name));
     1572#endif
     1573#endif
     1574        if (reading) {
     1575                dpdk_per_stream_t *stream;
     1576                /* Attach our thread */
     1577                if(t->type == THREAD_PERPKT) {
     1578                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
     1579                        if (t->format_data == NULL) {
     1580                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1581                                              "Too many threads registered");
     1582                                return -1;
     1583                        }
     1584                } else {
     1585                        t->format_data = FORMAT_DATA_FIRST(libtrace);
     1586                }
     1587                stream = t->format_data;
     1588#if DEBUG
     1589                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
     1590#endif
     1591                return dpdk_register_lcore(libtrace, true, stream->lcore);
     1592        } else {
     1593                int lcore = dpdk_reserve_lcore(reading, 0);
     1594                if (lcore == -1) {
     1595                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
     1596                                      " for DPDK");
     1597                        return -1;
     1598                }
     1599#if DEBUG
     1600                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
     1601#endif
     1602                return dpdk_register_lcore(libtrace, false, lcore);
     1603        }
     1604
     1605        return 0;
     1606}
     1607
     1608/**
     1609 * Unregister a thread with the DPDK system.
     1610 *
     1611 * Only previously registered threads should be calling this just before
     1612 * they are destroyed.
     1613 */
     1614static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
     1615{
     1616        struct rte_config *cfg = rte_eal_get_configuration();
     1617
     1618        assert(rte_lcore_id() < RTE_MAX_LCORE);
     1619        pthread_mutex_lock(&dpdk_lock);
     1620        /* Skip if master */
     1621        if (rte_lcore_id() == rte_get_master_lcore()) {
     1622                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1623                pthread_mutex_unlock(&dpdk_lock);
     1624                return;
     1625        }
     1626
     1627        /* Disable this core in global DPDK structs */
     1628        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1629        cfg->lcore_count--;
     1630        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1631        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1632        pthread_mutex_unlock(&dpdk_lock);
     1633        return;
    9721634}
    9731635
    9741636static int dpdk_start_output(libtrace_out_t *libtrace)
    9751637{
    976     char err[500];
    977     err[0] = 0;
    978    
    979     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    980         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    981         free(libtrace->format_data);
    982         libtrace->format_data = NULL;
    983         return -1;
    984     }
    985     return 0;
    986 }
    987 
    988 static int dpdk_pause_input(libtrace_t * libtrace){
    989     /* This stops the device, but can be restarted using rte_eth_dev_start() */
    990     if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    991 #if DEBUG     
    992         fprintf(stderr, "Pausing port\n");
    993 #endif
    994         rte_eth_dev_stop(FORMAT(libtrace)->port);
    995         FORMAT(libtrace)->paused = DPDK_PAUSED;
    996         /* If we pause it the driver will be reset and likely our counter */
     1638        char err[500];
     1639        err[0] = 0;
     1640
     1641        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1642                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1643                free(libtrace->format_data);
     1644                libtrace->format_data = NULL;
     1645                return -1;
     1646        }
     1647        return 0;
     1648}
     1649
     1650static int dpdk_pause_input(libtrace_t * libtrace) {
     1651        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
     1652        /* This stops the device, but can be restarted using rte_eth_dev_start() */
     1653        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
     1654#if DEBUG
     1655                fprintf(stderr, "Pausing DPDK port\n");
     1656#endif
     1657                rte_eth_dev_stop(FORMAT(libtrace)->port);
     1658                FORMAT(libtrace)->paused = DPDK_PAUSED;
     1659                /* Empty the queue of packets */
     1660                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1661                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1662                }
     1663                FORMAT(libtrace)->burst_offset = 0;
     1664                FORMAT(libtrace)->burst_size = 0;
     1665
     1666                for (; tmp != NULL; tmp = tmp->next) {
     1667                        dpdk_per_stream_t *stream = tmp->data;
     1668                        stream->ts_last_sys = 0;
    9971669#if HAS_HW_TIMESTAMPS_82580
    998         FORMAT(libtrace)->ts_first_sys = 0;
    999         FORMAT(libtrace)->ts_last_sys = 0;
    1000 #endif
    1001     }
    1002     return 0;
    1003 }
    1004 
    1005 static int dpdk_write_packet(libtrace_out_t *trace,
    1006                 libtrace_packet_t *packet){
    1007     struct rte_mbuf* m_buff[1];
    1008    
    1009     int wirelen = trace_get_wire_length(packet);
    1010     int caplen = trace_get_capture_length(packet);
    1011    
    1012     /* Check for a checksum and remove it */
    1013     if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1014                                             wirelen == caplen)
    1015         caplen -= ETHER_CRC_LEN;
    1016 
    1017     m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    1018     if (m_buff[0] == NULL) {
    1019         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1020         return -1;
    1021     } else {
    1022         int ret;
    1023         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1024         do {
    1025             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1026         } while (ret != 1);
    1027     }
    1028 
    1029     return 0;
     1670                        stream->ts_first_sys = 0;
     1671#endif
     1672                }
     1673
     1674        }
     1675        return 0;
     1676}
     1677
     1678static int dpdk_write_packet(libtrace_out_t *trace,
     1679                             libtrace_packet_t *packet){
     1680        struct rte_mbuf* m_buff[1];
     1681
     1682        int wirelen = trace_get_wire_length(packet);
     1683        int caplen = trace_get_capture_length(packet);
     1684
     1685        /* Check for a checksum and remove it */
     1686        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
     1687            wirelen == caplen)
     1688                caplen -= ETHER_CRC_LEN;
     1689
     1690        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
     1691        if (m_buff[0] == NULL) {
     1692                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1693                return -1;
     1694        } else {
     1695                int ret;
     1696                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1697                do {
     1698                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
     1699                } while (ret != 1);
     1700        }
     1701
     1702        return 0;
    10301703}
    10311704
    10321705static int dpdk_fin_input(libtrace_t * libtrace) {
    1033     /* Free our memory structures */
    1034     if (libtrace->format_data != NULL) {
    1035         /* Close the device completely, device cannot be restarted */
    1036         if (FORMAT(libtrace)->port != 0xFF)
    1037             rte_eth_dev_close(FORMAT(libtrace)->port);
    1038         /* filter here if we used it */
     1706        libtrace_list_node_t * n;
     1707        /* Free our memory structures */
     1708        if (libtrace->format_data != NULL) {
     1709
     1710                if (FORMAT(libtrace)->port != 0xFF)
     1711                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1712                                                        RTE_ETH_EVENT_INTR_LSC,
     1713                                                        dpdk_lsc_callback,
     1714                                                        FORMAT(libtrace));
     1715                /* Close the device completely, device cannot be restarted */
     1716                rte_eth_dev_close(FORMAT(libtrace)->port);
     1717
     1718                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
     1719                                 FORMAT(libtrace)->nic_numa_node);
     1720
     1721                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
     1722                        dpdk_per_stream_t * stream = n->data;
     1723                        if (stream->mempool)
     1724                                dpdk_free_memory(stream->mempool,
     1725                                                 rte_lcore_to_socket_id(stream->lcore));
     1726                }
     1727
     1728                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1729                /* filter here if we used it */
    10391730                free(libtrace->format_data);
    10401731        }
    10411732
    1042     /* Revert to the original PCI drivers */
    1043     /* No longer in DPDK
    1044     rte_eal_pci_exit(); */
    1045     return 0;
     1733        return 0;
    10461734}
    10471735
    10481736
    10491737static int dpdk_fin_output(libtrace_out_t * libtrace) {
    1050     /* Free our memory structures */
    1051     if (libtrace->format_data != NULL) {
    1052         /* Close the device completely, device cannot be restarted */
    1053         if (FORMAT(libtrace)->port != 0xFF)
    1054             rte_eth_dev_close(FORMAT(libtrace)->port);
    1055         /* filter here if we used it */
     1738        /* Free our memory structures */
     1739        if (libtrace->format_data != NULL) {
     1740                /* Close the device completely, device cannot be restarted */
     1741                if (FORMAT(libtrace)->port != 0xFF)
     1742                        rte_eth_dev_close(FORMAT(libtrace)->port);
     1743                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1744                /* filter here if we used it */
    10561745                free(libtrace->format_data);
    10571746        }
    10581747
    1059     /* Revert to the original PCI drivers */
    1060     /* No longer in DPDK
    1061     rte_eal_pci_exit(); */
    1062     return 0;
    1063 }
    1064 
    1065 /**
    1066  * Get the start of additional header that we added to a packet.
     1748        return 0;
     1749}
     1750
     1751/**
     1752 * Get the start of the additional header that we added to a packet.
    10671753 */
    10681754static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1069     uint8_t *hdrsize;
    1070     assert(packet);
    1071     assert(packet->buffer);
    1072     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1073     /* The byte before the original packet data denotes the size in bytes
    1074      * of our additional header that we added sits before the 'size byte' */
    1075     hdrsize--;
    1076     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1755        assert(packet);
     1756        assert(packet->buffer);
     1757        /* Our header sits straight after the mbuf header */
     1758        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    10771759}
    10781760
    10791761static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
    1080     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1081     return hdr->cap_len;
     1762        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1763        return hdr->cap_len;
    10821764}
    10831765
    10841766static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
    1085     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1086     if (size > hdr->cap_len) {
    1087         /* Cannot make a packet bigger */
     1767        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1768        if (size > hdr->cap_len) {
     1769                /* Cannot make a packet bigger */
    10881770                return trace_get_capture_length(packet);
    10891771        }
    10901772
    1091     /* Reset the cached capture length first*/
    1092     packet->capture_length = -1;
    1093     hdr->cap_len = (uint32_t) size;
     1773        /* Reset the cached capture length first*/
     1774        packet->capture_length = -1;
     1775        hdr->cap_len = (uint32_t) size;
    10941776        return trace_get_capture_length(packet);
    10951777}
    10961778
    10971779static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
    1098     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1099     int org_cap_size; /* The original capture size */
    1100     if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1101         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1102                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1103                             sizeof(struct hw_timestamp_82580);
    1104     } else {
    1105         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1106                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
    1107     }
    1108     if (hdr->flags & INCLUDES_CHECKSUM) {
    1109         return org_cap_size;
    1110     } else {
    1111         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1112         return org_cap_size + ETHER_CRC_LEN;
    1113     }
    1114 }
     1780        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1781        int org_cap_size; /* The original capture size */
     1782        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
     1783                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1784                               sizeof(struct hw_timestamp_82580);
     1785        } else {
     1786                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
     1787        }
     1788        if (hdr->flags & INCLUDES_CHECKSUM) {
     1789                return org_cap_size;
     1790        } else {
     1791                /* DPDK packets are always TRACE_TYPE_ETH packets */
     1792                return org_cap_size + ETHER_CRC_LEN;
     1793        }
     1794}
     1795
    11151796static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
    1116     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1117     if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1118         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1119                 sizeof(struct hw_timestamp_82580);
    1120     else
    1121         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1797        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1798        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
     1799                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1800                                sizeof(struct hw_timestamp_82580);
     1801        else
     1802                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    11221803}
    11231804
    11241805static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
    1125                 libtrace_packet_t *packet, void *buffer,
    1126                 libtrace_rt_types_t rt_type, uint32_t flags) {
    1127     assert(packet);
    1128     if (packet->buffer != buffer &&
    1129         packet->buf_control == TRACE_CTRL_PACKET) {
    1130         free(packet->buffer);
    1131     }
    1132 
    1133     if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1134         packet->buf_control = TRACE_CTRL_PACKET;
    1135     } else
    1136         packet->buf_control = TRACE_CTRL_EXTERNAL;
    1137 
    1138     packet->buffer = buffer;
    1139     packet->header = buffer;
    1140 
    1141     /* Don't use pktmbuf_mtod will fail if the packet is a copy */
    1142     packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
    1143     packet->type = rt_type;
    1144     return 0;
    1145 }
    1146 
    1147 /*
    1148  * Does any extra preperation to a captured packet.
    1149  * This includes adding our extra header to it with the timestamp
    1150  */
    1151 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1152                                                         struct rte_mbuf* pkt){
    1153     uint8_t * hdr_size;
    1154     struct dpdk_addt_hdr *hdr;
     1806                               libtrace_packet_t *packet, void *buffer,
     1807                               libtrace_rt_types_t rt_type, uint32_t flags) {
     1808        assert(packet);
     1809        if (packet->buffer != buffer &&
     1810            packet->buf_control == TRACE_CTRL_PACKET) {
     1811                free(packet->buffer);
     1812        }
     1813
     1814        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
     1815                packet->buf_control = TRACE_CTRL_PACKET;
     1816        else
     1817                packet->buf_control = TRACE_CTRL_EXTERNAL;
     1818
     1819        packet->buffer = buffer;
     1820        packet->header = buffer;
     1821
     1822        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
     1823        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
     1824        packet->type = rt_type;
     1825        return 0;
     1826}
     1827
     1828/**
     1829 * Given a packet size and a link speed, computes the
     1830 * time to transmit in nanoseconds.
     1831 *
     1832 * @param format_data The dpdk format data from which we get the link speed
     1833 *        and if unset updates it in a thread safe manner
     1834 * @param pkt_size The size of the packet in bytes
     1835 * @return The wire time in nanoseconds
     1836 */
     1837static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1838        uint32_t wire_time;
     1839        /* 20 extra bytes of interframe gap and preamble */
     1840# if GET_MAC_CRC_CHECKSUM
     1841        wire_time = ((pkt_size + 20) * 8000);
     1842# else
     1843        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1844# endif
     1845
     1846        /* Division is really slow and introduces a pipeline stall
     1847         * The compiler will optimise this into magical multiplication and shifting
     1848         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1849         */
     1850retry_calc_wiretime:
     1851        switch (format_data->link_speed) {
     1852        case ETH_LINK_SPEED_40G:
     1853                wire_time /=  ETH_LINK_SPEED_40G;
     1854                break;
     1855        case ETH_LINK_SPEED_20G:
     1856                wire_time /= ETH_LINK_SPEED_20G;
     1857                break;
     1858        case ETH_LINK_SPEED_10G:
     1859                wire_time /= ETH_LINK_SPEED_10G;
     1860                break;
     1861        case ETH_LINK_SPEED_1000:
     1862                wire_time /= ETH_LINK_SPEED_1000;
     1863                break;
     1864        case 0:
     1865                {
     1866                /* Maybe the link was down originally, but now it should be up */
     1867                struct rte_eth_link link = {0};
     1868                rte_eth_link_get_nowait(format_data->port, &link);
     1869                if (link.link_status && link.link_speed) {
     1870                        format_data->link_speed = link.link_speed;
     1871#ifdef DEBUG
     1872                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1873#endif
     1874                        goto retry_calc_wiretime;
     1875                }
     1876                /* We don't know the link speed, make sure numbers are counting up */
     1877                wire_time = 1;
     1878                break;
     1879                }
     1880        default:
     1881                wire_time /= format_data->link_speed;
     1882        }
     1883        return wire_time;
     1884}
     1885
     1886/**
     1887 * Does any extra preperation to all captured packets
     1888 * This includes adding our extra header to it with the timestamp,
     1889 * and any snapping
     1890 *
     1891 * @param format_data The DPDK format data
     1892 * @param plc The DPDK per lcore format data
     1893 * @param pkts An array of size nb_pkts of DPDK packets
     1894 */
     1895static inline void dpdk_ready_pkts(libtrace_t *libtrace,
     1896                                   struct dpdk_per_stream_t *plc,
     1897                                   struct rte_mbuf **pkts,
     1898                                   size_t nb_pkts) {
     1899        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
     1900        struct dpdk_addt_hdr *hdr;
     1901        size_t i;
     1902        uint64_t cur_sys_time_ns;
    11551903#if HAS_HW_TIMESTAMPS_82580
    1156     struct hw_timestamp_82580 *hw_ts;
    1157     struct timeval cur_sys_time;
    1158     uint64_t cur_sys_time_ns;
    1159     uint64_t estimated_wraps;
    1160    
    1161     /* Using gettimeofday because it's most likely to be a vsyscall
    1162      * We don't want to slow down anything with systemcalls we dont need
    1163      * accauracy */
    1164     gettimeofday(&cur_sys_time, NULL);
     1904        struct hw_timestamp_82580 *hw_ts;
     1905        uint64_t estimated_wraps;
    11651906#else
    1166 # if USE_CLOCK_GETTIME
    1167     struct timespec cur_sys_time;
    1168    
    1169     /* This looks terrible and I feel bad doing it. But it's OK
    1170      * on new kernels, because this is a vsyscall */
    1171     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1172 # else
    1173     struct timeval cur_sys_time;
    1174     /* Should be a vsyscall */
    1175     gettimeofday(&cur_sys_time, NULL);
    1176 # endif
    1177 #endif
    1178 
    1179     /* Record the size of our header */
    1180     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1181     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1182     /* Now put our header in front of that size */
    1183     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1184     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1185    
     1907
     1908#endif
     1909
     1910#if USE_CLOCK_GETTIME
     1911        struct timespec cur_sys_time = {0};
     1912        /* This looks terrible and I feel bad doing it. But it's OK
     1913         * on new kernels, because this is a fast vsyscall */
     1914        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1915        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1916#else
     1917        struct timeval cur_sys_time = {0};
     1918        /* Also a fast vsyscall */
     1919        gettimeofday(&cur_sys_time, NULL);
     1920        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1921#endif
     1922
     1923        /* The system clock is not perfect so when running
     1924         * at linerate we could timestamp a packet in the past.
     1925         * To avoid this we munge the timestamp to appear 1ns
     1926         * after the previous packet. We should eventually catch up
     1927         * to system time since a 64byte packet on a 10G link takes 67ns.
     1928         *
     1929         * Note with parallel readers timestamping packets
     1930         * with duplicate stamps or out of order is unavoidable without
     1931         * hardware timestamping from the NIC.
     1932         */
     1933#if !HAS_HW_TIMESTAMPS_82580
     1934        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1935                cur_sys_time_ns = plc->ts_last_sys + 1;
     1936        }
     1937#endif
     1938
     1939        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
     1940        for (i = 0 ; i < nb_pkts ; ++i) {
     1941
     1942                /* We put our header straight after the dpdk header */
     1943                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1944                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1945
    11861946#if GET_MAC_CRC_CHECKSUM
    1187     /* Add back in the CRC sum */
    1188     rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
    1189     rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    1190     hdr->flags |= INCLUDES_CHECKSUM;
    1191 #endif
     1947                /* Add back in the CRC sum */
     1948                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
     1949                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
     1950                hdr->flags |= INCLUDES_CHECKSUM;
     1951#endif
     1952
     1953                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    11921954
    11931955#if HAS_HW_TIMESTAMPS_82580
    1194     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1195      *
    1196      *        +----------+---+   +--------------+
    1197      *  82580 |    24    | 8 |   |      32      |
    1198      *        +----------+---+   +--------------+
    1199      *          reserved  \______ 40 bits _____/
    1200      *
    1201      * The 40 bit 82580 SYSTIM overflows every
    1202      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1203      *
    1204      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1205      * Endian (for the full 64 bits) i.e. picture is mirrored
    1206      */
    1207    
    1208     /* The timestamp is sitting before our packet and is included in pkt_len */
    1209     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1210     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1211    
    1212     /* Despite what the documentation says this is in Little
    1213      * Endian byteorder. Mask the reserved section out.
    1214      */
    1215     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1216                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1217                
    1218     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1219     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1220         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1221         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1222     }
    1223    
    1224     /* This will have serious problems if packets aren't read quickly
    1225      * that is within a couple of seconds because our clock cycles every
    1226      * 18 seconds */
    1227     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1228                             / (1ull<<TS_NBITS_82580);
    1229    
    1230     /* Estimated_wraps gives the number of times the counter should have
    1231      * wrapped (however depending on value last time it could have wrapped
    1232      * twice more (if hw clock is close to its max value) or once less (allowing
    1233      * for a bit of variance between hw and sys clock). But if the clock
    1234      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1235     if (unlikely(estimated_wraps >= 2)) {
    1236         /* 2 or more wrap arounds add all but the very last wrap */
    1237         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1238     }
    1239    
    1240     /* Set the timestamp to the lowest possible value we're considering */
    1241     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1242                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1243    
    1244     /* In most runs only the first if() will need evaluating - i.e our
    1245      * estimate is correct. */
    1246     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1247                                 hdr->timestamp, MAXSKEW_82580))) {
    1248         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1249         FORMAT(libtrace)->wrap_count++;
    1250         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1251         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1252                                 hdr->timestamp, MAXSKEW_82580)) {
    1253             /* Failed to match estimated_wraps */
    1254             FORMAT(libtrace)->wrap_count++;
    1255             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1256             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1257                                 hdr->timestamp, MAXSKEW_82580)) {
    1258                 if (estimated_wraps == 0) {
    1259                     /* 0 case Failed to match estimated_wraps+2 */
    1260                     printf("WARNING - Hardware Timestamp failed to"
    1261                                             " match using systemtime!\n");
    1262                     hdr->timestamp = cur_sys_time_ns;
    1263                 } else {
    1264                     /* Failed to match estimated_wraps+1 */
    1265                     FORMAT(libtrace)->wrap_count++;
    1266                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1267                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1268                                 hdr->timestamp, MAXSKEW_82580)) {
    1269                         /* Failed to match estimated_wraps+2 */
    1270                         printf("WARNING - Hardware Timestamp failed to"
    1271                                             " match using systemtime!!\n");
    1272                     }
    1273                 }
    1274             }
    1275         }
    1276     }
    1277 
    1278     /* Log our previous for the next loop */
    1279     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1280 
     1956                /* The timestamp is sitting before our packet and is included in pkt_len */
     1957                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1958                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1959                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1960
     1961                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1962                 *
     1963                 *        +----------+---+   +--------------+
     1964                 *  82580 |    24    | 8 |   |      32      |
     1965                 *        +----------+---+   +--------------+
     1966                 *          reserved  \______ 40 bits _____/
     1967                 *
     1968                 * The 40 bit 82580 SYSTIM overflows every
     1969                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1970                 *
     1971                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1972                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1973                 */
     1974
     1975                /* Despite what the documentation says this is in Little
     1976                 * Endian byteorder. Mask the reserved section out.
     1977                 */
     1978                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1979                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1980
     1981                if (unlikely(plc->ts_first_sys == 0)) {
     1982                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1983                        plc->ts_last_sys = plc->ts_first_sys;
     1984                }
     1985
     1986                /* This will have serious problems if packets aren't read quickly
     1987                 * that is within a couple of seconds because our clock cycles every
     1988                 * 18 seconds */
     1989                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1990                                  / (1ull<<TS_NBITS_82580);
     1991
     1992                /* Estimated_wraps gives the number of times the counter should have
     1993                 * wrapped (however depending on value last time it could have wrapped
     1994                 * twice more (if hw clock is close to its max value) or once less (allowing
     1995                 * for a bit of variance between hw and sys clock). But if the clock
     1996                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1997                if (unlikely(estimated_wraps >= 2)) {
     1998                        /* 2 or more wrap arounds add all but the very last wrap */
     1999                        plc->wrap_count += estimated_wraps - 1;
     2000                }
     2001
     2002                /* Set the timestamp to the lowest possible value we're considering */
     2003                hdr->timestamp += plc->ts_first_sys +
     2004                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     2005
     2006                /* In most runs only the first if() will need evaluating - i.e our
     2007                 * estimate is correct. */
     2008                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     2009                                              hdr->timestamp, MAXSKEW_82580))) {
     2010                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     2011                        plc->wrap_count++;
     2012                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     2013                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2014                                             hdr->timestamp, MAXSKEW_82580)) {
     2015                                /* Failed to match estimated_wraps */
     2016                                plc->wrap_count++;
     2017                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2018                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2019                                                     hdr->timestamp, MAXSKEW_82580)) {
     2020                                        if (estimated_wraps == 0) {
     2021                                                /* 0 case Failed to match estimated_wraps+2 */
     2022                                                printf("WARNING - Hardware Timestamp failed to"
     2023                                                       " match using systemtime!\n");
     2024                                                hdr->timestamp = cur_sys_time_ns;
     2025                                        } else {
     2026                                                /* Failed to match estimated_wraps+1 */
     2027                                                plc->wrap_count++;
     2028                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2029                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2030                                                                     hdr->timestamp, MAXSKEW_82580)) {
     2031                                                        /* Failed to match estimated_wraps+2 */
     2032                                                        printf("WARNING - Hardware Timestamp failed to"
     2033                                                               " match using systemtime!!\n");
     2034                                                }
     2035                                        }
     2036                                }
     2037                        }
     2038                }
    12812039#else
    1282 # if USE_CLOCK_GETTIME
    1283     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1284 # else
    1285     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1286 # endif
    1287 #endif
    1288 
    1289     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1290      * idea and do the same */
    1291     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1292     packet->buffer = pkt;
    1293     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1294 
    1295     /* Set our capture length for the first time */
    1296     hdr->cap_len = dpdk_get_wire_length(packet);
    1297     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1298         hdr->cap_len -= ETHER_CRC_LEN;
    1299     }
    1300    
    1301 
    1302     return dpdk_get_framing_length(packet) +
    1303                         dpdk_get_capture_length(packet);
     2040
     2041                hdr->timestamp = cur_sys_time_ns;
     2042                /* Offset the next packet by the wire time of previous */
     2043                calculate_wire_time(format_data, hdr->cap_len);
     2044
     2045#endif
     2046        }
     2047
     2048        plc->ts_last_sys = cur_sys_time_ns;
     2049        return;
     2050}
     2051
     2052
     2053static void dpdk_fin_packet(libtrace_packet_t *packet)
     2054{
     2055        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2056                rte_pktmbuf_free(packet->buffer);
     2057                packet->buffer = NULL;
     2058        }
     2059}
     2060
     2061/** Reads at least one packet or returns an error
     2062 */
     2063static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
     2064                                           dpdk_per_stream_t *stream,
     2065                                           libtrace_message_queue_t *mesg,
     2066                                           struct rte_mbuf* pkts_burst[],
     2067                                           size_t nb_packets) {
     2068        size_t nb_rx; /* Number of rx packets we've recevied */
     2069        while (1) {
     2070                /* Poll for a batch of packets */
     2071                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     2072                                         stream->queue_id, pkts_burst, nb_packets);
     2073                if (nb_rx > 0) {
     2074                        /* Got some packets - otherwise we keep spining */
     2075                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
     2076                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     2077                        return nb_rx;
     2078                }
     2079                /* Check the message queue this could be less than 0 */
     2080                if (mesg && libtrace_message_queue_count(mesg) > 0)
     2081                        return READ_MESSAGE;
     2082                if (libtrace_halt)
     2083                        return READ_EOF;
     2084                /* Wait a while, polling on memory degrades performance
     2085                 * This relieves the pressure on memory allowing the NIC to DMA */
     2086                rte_delay_us(10);
     2087        }
     2088
     2089        /* We'll never get here - but if we did it would be bad */
     2090        return READ_ERROR;
     2091}
     2092
     2093static int dpdk_pread_packets (libtrace_t *libtrace,
     2094                                    libtrace_thread_t *t,
     2095                                    libtrace_packet_t **packets,
     2096                                    size_t nb_packets) {
     2097        int nb_rx; /* Number of rx packets we've recevied */
     2098        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     2099        int i;
     2100        dpdk_per_stream_t *stream = t->format_data;
     2101
     2102        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
     2103                                         pkts_burst, nb_packets);
     2104
     2105        if (nb_rx > 0) {
     2106                for (i = 0; i < nb_rx; ++i) {
     2107                        if (packets[i]->buffer != NULL) {
     2108                                /* The packet should always be finished */
     2109                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
     2110                                free(packets[i]->buffer);
     2111                        }
     2112                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     2113                        packets[i]->type = TRACE_RT_DATA_DPDK;
     2114                        packets[i]->buffer = pkts_burst[i];
     2115                        packets[i]->trace = libtrace;
     2116                        packets[i]->error = 1;
     2117                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
     2118                }
     2119        }
     2120
     2121        return nb_rx;
    13042122}
    13052123
    13062124static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1307     int nb_rx; /* Number of rx packets we've recevied */
    1308     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
    1309 
    1310     /* Free the last packet buffer */
    1311     if (packet->buffer != NULL) {
    1312         /* Buffer is owned by DPDK */
    1313         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1314             rte_pktmbuf_free(packet->buffer);
    1315             packet->buffer = NULL;
    1316         } else
    1317         /* Buffer is owned by packet i.e. has been malloc'd */
    1318         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1319             free(packet->buffer);
    1320             packet->buffer = NULL;
    1321         }
    1322     }
    1323    
    1324     packet->buf_control = TRACE_CTRL_EXTERNAL;
    1325     packet->type = TRACE_RT_DATA_DPDK;
    1326    
    1327     /* Wait for a packet */
    1328     while (1) {
    1329         /* Poll for a single packet */
    1330         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1331                             FORMAT(libtrace)->queue_id, pkts_burst, 1);
    1332         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1333             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1334         }
    1335         if (libtrace_halt) {
    1336             return 0;
    1337         }
    1338     }
    1339    
    1340     /* We'll never get here - but if we did it would be bad */
    1341     return -1;
     2125        int nb_rx; /* Number of rx packets we've received */
     2126        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
     2127
     2128        /* Free the last packet buffer */
     2129        if (packet->buffer != NULL) {
     2130                /* The packet should always be finished */
     2131                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2132                free(packet->buffer);
     2133                packet->buffer = NULL;
     2134        }
     2135
     2136        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2137        packet->type = TRACE_RT_DATA_DPDK;
     2138
     2139        /* Check if we already have some packets buffered */
     2140        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     2141                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     2142                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2143                return 1; // TODO should be bytes read, which essentially useless anyway
     2144        }
     2145
     2146        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
     2147                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     2148
     2149        if (nb_rx > 0) {
     2150                FORMAT(libtrace)->burst_size = nb_rx;
     2151                FORMAT(libtrace)->burst_offset = 1;
     2152                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     2153                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2154                return 1;
     2155        }
     2156        return nb_rx;
    13422157}
    13432158
    13442159static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
    1345     struct timeval tv;
    1346     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1347    
    1348     tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1349     tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
    1350     return tv;
     2160        struct timeval tv;
     2161        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2162
     2163        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2164        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     2165        return tv;
    13512166}
    13522167
    13532168static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
    1354     struct timespec ts;
    1355     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1356    
    1357     ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1358     ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
    1359     return ts;
     2169        struct timespec ts;
     2170        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2171
     2172        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2173        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     2174        return ts;
    13602175}
    13612176
    13622177static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
    1363     return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
     2178        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
    13642179}
    13652180
    13662181static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
    1367     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1368     return (libtrace_direction_t) hdr->direction;
     2182        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2183        return (libtrace_direction_t) hdr->direction;
    13692184}
    13702185
    13712186static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
    1372     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1373     hdr->direction = (uint8_t) direction;
    1374     return (libtrace_direction_t) hdr->direction;
    1375 }
    1376