Changeset d420777


Ignore:
Timestamp:
08/19/15 13:37:05 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
461582b
Parents:
c24de65 (diff), 6210f82 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge remote branch 'parallel/develop' into libtrace4

Files:
48 added
1 deleted
29 edited

Legend:

Unmodified
Added
Removed
  • README

    r3e5518a r4631115  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.22
    27
  • configure.in

    rc24de65 rd420777  
    3939        tools/traceends/Makefile
    4040        examples/Makefile examples/skeleton/Makefile examples/rate/Makefile
    41         examples/stats/Makefile examples/tutorial/Makefile
     41        examples/stats/Makefile examples/tutorial/Makefile examples/parallel/Makefile
    4242        docs/libtrace.doxygen
    4343        lib/libtrace.h
     
    401401fi
    402402
     403# If we use DPDK we might be able to use libnuma
     404AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
     405
    403406# Checks for various "optional" libraries
    404407AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
     408
     409AC_CHECK_LIB(pthread, pthread_setname_np, have_pthread_setname_np=1, have_pthread_setname_np=0)
    405410
    406411# Check for ncurses
     
    420425AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
    421426LIBS=
     427
     428if test "$have_numa" = 1; then
     429        LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
     430        AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is supported])
     431        with_numa=yes
     432else
     433        AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is supported])
     434        with_numa=no
     435fi
    422436
    423437if test "$dlfound" = 0; then
     
    434448if test "$have_pthread" = 1; then
    435449        AC_DEFINE(HAVE_LIBPTHREAD, 1, [Set to 1 if pthreads are supported])
     450fi
     451
     452if test "$have_pthread_setname_np" = 1; then
     453        AC_DEFINE(HAVE_PTHREAD_SETNAME_NP, 1, [Set to 1 if pthread_setname_np is found])
    436454fi
    437455
     
    453471
    454472if test "$have_clock_gettime" = 1; then
    455     LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     473        LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     474        AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Set to 1 if clock_gettime is supported])
     475        with_clock_gettime=yes
     476else
     477        AC_DEFINE(HAVE_CLOCK_GETTIME, 0, [Set to 1 if clock_gettime is supported])
     478        with_clock_gettime=no
    456479fi
    457480
     
    599622
    600623if test x"$libtrace_dpdk" = xtrue; then
    601     AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     624        AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     625        reportopt "Compiled with DPDK trace NUMA support" $with_numa
     626        reportopt "Compiled with clock_gettime support" $with_clock_gettime
    602627elif test x"$want_dpdk" != "xno"; then
    603628#   We don't officially support DPDK so only report failure if the user
    604629#   explicitly asked for DPDK. That way, we can hopefully keep it hidden
    605630#   from most users for now...
    606        
    607     AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
    608     AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
     631
     632        AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
     633        AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
    609634fi
    610635reportopt "Compiled with LLVM BPF JIT support" $JIT
  • examples/Makefile.am

    r8835f5a r16cb2a2  
    11
    2 SUBDIRS=skeleton rate stats tutorial
     2SUBDIRS=skeleton rate stats tutorial parallel
  • lib/Makefile.am

    r3a333e2 rd420777  
    11lib_LTLIBRARIES = libtrace.la
    2 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h  
     2include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h libtrace_parallel.h
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
    8 NATIVEFORMATS=format_linux.c
     8NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
    99BPFFORMATS=format_bpf.c
    1010
     
    2929NATIVEFORMATS+= format_dpdk.c
    3030# So we also make libtrace.mk in dpdk otherwise automake tries to expand
    31 # it to early which I cannot seem to stop unless we use a path that
     31# it too early which I cannot seem to stop unless we use a path that
    3232# doesn't exist currently
    3333export RTE_SDK=@RTE_SDK@
     
    4444endif
    4545
    46 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4747                format_erf.c format_pcap.c format_legacy.c \
    4848                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5757                $(DAGSOURCE) format_erf.h \
    5858                $(BPFJITSOURCE) \
    59                 libtrace_arphrd.h
     59                libtrace_arphrd.h \
     60                data-struct/ring_buffer.c data-struct/vector.c \
     61                data-struct/message_queue.c data-struct/deque.c \
     62                data-struct/sliding_window.c data-struct/object_cache.c \
     63                data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
     64                combiner_sorted.c combiner_unordered.c \
     65                pthread_spinlock.c pthread_spinlock.h
    6066
    6167if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 r5ab626a  
    227227        NULL,                           /* get_filtered_packets */
    228228        NULL,                           /* get_dropped_packets */
    229         NULL,                           /* get_captured_packets */
     229        NULL,                           /* get_statistics */
    230230        NULL,                           /* get_fd */
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r08f5060 r5ab626a  
    610610        NULL,                   /* get_filtered_packets */
    611611        bpf_get_dropped_packets,/* get_dropped_packets */
    612         NULL,                   /* get_captured_packets */
     612        NULL,                   /* get_statistics */
    613613        bpf_get_fd,             /* get_fd */
    614614        trace_event_device,     /* trace_event */
    615615        bpf_help,               /* help */
    616         NULL
     616        NULL,                   /* next pointer */
     617        NON_PARALLEL(true)
    617618};
    618619#else   /* HAVE_DECL_BIOCSETIF */
     
    656657        bpf_get_framing_length, /* get_framing_length */
    657658        NULL,                   /* set_capture_length */
    658         NULL,/* get_received_packets */
     659        NULL,                   /* get_received_packets */
    659660        NULL,                   /* get_filtered_packets */
    660         NULL,/* get_dropped_packets */
    661         NULL,                   /* get_captured_packets */
     661        NULL,                   /* get_dropped_packets */
     662        NULL,                   /* get_statistics */
    662663        NULL,                   /* get_fd */
    663664        NULL,                   /* trace_event */
    664665        bpf_help,               /* help */
    665         NULL
     666        NULL,                   /* next pointer */
     667        NON_PARALLEL(true)
    666668};
    667669#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f r5ab626a  
    554554        NULL,                           /* get_filtered_packets */
    555555        dag_get_dropped_packets,        /* get_dropped_packets */
    556         NULL,                           /* get_captured_packets */
     556        NULL,                           /* get_statistics */
    557557        NULL,                           /* get_fd */
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    r0054c50 rd420777  
    7979 */
    8080
    81 
    8281#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8382#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8484
    8585#define FORMAT_DATA DATA(libtrace)
     
    8787
    8888#define DUCK FORMAT_DATA->duck
     89
     90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     92
    8993static struct libtrace_format_t dag;
    9094
     
    104108/* "Global" data that is stored for each DAG output trace */
    105109struct dag_format_data_out_t {
    106         /* String containing the DAG device name */
    107         char *device_name;
    108110        /* The DAG device being used for writing */
    109111        struct dag_dev_t *device;
     
    118120};
    119121
     122/* Data that is stored against each input stream */
     123struct dag_per_stream_t {
     124        /* DAG stream number */
     125        uint16_t dagstream;
     126        /* Pointer to the last unread byte in the DAG memory */
     127        uint8_t *top;
     128        /* Pointer to the first unread byte in the DAG memory */
     129        uint8_t *bottom;
     130        /* Amount of data processed from the bottom pointer */
     131        uint32_t processed;
     132        /* Number of packets seen by the stream */
     133        uint64_t pkt_count;
     134        /* Drop count for this particular stream */
     135        uint64_t drops;
     136        /* Boolean values to indicate if a particular interface has been seen
     137         * or not. This is limited to four interfaces, which is enough to
     138         * support all current DAG cards */
     139        uint8_t seeninterface[4];
     140};
     141
    120142/* "Global" data that is stored for each DAG input trace */
    121143struct dag_format_data_t {
    122 
    123         /* Data required for regular DUCK reporting */
     144        /* DAG device */
     145        struct dag_dev_t *device;
     146        /* Boolean flag indicating whether the trace is currently attached */
     147        int stream_attached;
     148        /* Data stored against each DAG input stream */
     149        libtrace_list_t *per_stream;
     150
     151        /* Data required for regular DUCK reporting.
     152         * We put this on a new cache line otherwise we have a lot of false
     153         * sharing caused by updating the last_pkt.
     154         * This should only ever be accessed by the first thread stream,
     155         * that includes both read and write operations.
     156         */
    124157        struct {
    125158                /* Timestamp of the last DUCK report */
    126159                uint32_t last_duck;
    127160                /* The number of seconds between each DUCK report */
    128                 uint32_t duck_freq;
     161                uint32_t duck_freq;
    129162                /* Timestamp of the last packet read from the DAG card */
    130                 uint32_t last_pkt;
     163                uint32_t last_pkt;
    131164                /* Dummy trace to ensure DUCK packets are dealt with using the
    132165                 * DUCK format functions */
    133                 libtrace_t *dummy_duck;
    134         } duck;
    135 
    136         /* String containing the DAG device name */
    137         char *device_name;
    138         /* The DAG device that we are reading from */
    139         struct dag_dev_t *device;
    140         /* The DAG stream that we are reading from */
    141         unsigned int dagstream;
    142         /* Boolean flag indicating whether the stream is currently attached */
    143         int stream_attached;
    144         /* Pointer to the first unread byte in the DAG memory hole */
    145         uint8_t *bottom;
    146         /* Pointer to the last unread byte in the DAG memory hole */
    147         uint8_t *top;
    148         /* The amount of data processed thus far from the bottom pointer */
    149         uint32_t processed;
    150         /* The number of packets that have been dropped */
    151         uint64_t drops;
    152 
    153         uint8_t seeninterface[4];
     166                libtrace_t *dummy_duck;
     167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
    154168};
    155169
     
    207221
    208222/* Initialises the DAG output data structure */
    209 static void dag_init_format_out_data(libtrace_out_t *libtrace) {
    210         libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     223static void dag_init_format_out_data(libtrace_out_t *libtrace)
     224{
     225        libtrace->format_data = (struct dag_format_data_out_t *)
     226                malloc(sizeof(struct dag_format_data_out_t));
    211227        // no DUCK on output
    212228        FORMAT_DATA_OUT->stream_attached = 0;
    213229        FORMAT_DATA_OUT->device = NULL;
    214         FORMAT_DATA_OUT->device_name = NULL;
    215230        FORMAT_DATA_OUT->dagstream = 0;
    216231        FORMAT_DATA_OUT->waiting = 0;
     
    219234
    220235/* Initialises the DAG input data structure */
    221 static void dag_init_format_data(libtrace_t *libtrace) {
     236static void dag_init_format_data(libtrace_t *libtrace)
     237{
     238        struct dag_per_stream_t stream_data;
     239
    222240        libtrace->format_data = (struct dag_format_data_t *)
    223241                malloc(sizeof(struct dag_format_data_t));
    224242        DUCK.last_duck = 0;
    225         DUCK.duck_freq = 0;
    226         DUCK.last_pkt = 0;
    227         DUCK.dummy_duck = NULL;
    228         FORMAT_DATA->stream_attached = 0;
    229         FORMAT_DATA->drops = 0;
    230         FORMAT_DATA->device_name = NULL;
    231         FORMAT_DATA->device = NULL;
    232         FORMAT_DATA->dagstream = 0;
    233         FORMAT_DATA->processed = 0;
    234         FORMAT_DATA->bottom = NULL;
    235         FORMAT_DATA->top = NULL;
    236         memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
     243        DUCK.duck_freq = 0;
     244        DUCK.last_pkt = 0;
     245        DUCK.dummy_duck = NULL;
     246
     247        FORMAT_DATA->per_stream =
     248                libtrace_list_init(sizeof(stream_data));
     249        assert(FORMAT_DATA->per_stream != NULL);
     250
     251        /* We'll start with just one instance of stream_data, and we'll
     252         * add more later if we need them */
     253        memset(&stream_data, 0, sizeof(stream_data));
     254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    237255}
    238256
     
    241259 *
    242260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    243 static struct dag_dev_t *dag_find_open_device(char *dev_name) {
     261static struct dag_dev_t *dag_find_open_device(char *dev_name)
     262{
    244263        struct dag_dev_t *dag_dev;
    245264
     
    252271                        dag_dev->ref_count ++;
    253272                        return dag_dev;
    254 
    255273                }
    256274                dag_dev = dag_dev->next;
    257275        }
    258276        return NULL;
    259 
    260 
    261277}
    262278
     
    267283 *
    268284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    269 static void dag_close_device(struct dag_dev_t *dev) {
     285static void dag_close_device(struct dag_dev_t *dev)
     286{
    270287        /* Need to remove from the device list */
    271 
    272288        assert(dev->ref_count == 0);
    273289
     
    290306 *
    291307 * NOTE: this function should only be called when opening a DAG device for
    292  * writing - there is little practical difference between this and the 
     308 * writing - there is little practical difference between this and the
    293309 * function below that covers the reading case, but we need the output trace
    294  * object to report errors properly so the two functions take slightly 
     310 * object to report errors properly so the two functions take slightly
    295311 * different arguments. This is really lame and there should be a much better
    296312 * way of doing this.
    297313 *
    298  * NOTE: This function assumes the open_dag_mutex is held by the caller 
     314 * NOTE: This function assumes the open_dag_mutex is held by the caller
    299315 */
    300 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     316static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
     317                                                char *dev_name)
     318{
    301319        struct stat buf;
    302320        int fd;
     
    307325                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
    308326                return NULL;
    309 }
     327        }
    310328
    311329        /* Make sure it is the appropriate type of device */
     
    344362 *
    345363 * NOTE: this function should only be called when opening a DAG device for
    346  * reading - there is little practical difference between this and the 
     364 * reading - there is little practical difference between this and the
    347365 * function above that covers the writing case, but we need the input trace
    348  * object to report errors properly so the two functions take slightly 
     366 * object to report errors properly so the two functions take slightly
    349367 * different arguments. This is really lame and there should be a much better
    350368 * way of doing this.
     
    357375
    358376        /* Make sure the device exists */
    359         if (stat(dev_name, &buf) == -1) {
    360                 trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    361                 return NULL;
    362         }
     377        if (stat(dev_name, &buf) == -1) {
     378                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
     379                return NULL;
     380        }
    363381
    364382        /* Make sure it is the appropriate type of device */
     
    366384                /* Try opening the DAG device */
    367385                if((fd = dag_open(dev_name)) < 0) {
    368                         trace_set_err(libtrace,errno,"Cannot open DAG %s",
    369                                         dev_name);
    370                         return NULL;
    371                 }
     386                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
     387                                      dev_name);
     388                        return NULL;
     389                }
    372390        } else {
    373391                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
    374                                 dev_name);
    375                 return NULL;
    376         }
     392                              dev_name);
     393                return NULL;
     394        }
    377395
    378396        /* Add the device to our device list - it is just a doubly linked
     
    395413
    396414/* Creates and initialises a DAG output trace */
    397 static int dag_init_output(libtrace_out_t *libtrace) {
     415static int dag_init_output(libtrace_out_t *libtrace)
     416{
     417        /* Upon successful creation, the device name is stored against the
     418         * device and free when it is free()d */
     419        char *dag_dev_name = NULL;
    398420        char *scan = NULL;
    399421        struct dag_dev_t *dag_device = NULL;
    400422        int stream = 1;
    401        
     423
    402424        /* XXX I don't know if this is important or not, but this function
    403425         * isn't present in all of the driver releases that this code is
     
    409431
    410432        dag_init_format_out_data(libtrace);
    411         /* Grab the mutex while we're likely to be messing with the device 
     433        /* Grab the mutex while we're likely to be messing with the device
    412434         * list */
    413435        pthread_mutex_lock(&open_dag_mutex);
    414        
     436
    415437        /* Specific streams are signified using a comma in the libtrace URI,
    416438         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
     
    418440         * If no stream is specified, we will write using stream 1 */
    419441        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    420                 FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
     442                dag_dev_name = strdup(libtrace->uridata);
    421443        } else {
    422                 FORMAT_DATA_OUT->device_name =
    423                                 (char *)strndup(libtrace->uridata,
     444                dag_dev_name = (char *)strndup(libtrace->uridata,
    424445                                (size_t)(scan - libtrace->uridata));
    425446                stream = atoi(++scan);
     
    428449
    429450        /* See if our DAG device is already open */
    430         dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
     451        dag_device = dag_find_open_device(dag_dev_name);
    431452
    432453        if (dag_device == NULL) {
    433454                /* Device not yet opened - open it ourselves */
    434                 dag_device = dag_open_output_device(libtrace,
    435                                 FORMAT_DATA_OUT->device_name);
     455                dag_device = dag_open_output_device(libtrace, dag_dev_name);
     456        } else {
     457                /* Otherwise, just use the existing one */
     458                free(dag_dev_name);
     459                dag_dev_name = NULL;
    436460        }
    437461
    438462        /* Make sure we have successfully opened a DAG device */
    439463        if (dag_device == NULL) {
    440                 if (FORMAT_DATA_OUT->device_name) {
    441                         free(FORMAT_DATA_OUT->device_name);
    442                         FORMAT_DATA_OUT->device_name = NULL;
     464                if (dag_dev_name) {
     465                        free(dag_dev_name);
    443466                }
    444467                pthread_mutex_unlock(&open_dag_mutex);
     
    453476/* Creates and initialises a DAG input trace */
    454477static int dag_init_input(libtrace_t *libtrace) {
     478        /* Upon successful creation, the device name is stored against the
     479         * device and free when it is free()d */
     480        char *dag_dev_name = NULL;
    455481        char *scan = NULL;
    456482        int stream = 0;
     
    458484
    459485        dag_init_format_data(libtrace);
    460         /* Grab the mutex while we're likely to be messing with the device 
     486        /* Grab the mutex while we're likely to be messing with the device
    461487         * list */
    462488        pthread_mutex_lock(&open_dag_mutex);
    463        
    464        
    465         /* Specific streams are signified using a comma in the libtrace URI,
     489
     490
     491        /* DAG cards support multiple streams. In a single threaded capture,
     492         * these are specified using a comma in the libtrace URI,
    466493         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    467494         *
    468          * If no stream is specified, we will read from stream 0 */
     495         * If no stream is specified, we will read from stream 0 with
     496         * one thread
     497         */
    469498        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    470                 FORMAT_DATA->device_name = strdup(libtrace->uridata);
     499                dag_dev_name = strdup(libtrace->uridata);
    471500        } else {
    472                 FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
     501                dag_dev_name = (char *)strndup(libtrace->uridata,
    473502                                (size_t)(scan - libtrace->uridata));
    474503                stream = atoi(++scan);
    475504        }
    476505
    477         FORMAT_DATA->dagstream = stream;
     506        FORMAT_DATA_FIRST->dagstream = stream;
    478507
    479508        /* See if our DAG device is already open */
    480         dag_device = dag_find_open_device(FORMAT_DATA->device_name);
     509        dag_device = dag_find_open_device(dag_dev_name);
    481510
    482511        if (dag_device == NULL) {
    483512                /* Device not yet opened - open it ourselves */
    484                 dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
     513                dag_device = dag_open_device(libtrace, dag_dev_name);
     514        } else {
     515                /* Otherwise, just use the existing one */
     516                free(dag_dev_name);
     517                dag_dev_name = NULL;
    485518        }
    486519
    487520        /* Make sure we have successfully opened a DAG device */
    488521        if (dag_device == NULL) {
    489                 if (FORMAT_DATA->device_name)
    490                         free(FORMAT_DATA->device_name);
    491                 FORMAT_DATA->device_name = NULL;
     522                if (dag_dev_name)
     523                        free(dag_dev_name);
     524                dag_dev_name = NULL;
    492525                pthread_mutex_unlock(&open_dag_mutex);
    493526                return -1;
     
    496529        FORMAT_DATA->device = dag_device;
    497530
    498         /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
    499         /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
    500  
    501         *  The symptom of the port being disabled is that libtrace will appear to hang.
    502         */
     531        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     532           Dag Documentation */
     533        /* Check kBooleanAttributeActive is true -- no point capturing
     534         * on an interface that's disabled
     535         *
     536         * The symptom of the port being disabled is that libtrace
     537         * will appear to hang. */
    503538        /* Check kBooleanAttributeFault is false */
    504539        /* Check kBooleanAttributeLocalFault is false */
     
    506541        /* Check kBooleanAttributePeerLink ? */
    507542
    508         /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
     543        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
     544           on libtrace promisc attribute?*/
    509545        /* Set kUint32AttributeSnapLength to the snaplength */
    510546
    511547        pthread_mutex_unlock(&open_dag_mutex);
    512         return 0;
     548        return 0;
    513549}
    514550
    515551/* Configures a DAG input trace */
    516552static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    517                                 void *data) {
    518         char conf_str[4096];
     553                            void *data)
     554{
     555        char conf_str[4096];
    519556        switch(option) {
    520                 case TRACE_OPTION_META_FREQ:
    521                         /* This option is used to specify the frequency of DUCK
    522                          * updates */
    523                         DUCK.duck_freq = *(int *)data;
    524                         return 0;
    525                 case TRACE_OPTION_SNAPLEN:
    526                         /* Tell the card our new snap length */
    527                         snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    528                         if (dag_configure(FORMAT_DATA->device->fd,
    529                                                 conf_str) != 0) {
    530                                 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
    531                                 return -1;
    532                         }
    533                         return 0;
    534                 case TRACE_OPTION_PROMISC:
    535                         /* DAG already operates in a promisc fashion */
    536                         return -1;
    537                 case TRACE_OPTION_FILTER:
    538                         /* We don't yet support pushing filters into DAG
    539                          * cards */
    540                         return -1;
    541                 case TRACE_OPTION_EVENT_REALTIME:
    542                         /* Live capture is always going to be realtime */
     557        case TRACE_OPTION_META_FREQ:
     558                /* This option is used to specify the frequency of DUCK
     559                 * updates */
     560                DUCK.duck_freq = *(int *)data;
     561                return 0;
     562        case TRACE_OPTION_SNAPLEN:
     563                /* Tell the card our new snap length */
     564                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
     565                if (dag_configure(FORMAT_DATA->device->fd,
     566                                  conf_str) != 0) {
     567                        trace_set_err(libtrace, errno, "Failed to configure "
     568                                      "snaplen on DAG card: %s",
     569                                      libtrace->uridata);
    543570                        return -1;
    544         }
     571                }
     572                return 0;
     573        case TRACE_OPTION_PROMISC:
     574                /* DAG already operates in a promisc fashion */
     575                return -1;
     576        case TRACE_OPTION_FILTER:
     577                /* We don't yet support pushing filters into DAG
     578                 * cards */
     579                return -1;
     580        case TRACE_OPTION_EVENT_REALTIME:
     581                /* Live capture is always going to be realtime */
     582                return -1;
     583        case TRACE_OPTION_HASHER:
     584                /* Lets just say we did this, it's currently still up to
     585                 * the user to configure this correctly. */
     586                return 0;
     587        }
    545588        return -1;
    546589}
    547590
    548591/* Starts a DAG output trace */
    549 static int dag_start_output(libtrace_out_t *libtrace) {
     592static int dag_start_output(libtrace_out_t *libtrace)
     593{
    550594        struct timeval zero, nopoll;
    551595
     
    555599
    556600        /* Attach and start the DAG stream */
    557 
    558601        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
    559602                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
     
    570613
    571614        /* We don't want the dag card to do any sleeping */
    572 
    573615        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
    574616                        FORMAT_DATA_OUT->dagstream, 0, &zero,
     
    578620}
    579621
    580 /* Starts a DAG input trace */
    581 static int dag_start_input(libtrace_t *libtrace) {
    582         struct timeval zero, nopoll;
    583         uint8_t *top, *bottom, *starttop;
     622static int dag_start_input_stream(libtrace_t *libtrace,
     623                                  struct dag_per_stream_t * stream) {
     624        struct timeval zero, nopoll;
     625        uint8_t *top, *bottom, *starttop;
    584626        top = bottom = NULL;
    585627
    586628        zero.tv_sec = 0;
    587         zero.tv_usec = 10000;
    588         nopoll = zero;
     629        zero.tv_usec = 10000;
     630        nopoll = zero;
    589631
    590632        /* Attach and start the DAG stream */
    591633        if (dag_attach_stream(FORMAT_DATA->device->fd,
    592                                 FORMAT_DATA->dagstream, 0, 0) < 0) {
    593                 trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    594                 return -1;
    595         }
     634                              stream->dagstream, 0, 0) < 0) {
     635                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
     636                              stream->dagstream);
     637                return -1;
     638        }
    596639
    597640        if (dag_start_stream(FORMAT_DATA->device->fd,
    598                                 FORMAT_DATA->dagstream) < 0) {
    599                 trace_set_err(libtrace, errno, "Cannot start DAG stream");
    600                 return -1;
    601         }
     641                             stream->dagstream) < 0) {
     642                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
     643                              stream->dagstream);
     644                return -1;
     645        }
    602646        FORMAT_DATA->stream_attached = 1;
    603        
     647
    604648        /* We don't want the dag card to do any sleeping */
    605         dag_set_stream_poll(FORMAT_DATA->device->fd,
    606                                 FORMAT_DATA->dagstream, 0, &zero,
    607                                 &nopoll);
     649        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     650                            stream->dagstream, 0, &zero,
     651                            &nopoll) < 0) {
     652                trace_set_err(libtrace, errno,
     653                              "dag_set_stream_poll failed!");
     654                return -1;
     655        }
    608656
    609657        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    610                                         FORMAT_DATA->dagstream,
    611                                         &bottom);
     658                                      stream->dagstream,
     659                                      &bottom);
    612660
    613661        /* Should probably flush the memory hole now */
     
    616664                bottom += (starttop - bottom);
    617665                top = dag_advance_stream(FORMAT_DATA->device->fd,
    618                                         FORMAT_DATA->dagstream,
    619                                         &bottom);
    620         }
    621         FORMAT_DATA->top = top;
    622         FORMAT_DATA->bottom = bottom;
    623         FORMAT_DATA->processed = 0;
    624         FORMAT_DATA->drops = 0;
     666                                         stream->dagstream,
     667                                         &bottom);
     668        }
     669        stream->top = top;
     670        stream->bottom = bottom;
     671        stream->processed = 0;
     672        stream->drops = 0;
    625673
    626674        return 0;
     675
     676}
     677
     678/* Starts a DAG input trace */
     679static int dag_start_input(libtrace_t *libtrace)
     680{
     681        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     682}
     683
     684static int dag_pstart_input(libtrace_t *libtrace)
     685{
     686        char *scan, *tok;
     687        uint16_t stream_count = 0, max_streams;
     688        int iserror = 0;
     689        struct dag_per_stream_t stream_data;
     690
     691        /* Check we aren't trying to create more threads than the DAG card can
     692         * handle */
     693        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     694        if (libtrace->perpkt_thread_count > max_streams) {
     695                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     696                              "trying to create too many threads (max is %u)",
     697                              max_streams);
     698                iserror = 1;
     699                goto cleanup;
     700        }
     701
     702        /* Get the stream names from the uri */
     703        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     704                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     705                              "format uri doesn't specify the DAG streams");
     706                iserror = 1;
     707                goto cleanup;
     708        }
     709
     710        scan++;
     711
     712        tok = strtok(scan, ",");
     713        while (tok != NULL) {
     714                /* Ensure we haven't specified too many streams */
     715                if (stream_count >= libtrace->perpkt_thread_count) {
     716                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     717                                      "format uri specifies too many streams. "
     718                                      "Max is %u", max_streams);
     719                        iserror = 1;
     720                        goto cleanup;
     721                }
     722
     723                /* Save the stream details */
     724                if (stream_count == 0) {
     725                        /* Special case where we update the existing stream
     726                         * data structure */
     727                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     728                } else {
     729                        memset(&stream_data, 0, sizeof(stream_data));
     730                        stream_data.dagstream = (uint16_t)atoi(tok);
     731                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     732                                                &stream_data);
     733                }
     734
     735                stream_count++;
     736                tok = strtok(NULL, ",");
     737        }
     738
     739        FORMAT_DATA->stream_attached = 1;
     740
     741 cleanup:
     742        if (iserror) {
     743                return -1;
     744        } else {
     745                return 0;
     746        }
    627747}
    628748
    629749/* Pauses a DAG output trace */
    630 static int dag_pause_output(libtrace_out_t *libtrace) {
    631 
     750static int dag_pause_output(libtrace_out_t *libtrace)
     751{
    632752        /* Stop and detach the stream */
    633753        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
    634                         FORMAT_DATA_OUT->dagstream) < 0) {
     754                            FORMAT_DATA_OUT->dagstream) < 0) {
    635755                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
    636756                return -1;
    637757        }
    638758        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
    639                         FORMAT_DATA_OUT->dagstream) < 0) {
    640                 trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     759                              FORMAT_DATA_OUT->dagstream) < 0) {
     760                trace_set_err_out(libtrace, errno,
     761                                  "Could not detach DAG stream");
    641762                return -1;
    642763        }
     
    646767
    647768/* Pauses a DAG input trace */
    648 static int dag_pause_input(libtrace_t *libtrace) {
    649 
    650         /* Stop and detach the stream */
    651         if (dag_stop_stream(FORMAT_DATA->device->fd,
    652                                 FORMAT_DATA->dagstream) < 0) {
    653                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    654                 return -1;
    655         }
    656         if (dag_detach_stream(FORMAT_DATA->device->fd,
    657                                 FORMAT_DATA->dagstream) < 0) {
    658                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    659                 return -1;
    660         }
     769static int dag_pause_input(libtrace_t *libtrace)
     770{
     771        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     772
     773        /* Stop and detach each stream */
     774        while (tmp != NULL) {
     775                if (dag_stop_stream(FORMAT_DATA->device->fd,
     776                                    STREAM_DATA(tmp)->dagstream) < 0) {
     777                        trace_set_err(libtrace, errno,
     778                                      "Could not stop DAG stream");
     779                        printf("Count not stop DAG stream\n");
     780                        return -1;
     781                }
     782                if (dag_detach_stream(FORMAT_DATA->device->fd,
     783                                      STREAM_DATA(tmp)->dagstream) < 0) {
     784                        trace_set_err(libtrace, errno,
     785                                      "Could not detach DAG stream");
     786                        printf("Count not detach DAG stream\n");
     787                        return -1;
     788                }
     789
     790                tmp = tmp->next;
     791        }
     792
    661793        FORMAT_DATA->stream_attached = 0;
    662794        return 0;
    663795}
    664796
     797
     798
    665799/* Closes a DAG input trace */
    666 static int dag_fin_input(libtrace_t *libtrace) {
     800static int dag_fin_input(libtrace_t *libtrace)
     801{
    667802        /* Need the lock, since we're going to be handling the device list */
    668803        pthread_mutex_lock(&open_dag_mutex);
    669        
     804
    670805        /* Detach the stream if we are not paused */
    671806        if (FORMAT_DATA->stream_attached)
    672807                dag_pause_input(libtrace);
    673         FORMAT_DATA->device->ref_count --;
     808        FORMAT_DATA->device->ref_count--;
    674809
    675810        /* Close the DAG device if there are no more references to it */
    676811        if (FORMAT_DATA->device->ref_count == 0)
    677812                dag_close_device(FORMAT_DATA->device);
     813
    678814        if (DUCK.dummy_duck)
    679815                trace_destroy_dead(DUCK.dummy_duck);
    680         if (FORMAT_DATA->device_name)
    681                 free(FORMAT_DATA->device_name);
     816
     817        /* Clear the list */
     818        libtrace_list_deinit(FORMAT_DATA->per_stream);
    682819        free(libtrace->format_data);
    683820        pthread_mutex_unlock(&open_dag_mutex);
    684         return 0; /* success */
     821        return 0; /* success */
    685822}
    686823
    687824/* Closes a DAG output trace */
    688 static int dag_fin_output(libtrace_out_t *libtrace) {
    689        
     825static int dag_fin_output(libtrace_out_t *libtrace)
     826{
     827
    690828        /* Commit any outstanding traffic in the txbuffer */
    691829        if (FORMAT_DATA_OUT->waiting) {
    692                 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    693                                 FORMAT_DATA_OUT->waiting );
    694         }
    695 
    696         /* Wait until the buffer is nearly clear before exiting the program,
     830                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     831                                           FORMAT_DATA_OUT->dagstream,
     832                                           FORMAT_DATA_OUT->waiting );
     833        }
     834
     835        /* Wait until the buffer is nearly clear before exiting the program,
    697836         * as we will lose packets otherwise */
    698         dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    699                         FORMAT_DATA_OUT->dagstream,
    700                         dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
    701                                         FORMAT_DATA_OUT->dagstream) - 8
    702                         );
     837        dag_tx_get_stream_space
     838                (FORMAT_DATA_OUT->device->fd,
     839                 FORMAT_DATA_OUT->dagstream,
     840                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     841                                            FORMAT_DATA_OUT->dagstream) - 8);
    703842
    704843        /* Need the lock, since we're going to be handling the device list */
     
    713852        if (FORMAT_DATA_OUT->device->ref_count == 0)
    714853                dag_close_device(FORMAT_DATA_OUT->device);
    715         if (FORMAT_DATA_OUT->device_name)
    716                 free(FORMAT_DATA_OUT->device_name);
    717854        free(libtrace->format_data);
    718855        pthread_mutex_unlock(&open_dag_mutex);
     
    750887
    751888        /* Allocate memory for the DUCK data */
    752         if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
    753                         !packet->buffer) {
    754                 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
    755                 packet->buf_control = TRACE_CTRL_PACKET;
    756                 if (!packet->buffer) {
    757                         trace_set_err(libtrace, errno,
    758                                         "Cannot allocate packet buffer");
    759                         return -1;
    760                 }
    761         }
     889        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
     890            !packet->buffer) {
     891                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
     892                packet->buf_control = TRACE_CTRL_PACKET;
     893                if (!packet->buffer) {
     894                        trace_set_err(libtrace, errno,
     895                                      "Cannot allocate packet buffer");
     896                        return -1;
     897                }
     898        }
    762899
    763900        /* DUCK doesn't have a format header */
    764         packet->header = 0;
    765         packet->payload = packet->buffer;
    766 
    767         /* No need to check if we can get DUCK or not - we're modern
    768         * enough so just grab the DUCK info */
    769         if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
    770                                         (duckinf_t *)packet->payload) < 0)) {
    771                 trace_set_err(libtrace, errno, "Error using DUCK ioctl");
     901        packet->header = 0;
     902        packet->payload = packet->buffer;
     903
     904        /* No need to check if we can get DUCK or not - we're modern
     905        * enough so just grab the DUCK info */
     906        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
     907                   (duckinf_t *)packet->payload) < 0)) {
     908                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
    772909                DUCK.duck_freq = 0;
    773                 return -1;
    774         }
    775 
    776         packet->type = LIBTRACE_DUCK_VERSION;
     910                return -1;
     911        }
     912
     913        packet->type = LIBTRACE_DUCK_VERSION;
    777914
    778915        /* Set the packet's trace to point at a DUCK trace, so that the
    779916         * DUCK format functions will be called on the packet rather than the
    780917         * DAG ones */
    781         if (!DUCK.dummy_duck)
    782                 DUCK.dummy_duck = trace_create_dead("duck:dummy");
    783         packet->trace = DUCK.dummy_duck;
    784         DUCK.last_duck = DUCK.last_pkt;
    785         return sizeof(duckinf_t);
     918        if (!DUCK.dummy_duck)
     919                DUCK.dummy_duck = trace_create_dead("duck:dummy");
     920        packet->trace = DUCK.dummy_duck;
     921        DUCK.last_duck = DUCK.last_pkt;
     922        packet->error = sizeof(duckinf_t);
     923        return sizeof(duckinf_t);
    786924}
    787925
    788926/* Determines the amount of data available to read from the DAG card */
    789 static int dag_available(libtrace_t *libtrace) {
    790         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     927static int dag_available(libtrace_t *libtrace,
     928                         struct dag_per_stream_t *stream_data)
     929{
     930        uint32_t diff = stream_data->top - stream_data->bottom;
    791931
    792932        /* If we've processed more than 4MB of data since we last called
    793933         * dag_advance_stream, then we should call it again to allow the
    794934         * space occupied by that 4MB to be released */
    795         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     935        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    796936                return diff;
    797        
     937
    798938        /* Update the top and bottom pointers */
    799         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    800                         FORMAT_DATA->dagstream,
    801                         &(FORMAT_DATA->bottom));
    802        
    803         if (FORMAT_DATA->top == NULL) {
     939        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
     940                                              stream_data->dagstream,
     941                                              &(stream_data->bottom));
     942
     943        if (stream_data->top == NULL) {
    804944                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    805945                return -1;
    806946        }
    807         FORMAT_DATA->processed = 0;
    808         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     947        stream_data->processed = 0;
     948        diff = stream_data->top - stream_data->bottom;
    809949        return diff;
    810950}
    811951
    812952/* Returns a pointer to the start of the next complete ERF record */
    813 static dag_record_t *dag_get_record(libtrace_t *libtrace) {
    814         dag_record_t *erfptr = NULL;
    815         uint16_t size;
    816         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     953static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
     954{
     955        dag_record_t *erfptr = NULL;
     956        uint16_t size;
     957
     958        erfptr = (dag_record_t *)stream_data->bottom;
    817959        if (!erfptr)
    818                 return NULL;
    819         size = ntohs(erfptr->rlen);
    820         assert( size >= dag_record_size );
     960                return NULL;
     961
     962        size = ntohs(erfptr->rlen);
     963        assert( size >= dag_record_size );
     964
    821965        /* Make certain we have the full packet available */
    822         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     966        if (size > (stream_data->top - stream_data->bottom))
    823967                return NULL;
    824         FORMAT_DATA->bottom += size;
    825         FORMAT_DATA->processed += size;
     968
     969        stream_data->bottom += size;
     970        stream_data->processed += size;
    826971        return erfptr;
    827972}
     
    829974/* Converts a buffer containing a recently read DAG packet record into a
    830975 * libtrace packet */
    831 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    832                 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    833 
     976static int dag_prepare_packet_stream(libtrace_t *libtrace,
     977                                     struct dag_per_stream_t *stream_data,
     978                                     libtrace_packet_t *packet,
     979                                     void *buffer, libtrace_rt_types_t rt_type,
     980                                     uint32_t flags)
     981{
    834982        dag_record_t *erfptr;
    835        
     983
    836984        /* If the packet previously owned a buffer that is not the buffer
    837         * that contains the new packet data, we're going to need to free the
    838         * old one to avoid memory leaks */
     985        * that contains the new packet data, we're going to need to free the
     986        * old one to avoid memory leaks */
    839987        if (packet->buffer != buffer &&
    840                         packet->buf_control == TRACE_CTRL_PACKET) {
     988            packet->buf_control == TRACE_CTRL_PACKET) {
    841989                free(packet->buffer);
    842990        }
     
    851999        erfptr = (dag_record_t *)buffer;
    8521000        packet->buffer = erfptr;
    853         packet->header = erfptr;
    854         packet->type = rt_type;
     1001        packet->header = erfptr;
     1002        packet->type = rt_type;
    8551003
    8561004        if (erfptr->flags.rxerror == 1) {
    857                 /* rxerror means the payload is corrupt - drop the payload
    858                 * by tweaking rlen */
    859                 packet->payload = NULL;
    860                 erfptr->rlen = htons(erf_get_framing_length(packet));
    861         } else {
    862                 packet->payload = (char*)packet->buffer
    863                         + erf_get_framing_length(packet);
    864         }
     1005                /* rxerror means the payload is corrupt - drop the payload
     1006                * by tweaking rlen */
     1007                packet->payload = NULL;
     1008                erfptr->rlen = htons(erf_get_framing_length(packet));
     1009        } else {
     1010                packet->payload = (char*)packet->buffer
     1011                        + erf_get_framing_length(packet);
     1012        }
    8651013
    8661014        if (libtrace->format_data == NULL) {
     
    8691017
    8701018        /* Update the dropped packets counter */
    871 
    872         /* No loss counter for DSM coloured records - have to use
    873          * some other API */
     1019        /* No loss counter for DSM coloured records - have to use some
     1020         * other API */
    8741021        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    8751022                /* TODO */
    8761023        } else {
    8771024                /* Use the ERF loss counter */
    878                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    879                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     1025                if (stream_data->seeninterface[erfptr->flags.iface]
     1026                    == 0) {
     1027                        stream_data->seeninterface[erfptr->flags.iface]
     1028                                = 1;
    8801029                } else {
    881                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
     1030                        stream_data->drops += ntohs(erfptr->lctr);
    8821031                }
    8831032        }
    8841033
    8851034        return 0;
     1035}
     1036
     1037static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1038                              void *buffer, libtrace_rt_types_t rt_type,
     1039                              uint32_t flags)
     1040{
     1041        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1042                                       buffer, rt_type, flags);
    8861043}
    8871044
     
    8961053/* Pushes an ERF record onto the transmit stream */
    8971054static int dag_dump_packet(libtrace_out_t *libtrace,
    898                 dag_record_t *erfptr, unsigned int pad, void *buffer) {
     1055                           dag_record_t *erfptr, unsigned int pad,
     1056                           void *buffer)
     1057{
    8991058        int size;
    9001059
    9011060        /*
    902          * If we've got 0 bytes waiting in the txqueue, assume that we haven't
    903          * requested any space yet, and request some, storing the pointer at
    904          * FORMAT_DATA_OUT->txbuffer.
     1061         * If we've got 0 bytes waiting in the txqueue, assume that we
     1062         * haven't requested any space yet, and request some, storing
     1063         * the pointer at FORMAT_DATA_OUT->txbuffer.
    9051064         *
    9061065         * The amount to request is slightly magical at the moment - it's
     
    9091068         */
    9101069        if (FORMAT_DATA_OUT->waiting == 0) {
    911                 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    912                                 FORMAT_DATA_OUT->dagstream, 16908288);
     1070                FORMAT_DATA_OUT->txbuffer =
     1071                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     1072                                                FORMAT_DATA_OUT->dagstream,
     1073                                                16908288);
    9131074        }
    9141075
     
    9171078         * are in contiguous memory
    9181079         */
    919         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     1080        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
     1081               (dag_record_size + pad));
    9201082        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
    921 
    922 
    9231083
    9241084        /*
     
    9271087         */
    9281088        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
    929         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     1089        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
     1090               size);
    9301091        FORMAT_DATA_OUT->waiting += size;
    9311092
     
    9361097         * case there is still data in the buffer at program exit.
    9371098         */
    938 
    9391099        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
    940                 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    941                         FORMAT_DATA_OUT->waiting );
     1100                FORMAT_DATA_OUT->txbuffer =
     1101                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     1102                                                   FORMAT_DATA_OUT->dagstream,
     1103                                                   FORMAT_DATA_OUT->waiting);
    9421104                FORMAT_DATA_OUT->waiting = 0;
    9431105        }
    9441106
    9451107        return size + pad + dag_record_size;
    946 
    9471108}
    9481109
     
    9501111 * if one is found, false otherwise */
    9511112static bool find_compatible_linktype(libtrace_out_t *libtrace,
    952                                 libtrace_packet_t *packet, char *type)
    953 {
    954          // Keep trying to simplify the packet until we can find
    955          //something we can do with it
     1113                                     libtrace_packet_t *packet, char *type)
     1114{
     1115        /* Keep trying to simplify the packet until we can find
     1116         * something we can do with it */
    9561117
    9571118        do {
    958                 *type=libtrace_to_erf_type(trace_get_link_type(packet));
    959 
    960                 // Success
     1119                *type = libtrace_to_erf_type(trace_get_link_type(packet));
     1120
     1121                /* Success */
    9611122                if (*type != (char)-1)
    9621123                        return true;
     
    9641125                if (!demote_packet(packet)) {
    9651126                        trace_set_err_out(libtrace,
    966                                         TRACE_ERR_NO_CONVERSION,
    967                                         "No erf type for packet (%i)",
    968                                         trace_get_link_type(packet));
     1127                                          TRACE_ERR_NO_CONVERSION,
     1128                                          "No erf type for packet (%i)",
     1129                                          trace_get_link_type(packet));
    9691130                        return false;
    9701131                }
     
    9761137
    9771138/* Writes a packet to the provided DAG output trace */
    978 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    979         /*
    980          * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
    981          * sucks, sorry about that.
     1139static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
     1140{
     1141        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
     1142         * coding sucks, sorry about that.
    9821143         */
    9831144        unsigned int pad = 0;
     
    9881149
    9891150        if(!packet->header) {
    990                 /* No header, probably an RT packet. Lifted from 
     1151                /* No header, probably an RT packet. Lifted from
    9911152                 * erf_write_packet(). */
    9921153                return -1;
     
    10071168
    10081169        if (packet->type == TRACE_RT_DATA_ERF) {
    1009                 numbytes = dag_dump_packet(libtrace,
    1010                                 header,
    1011                                 pad,
    1012                                 payload
    1013                                 );
    1014 
     1170                numbytes = dag_dump_packet(libtrace, header, pad, payload);
    10151171        } else {
    10161172                /* Build up a new packet header from the existing header */
    10171173
    1018                 /* Simplify the packet first - if we can't do this, break 
     1174                /* Simplify the packet first - if we can't do this, break
    10191175                 * early */
    10201176                if (!find_compatible_linktype(libtrace,packet,&erf_type))
     
    10351191
    10361192                /* Packet length (rlen includes format overhead) */
    1037                 assert(trace_get_capture_length(packet)>0
    1038                                 && trace_get_capture_length(packet)<=65536);
    1039                 assert(erf_get_framing_length(packet)>0
    1040                                 && trace_get_framing_length(packet)<=65536);
    1041                 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
    1042                       &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     1193                assert(trace_get_capture_length(packet) > 0
     1194                       && trace_get_capture_length(packet) <= 65536);
     1195                assert(erf_get_framing_length(packet) > 0
     1196                       && trace_get_framing_length(packet) <= 65536);
     1197                assert(trace_get_capture_length(packet) +
     1198                       erf_get_framing_length(packet) > 0
     1199                       && trace_get_capture_length(packet) +
     1200                       erf_get_framing_length(packet) <= 65536);
    10431201
    10441202                erfhdr.rlen = htons(trace_get_capture_length(packet)
    1045                         + erf_get_framing_length(packet));
     1203                                    + erf_get_framing_length(packet));
    10461204
    10471205
     
    10521210
    10531211                /* Write it out */
    1054                 numbytes = dag_dump_packet(libtrace,
    1055                                 &erfhdr,
    1056                                 pad,
    1057                                 payload);
    1058 
     1212                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
    10591213        }
    10601214
     
    10661220 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10671221 */
    1068 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1069         int size = 0;
    1070         struct timeval tv;
    1071         dag_record_t *erfptr = NULL;
     1222static int dag_read_packet_stream(libtrace_t *libtrace,
     1223                                struct dag_per_stream_t *stream_data,
     1224                                libtrace_thread_t *t, /* Optional */
     1225                                libtrace_packet_t *packet)
     1226{
     1227        int size = 0;
     1228        dag_record_t *erfptr = NULL;
     1229        struct timeval tv;
    10721230        int numbytes = 0;
    10731231        uint32_t flags = 0;
    1074         struct timeval maxwait;
    1075         struct timeval pollwait;
     1232        struct timeval maxwait, pollwait;
    10761233
    10771234        pollwait.tv_sec = 0;
     
    10801237        maxwait.tv_usec = 250000;
    10811238
    1082         /* Check if we're due for a DUCK report */
    1083         size = dag_get_duckinfo(libtrace, packet);
    1084 
    1085         if (size != 0)
    1086                 return size;
     1239        /* Check if we're due for a DUCK report - only report on the first thread */
     1240        if (stream_data == FORMAT_DATA_FIRST) {
     1241                size = dag_get_duckinfo(libtrace, packet);
     1242                if (size != 0)
     1243                        return size;
     1244        }
    10871245
    10881246
     
    10921250        /* If the packet buffer is currently owned by libtrace, free it so
    10931251         * that we can set the packet to point into the DAG memory hole */
    1094         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1095                 free(packet->buffer);
    1096                 packet->buffer = 0;
    1097         }
    1098        
    1099         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1100                         FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait,
    1101                         &pollwait) == -1)
    1102         {
     1252        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1253                free(packet->buffer);
     1254                packet->buffer = 0;
     1255        }
     1256
     1257        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
     1258                                sizeof(dag_record_t), &maxwait,
     1259                                &pollwait) == -1) {
    11031260                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11041261                return -1;
    11051262        }
    1106 
    11071263
    11081264        /* Grab a full ERF record */
    11091265        do {
    1110                 numbytes = dag_available(libtrace);
     1266                numbytes = dag_available(libtrace, stream_data);
    11111267                if (numbytes < 0)
    11121268                        return numbytes;
    11131269                if (numbytes < dag_record_size) {
     1270                        /* Check the message queue if we have one to check */
     1271                        if (t != NULL &&
     1272                            libtrace_message_queue_count(&t->messages) > 0)
     1273                                return -2;
     1274
    11141275                        if (libtrace_halt)
    11151276                                return 0;
     
    11171278                        continue;
    11181279                }
    1119                 erfptr = dag_get_record(libtrace);
     1280                erfptr = dag_get_record(stream_data);
    11201281        } while (erfptr == NULL);
    11211282
     1283        packet->trace = libtrace;
    11221284        /* Prepare the libtrace packet */
    1123         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1124                                 flags))
    1125                 return -1;
    1126 
    1127         /* Update the DUCK timer */
    1128         tv = trace_get_timeval(packet);
    1129         DUCK.last_pkt = tv.tv_sec;
    1130 
    1131         return packet->payload ? htons(erfptr->rlen) :
    1132                                 erf_get_framing_length(packet);
     1285        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
     1286                                    TRACE_RT_DATA_ERF, flags))
     1287                return -1;
     1288
     1289        /* Update the DUCK timer - don't re-order this check (false-sharing) */
     1290        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
     1291                tv = trace_get_timeval(packet);
     1292                DUCK.last_pkt = tv.tv_sec;
     1293        }
     1294
     1295        packet->error = packet->payload ? htons(erfptr->rlen) :
     1296                                          erf_get_framing_length(packet);
     1297
     1298        return packet->error;
     1299}
     1300
     1301static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1302{
     1303        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1304}
     1305
     1306static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1307                             libtrace_packet_t **packets, size_t nb_packets)
     1308{
     1309        int ret;
     1310        size_t read_packets = 0;
     1311        int numbytes = 0;
     1312
     1313        struct dag_per_stream_t *stream_data =
     1314                (struct dag_per_stream_t *)t->format_data;
     1315
     1316        /* Read as many packets as we can, but read atleast one packet */
     1317        do {
     1318                ret = dag_read_packet_stream(libtrace, stream_data, t,
     1319                                           packets[read_packets]);
     1320                if (ret < 0)
     1321                        return ret;
     1322
     1323                read_packets++;
     1324
     1325                /* Make sure we don't read too many packets..! */
     1326                if (read_packets >= nb_packets)
     1327                        break;
     1328
     1329                numbytes = dag_available(libtrace, stream_data);
     1330        } while (numbytes >= dag_record_size);
     1331
     1332        return read_packets;
    11331333}
    11341334
     
    11381338 */
    11391339static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
    1140                                         libtrace_packet_t *packet) {
    1141         libtrace_eventobj_t event = {0,0,0.0,0};
     1340                                           libtrace_packet_t *packet)
     1341{
     1342        libtrace_eventobj_t event = {0,0,0.0,0};
    11421343        dag_record_t *erfptr = NULL;
    11431344        int numbytes;
     
    11581359        }
    11591360       
    1160         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1161                         FORMAT_DATA->dagstream, 0, &minwait,
    1162                         &minwait) == -1)
    1163         {
     1361        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     1362                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
     1363                                &minwait) == -1) {
    11641364                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11651365                event.type = TRACE_EVENT_TERMINATE;
     
    11701370                erfptr = NULL;
    11711371                numbytes = 0;
    1172        
     1372
    11731373                /* Need to call dag_available so that the top pointer will get
    11741374                 * updated, otherwise we'll never see any data! */
    1175                 numbytes = dag_available(libtrace);
    1176 
    1177                 /* May as well not bother calling dag_get_record if 
     1375                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
     1376
     1377                /* May as well not bother calling dag_get_record if
    11781378                 * dag_available suggests that there's no data */
    11791379                if (numbytes != 0)
    1180                         erfptr = dag_get_record(libtrace);
     1380                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    11811381                if (erfptr == NULL) {
    11821382                        /* No packet available - sleep for a very short time */
    11831383                        if (libtrace_halt) {
    11841384                                event.type = TRACE_EVENT_TERMINATE;
    1185                         } else {                       
     1385                        } else {
    11861386                                event.type = TRACE_EVENT_SLEEP;
    11871387                                event.seconds = 0.0001;
     
    11891389                        break;
    11901390                }
    1191                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1192                                         TRACE_RT_DATA_ERF, flags)) {
     1391                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1392                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    11931393                        event.type = TRACE_EVENT_TERMINATE;
    11941394                        break;
     
    11961396
    11971397
    1198                 event.size = trace_get_capture_length(packet) + 
    1199                                 trace_get_framing_length(packet);
    1200                
     1398                event.size = trace_get_capture_length(packet) +
     1399                        trace_get_framing_length(packet);
     1400
    12011401                /* XXX trace_read_packet() normally applies the following
    12021402                 * config options for us, but this function is called via
     
    12041404
    12051405                if (libtrace->filter) {
    1206                         int filtret = trace_apply_filter(libtrace->filter, 
    1207                                         packet);
     1406                        int filtret = trace_apply_filter(libtrace->filter,
     1407                                                         packet);
    12081408                        if (filtret == -1) {
    12091409                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
    1210                                                 "Bad BPF Filter");
     1410                                              "Bad BPF Filter");
    12111411                                event.type = TRACE_EVENT_TERMINATE;
    12121412                                break;
     
    12191419                                 * a sleep event in this case, like we used to
    12201420                                 * do! */
    1221                                 libtrace->filtered_packets ++;
     1421                                libtrace->filtered_packets ++;
    12221422                                trace_clear_cache(packet);
    12231423                                continue;
    12241424                        }
    1225                                
     1425
    12261426                        event.type = TRACE_EVENT_PACKET;
    12271427                } else {
     
    12361436                        trace_set_capture_length(packet, libtrace->snaplen);
    12371437                }
    1238                 libtrace->accepted_packets ++;
     1438                libtrace->accepted_packets ++;
    12391439                break;
    1240         } while (1);
     1440        } while(1);
    12411441
    12421442        return event;
    12431443}
    12441444
    1245 /* Gets the number of dropped packets */
    1246 static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1247         if (trace->format_data == NULL)
    1248                 return (uint64_t)-1;
    1249         return DATA(trace)->drops;
     1445static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
     1446{
     1447        libtrace_list_node_t *tmp;
     1448        assert(stat && libtrace);
     1449        tmp = FORMAT_DATA_HEAD;
     1450
     1451        /* Dropped packets */
     1452        stat->dropped_valid = 1;
     1453        stat->dropped = 0;
     1454        while (tmp != NULL) {
     1455                stat->dropped += STREAM_DATA(tmp)->drops;
     1456                tmp = tmp->next;
     1457        }
     1458
     1459}
     1460
     1461static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
     1462                                       libtrace_stat_t *stat) {
     1463        struct dag_per_stream_t *stream_data = t->format_data;
     1464        assert(stat && libtrace);
     1465
     1466        stat->dropped_valid = 1;
     1467        stat->dropped = stream_data->drops;
     1468
     1469        stat->filtered_valid = 1;
     1470        stat->filtered = 0;
    12501471}
    12511472
    12521473/* Prints some semi-useful help text about the DAG format module */
    12531474static void dag_help(void) {
    1254         printf("dag format module: $Revision: 1755 $\n");
    1255         printf("Supported input URIs:\n");
    1256         printf("\tdag:/dev/dagn\n");
    1257         printf("\n");
    1258         printf("\te.g.: dag:/dev/dag0\n");
    1259         printf("\n");
    1260         printf("Supported output URIs:\n");
    1261         printf("\tnone\n");
    1262         printf("\n");
     1475        printf("dag format module: $Revision: 1755 $\n");
     1476        printf("Supported input URIs:\n");
     1477        printf("\tdag:/dev/dagn\n");
     1478        printf("\n");
     1479        printf("\te.g.: dag:/dev/dag0\n");
     1480        printf("\n");
     1481        printf("Supported output URIs:\n");
     1482        printf("\tnone\n");
     1483        printf("\n");
     1484}
     1485
     1486static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1487                                bool reader)
     1488{
     1489        struct dag_per_stream_t *stream_data;
     1490        libtrace_list_node_t *node;
     1491
     1492        if (reader && t->type == THREAD_PERPKT) {
     1493                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
     1494                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
     1495                                                t->perpkt_num);
     1496                if (node == NULL) {
     1497                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1498                                      "Too few streams supplied for the"
     1499                                      " number of threads lanuched");
     1500                        return -1;
     1501                }
     1502                stream_data = node->data;
     1503
     1504                /* Pass the per thread data to the thread */
     1505                t->format_data = stream_data;
     1506
     1507                /* Attach and start the DAG stream */
     1508                printf("t%u: starting and attaching stream #%u\n",
     1509                       t->perpkt_num, stream_data->dagstream);
     1510                if (dag_start_input_stream(libtrace, stream_data) < 0)
     1511                        return -1;
     1512        }
     1513
     1514        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1515
     1516        return 0;
    12631517}
    12641518
    12651519static struct libtrace_format_t dag = {
    1266         "dag",
    1267         "$Id$",
    1268         TRACE_FORMAT_ERF,
     1520        "dag",
     1521        "$Id$",
     1522        TRACE_FORMAT_ERF,
    12691523        dag_probe_filename,             /* probe filename */
    12701524        NULL,                           /* probe magic */
    1271         dag_init_input,                 /* init_input */
    1272         dag_config_input,               /* config_input */
    1273         dag_start_input,                /* start_input */
    1274         dag_pause_input,                /* pause_input */
     1525        dag_init_input,                 /* init_input */
     1526        dag_config_input,               /* config_input */
     1527        dag_start_input,                /* start_input */
     1528        dag_pause_input,                /* pause_input */
    12751529        dag_init_output,                /* init_output */
    1276         NULL,                           /* config_output */
     1530        NULL,                           /* config_output */
    12771531        dag_start_output,               /* start_output */
    1278         dag_fin_input,                  /* fin_input */
     1532        dag_fin_input,                  /* fin_input */
    12791533        dag_fin_output,                 /* fin_output */
    1280         dag_read_packet,                /* read_packet */
    1281         dag_prepare_packet,             /* prepare_packet */
     1534        dag_read_packet,                /* read_packet */
     1535        dag_prepare_packet,             /* prepare_packet */
    12821536        NULL,                           /* fin_packet */
    12831537        dag_write_packet,               /* write_packet */
    1284         erf_get_link_type,              /* get_link_type */
    1285         erf_get_direction,              /* get_direction */
    1286         erf_set_direction,              /* set_direction */
    1287         erf_get_erf_timestamp,          /* get_erf_timestamp */
    1288         NULL,                           /* get_timeval */
    1289         NULL,                           /* get_seconds */
     1538        erf_get_link_type,              /* get_link_type */
     1539        erf_get_direction,              /* get_direction */
     1540        erf_set_direction,              /* set_direction */
     1541        erf_get_erf_timestamp,          /* get_erf_timestamp */
     1542        NULL,                           /* get_timeval */
     1543        NULL,                           /* get_seconds */
    12901544        NULL,                           /* get_timespec */
    1291         NULL,                           /* seek_erf */
    1292         NULL,                           /* seek_timeval */
    1293         NULL,                           /* seek_seconds */
    1294         erf_get_capture_length,         /* get_capture_length */
    1295         erf_get_wire_length,            /* get_wire_length */
    1296         erf_get_framing_length,         /* get_framing_length */
    1297         erf_set_capture_length,         /* set_capture_length */
     1545        NULL,                           /* seek_erf */
     1546        NULL,                           /* seek_timeval */
     1547        NULL,                           /* seek_seconds */
     1548        erf_get_capture_length,         /* get_capture_length */
     1549        erf_get_wire_length,            /* get_wire_length */
     1550        erf_get_framing_length,         /* get_framing_length */
     1551        erf_set_capture_length,         /* set_capture_length */
    12981552        NULL,                           /* get_received_packets */
    12991553        NULL,                           /* get_filtered_packets */
    1300         dag_get_dropped_packets,        /* get_dropped_packets */
    1301         NULL,                           /* get_captured_packets */
    1302         NULL,                           /* get_fd */
    1303         trace_event_dag,                /* trace_event */
    1304         dag_help,                       /* help */
    1305         NULL                            /* next pointer */
     1554        NULL,                           /* get_dropped_packets */
     1555        dag_get_statistics,             /* get_statistics */
     1556        NULL,                           /* get_fd */
     1557        trace_event_dag,                /* trace_event */
     1558        dag_help,                       /* help */
     1559        NULL,                            /* next pointer */
     1560        {true, 0}, /* live packet capture, thread limit TBD */
     1561        dag_pstart_input,
     1562        dag_pread_packets,
     1563        dag_pause_input,
     1564        NULL,
     1565        dag_pregister_thread,
     1566        NULL,
     1567        dag_get_thread_statisitics      /* get thread stats */
    13061568};
    13071569
    1308 void dag_constructor(void) {
     1570void dag_constructor(void)
     1571{
    13091572        register_format(&dag);
    13101573}
  • lib/format_dpdk.c

    rb585975 r773a2a3  
    33 * This file is part of libtrace
    44 *
    5  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 
     5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
    66 * New Zealand.
    77 *
    8  * Author: Richard Sanger 
    9  *         
     8 * Author: Richard Sanger
     9 *
    1010 * All rights reserved.
    1111 *
    12  * This code has been developed by the University of Waikato WAND 
     12 * This code has been developed by the University of Waikato WAND
    1313 * research group. For further information please see http://www.wand.net.nz/
    1414 *
     
    3636 * Intel Data Plane Development Kit is a LIVE capture format.
    3737 *
    38  * This format also supports writing which will write packets out to the 
    39  * network as a form of packet replay. This should not be confused with the 
    40  * RT protocol which is intended to transfer captured packet records between 
     38 * This format also supports writing which will write packets out to the
     39 * network as a form of packet replay. This should not be confused with the
     40 * RT protocol which is intended to transfer captured packet records between
    4141 * RT-speaking programs.
    4242 */
     43
     44#define _GNU_SOURCE
    4345
    4446#include "config.h"
     
    4749#include "format_helper.h"
    4850#include "libtrace_arphrd.h"
     51#include "hash_toeplitz.h"
    4952
    5053#ifdef HAVE_INTTYPES_H
     
    5962#include <endian.h>
    6063#include <string.h>
     64
     65#if HAVE_LIBNUMA
     66#include <numa.h>
     67#endif
    6168
    6269/* We can deal with any minor differences by checking the RTE VERSION
     
    151158#include <rte_mempool.h>
    152159#include <rte_mbuf.h>
    153 
    154 /* The default size of memory buffers to use - This is the max size of standard
     160#include <rte_launch.h>
     161#include <rte_lcore.h>
     162#include <rte_per_lcore.h>
     163#include <rte_cycles.h>
     164#include <pthread.h>
     165#ifdef __FreeBSD__
     166#include <pthread_np.h>
     167#endif
     168
     169
     170/* The default size of memory buffers to use - This is the max size of standard
    155171 * ethernet packet less the size of the MAC CHECKSUM */
    156172#define RX_MBUF_SIZE 1514
    157173
    158 /* The minimum number of memory buffers per queue tx or rx. Search for 
     174/* The minimum number of memory buffers per queue tx or rx. Search for
    159175 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    160176 */
     
    174190#define NB_TX_MBUF 1024
    175191
    176 /* The size of the PCI blacklist needs to be big enough to contain 
     192/* The size of the PCI blacklist needs to be big enough to contain
    177193 * every PCI device address (listed by lspci every bus:device.function tuple).
    178194 */
     
    181197/* The maximum number of characters the mempool name can be */
    182198#define MEMPOOL_NAME_LEN 20
     199
     200/* For single threaded libtrace we read packets as a batch/burst
     201 * this is the maximum size of said burst */
     202#define BURST_SIZE 50
    183203
    184204#define MBUF(x) ((struct rte_mbuf *) x)
     
    186206#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    187207#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     208#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     209
     210#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
     211#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
     212
    188213#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    189                         (uint64_t) tv.tv_usec*1000ull)
     214                        (uint64_t) tv.tv_usec*1000ull)
    190215#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    191                         (uint64_t) ts.tv_nsec)
     216                        (uint64_t) ts.tv_nsec)
    192217
    193218#if RTE_PKTMBUF_HEADROOM != 128
    194219#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    195         "any libtrace instance processing these packet must be have the" \
    196         "same RTE_PKTMBUF_HEADROOM set"
     220        "any libtrace instance processing these packet must be have the" \
     221        "same RTE_PKTMBUF_HEADROOM set"
    197222#endif
    198223
    199224/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    200  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    201  * 
     225 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     226 *
    202227 * Make sure you understand what these are doing before enabling them.
    203228 * They might make traces incompatable with other builds etc.
    204  * 
     229 *
    205230 * These are also included to show how to do somethings which aren't
    206231 * obvious in the DPDK documentation.
    207232 */
    208233
    209 /* Print verbose messages to stdout */
     234/* Print verbose messages to stderr */
    210235#define DEBUG 0
    211236
    212 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    213  * only turn on if you know clock_gettime is a vsyscall on your system 
     237/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     238 * only turn on if you know clock_gettime is a vsyscall on your system
    214239 * overwise could be a large overhead. Again gettimeofday() should be
    215240 * vsyscall also if it's not you should seriously consider updating your
    216241 * kernel.
    217242 */
    218 #ifdef HAVE_LIBRT
     243#ifdef HAVE_CLOCK_GETTIME
    219244/* You can turn this on (set to 1) to prefer clock_gettime */
    220 #define USE_CLOCK_GETTIME 0
     245#define USE_CLOCK_GETTIME 1
    221246#else
    222 /* DONT CHANGE THIS !!! */
     247/* DON'T CHANGE THIS !!! */
    223248#define USE_CLOCK_GETTIME 0
    224249#endif
     
    229254 * hence writing out a port such as int: ring: and dpdk: assumes there
    230255 * is no checksum and will attempt to write the checksum as part of the
    231  * packet 
     256 * packet
    232257 */
    233258#define GET_MAC_CRC_CHECKSUM 0
    234259
    235260/* This requires a modification of the pmd drivers (inside Intel DPDK)
     261 * TODO this requires updating (packet sizes are wrong TS most likely also)
    236262 */
    237263#define HAS_HW_TIMESTAMPS_82580 0
     
    244270#endif
    245271
     272static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
     273/* Memory pools Per NUMA node */
     274static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
     275
    246276/* As per Intel 82580 specification - mismatch in 82580 datasheet
    247277 * it states ts is stored in Big Endian, however its actually Little */
    248278struct hw_timestamp_82580 {
    249     uint64_t reserved;
    250     uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
     279        uint64_t reserved;
     280        uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
    251281};
    252282
    253283enum paused_state {
    254     DPDK_NEVER_STARTED,
    255     DPDK_RUNNING,
    256     DPDK_PAUSED,
     284        DPDK_NEVER_STARTED,
     285        DPDK_RUNNING,
     286        DPDK_PAUSED,
    257287};
     288
     289struct dpdk_per_stream_t
     290{
     291        uint16_t queue_id;
     292        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     293        struct rte_mempool *mempool;
     294        int lcore;
     295#if HAS_HW_TIMESTAMPS_82580
     296        /* Timestamping only relevent to RX */
     297        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     298        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     299#endif
     300} ALIGN_STRUCT(CACHE_LINE_SIZE);
     301
     302#if HAS_HW_TIMESTAMPS_82580
     303#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
     304#else
     305#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
     306#endif
     307
     308typedef struct dpdk_per_stream_t dpdk_per_stream_t;
    258309
    259310/* Used by both input and output however some fields are not used
    260311 * for output */
    261312struct dpdk_format_data_t {
    262     int8_t promisc; /* promiscuous mode - RX only */
    263     uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    264     uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    265     uint8_t paused; /* See paused_state */
    266     uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    267     int snaplen; /* The snap length for the capture - RX only */
    268     /* We always have to setup both rx and tx queues even if we don't want them */
    269     int nb_rx_buf; /* The number of packet buffers in the rx ring */
    270     int nb_tx_buf; /* The number of packet buffers in the tx ring */
    271     struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
     313        int8_t promisc; /* promiscuous mode - RX only */
     314        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
     315        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
     316        uint8_t paused; /* See paused_state */
     317        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
     318        int snaplen; /* The snap length for the capture - RX only */
     319        /* We always have to setup both rx and tx queues even if we don't want them */
     320        int nb_rx_buf; /* The number of packet buffers in the rx ring */
     321        int nb_tx_buf; /* The number of packet buffers in the tx ring */
     322        int nic_numa_node; /* The NUMA node that the NIC is attached to */
     323        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    272324#if DPDK_USE_BLACKLIST
    273     struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
     325        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    274326        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
    275327#endif
    276     char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    277 #if HAS_HW_TIMESTAMPS_82580
    278     /* Timestamping only relevent to RX */
    279     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    280     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    281     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    282 #endif
     328        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
     329        uint8_t rss_key[40]; // This is the RSS KEY
     330        /* To improve single-threaded performance we always batch reading
     331         * packets, in a burst, otherwise the parallel library does this for us */
     332        struct rte_mbuf* burst_pkts[BURST_SIZE];
     333        int burst_size; /* The total number read in the burst */
     334        int burst_offset; /* The offset we are into the burst */
     335
     336        /* Our parallel streams */
     337        libtrace_list_t *per_stream;
    283338};
    284339
    285340enum dpdk_addt_hdr_flags {
    286     INCLUDES_CHECKSUM = 0x1,
    287     INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
     341        INCLUDES_CHECKSUM = 0x1,
     342        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
    288343};
    289344
    290 /** 
     345/**
    291346 * A structure placed in front of the packet where we can store
    292347 * additional information about the given packet.
     
    294349 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    295350 * +--------------------------+
    296  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    297  * +--------------------------+
    298351 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    299352 * +--------------------------+
    300  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    301  * +--------------------------+ 
     353 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     354 * +--------------------------+
    302355 * *   hw_timestamp_82580     * 16 bytes Optional
    303356 * +--------------------------+
     
    306359 */
    307360struct dpdk_addt_hdr {
    308     uint64_t timestamp;
    309     uint8_t flags;
    310     uint8_t direction;
    311     uint8_t reserved1;
    312     uint8_t reserved2;
    313     uint32_t cap_len; /* The size to say the capture is */
     361        uint64_t timestamp;
     362        uint8_t flags;
     363        uint8_t direction;
     364        uint8_t reserved1;
     365        uint8_t reserved2;
     366        uint32_t cap_len; /* The size to say the capture is */
    314367};
    315368
     
    317370 * We want to blacklist all devices except those on the whitelist
    318371 * (I say list, but yes it is only the one).
    319  * 
     372 *
    320373 * The default behaviour of rte_pci_probe() will map every possible device
    321374 * to its DPDK driver. The DPDK driver will take the ethernet device
    322375 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    323  * 
    324  * So blacklist all devices except the one that we wish to use so that 
     376 *
     377 * So blacklist all devices except the one that we wish to use so that
    325378 * the others can still be used as standard ethernet ports.
    326379 *
     
    336389
    337390        TAILQ_FOREACH(dev, &device_list, next) {
    338         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    339             && whitelist->bus == dev->addr.bus
    340             && whitelist->devid == dev->addr.devid
    341             && whitelist->function == dev->addr.function)
    342             continue;
     391        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     392            && whitelist->bus == dev->addr.bus
     393            && whitelist->devid == dev->addr.devid
     394            && whitelist->function == dev->addr.function)
     395            continue;
    343396                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    344                                 / sizeof (format_data->blacklist[0])) {
    345                         printf("Warning: too many devices to blacklist consider"
    346                                         " increasing BLACK_LIST_SIZE");
     397                                / sizeof (format_data->blacklist[0])) {
     398                        fprintf(stderr, "Warning: too many devices to blacklist consider"
     399                                        " increasing BLACK_LIST_SIZE");
    347400                        break;
    348401                }
     
    360413        char pci_str[20] = {0};
    361414        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    362                 whitelist->domain,
    363                 whitelist->bus,
    364                 whitelist->devid,
    365                 whitelist->function);
     415                whitelist->domain,
     416                whitelist->bus,
     417                whitelist->devid,
     418                whitelist->function);
    366419        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    367420                return -1;
     
    375428 * Fills in addr, note core is optional and is unchanged if
    376429 * a value for it is not provided.
    377  * 
     430 *
    378431 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    379  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     432 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    380433 */
    381434static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    382     int matches;
    383     assert(str);
    384     matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
    385                      &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
    386     if (matches >= 4) {
    387         return 0;
    388     } else {
    389         return -1;
    390     }
     435        int matches;
     436        assert(str);
     437        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
     438                         &addr->domain, &addr->bus, &addr->devid,
     439                         &addr->function, core);
     440        if (matches >= 4) {
     441                return 0;
     442        } else {
     443                return -1;
     444        }
     445}
     446
     447/**
     448 * Convert a pci address to the numa node it is
     449 * connected to.
     450 *
     451 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     452 * so we can call it before DPDK
     453 *
     454 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     455 */
     456static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     457        char path[50] = {0};
     458        FILE *file;
     459
     460        /* Read from the system */
     461        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     462                 dev_addr->domain,
     463                 dev_addr->bus,
     464                 dev_addr->devid,
     465                 dev_addr->function);
     466
     467        if((file = fopen(path, "r")) != NULL) {
     468                int numa_node = -1;
     469                fscanf(file, "%d", &numa_node);
     470                fclose(file);
     471                return numa_node;
     472        }
     473        return -1;
    391474}
    392475
     
    395478static inline void dump_configuration()
    396479{
    397     struct rte_config * global_config;
    398     long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    399 
    400     if (nb_cpu <= 0) {
    401         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    402         nb_cpu = 1; /* fallback to just 1 core */
    403     }
    404     if (nb_cpu > RTE_MAX_LCORE)
    405         nb_cpu = RTE_MAX_LCORE;
    406 
    407     global_config = rte_eal_get_configuration();
    408 
    409     if (global_config != NULL) {
    410         int i;
    411         fprintf(stderr, "Intel DPDK setup\n"
    412                "---Version      : %s\n"
    413                "---Master LCore : %"PRIu32"\n"
    414                "---LCore Count  : %"PRIu32"\n",
    415                rte_version(),
    416                global_config->master_lcore, global_config->lcore_count);
    417 
    418         for (i = 0 ; i < nb_cpu; i++) {
    419             fprintf(stderr, "   ---Core %d : %s\n", i,
    420                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    421         }
    422 
    423         const char * proc_type;
    424         switch (global_config->process_type) {
    425             case RTE_PROC_AUTO:
    426                 proc_type = "auto";
    427                 break;
    428             case RTE_PROC_PRIMARY:
    429                 proc_type = "primary";
    430                 break;
    431             case RTE_PROC_SECONDARY:
    432                 proc_type = "secondary";
    433                 break;
    434             case RTE_PROC_INVALID:
    435                 proc_type = "invalid";
    436                 break;
    437             default:
    438                 proc_type = "something worse than invalid!!";
    439         }
    440         fprintf(stderr, "---Process Type : %s\n", proc_type);
    441     }
    442 
    443 }
    444 #endif
     480        struct rte_config * global_config;
     481        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     482
     483        if (nb_cpu <= 0) {
     484                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     485                       " Falling back to the first core.");
     486                nb_cpu = 1; /* fallback to just 1 core */
     487        }
     488        if (nb_cpu > RTE_MAX_LCORE)
     489                nb_cpu = RTE_MAX_LCORE;
     490
     491        global_config = rte_eal_get_configuration();
     492
     493        if (global_config != NULL) {
     494                int i;
     495                fprintf(stderr, "Intel DPDK setup\n"
     496                        "---Version      : %s\n"
     497                        "---Master LCore : %"PRIu32"\n"
     498                        "---LCore Count  : %"PRIu32"\n",
     499                        rte_version(),
     500                        global_config->master_lcore, global_config->lcore_count);
     501
     502                for (i = 0 ; i < nb_cpu; i++) {
     503                        fprintf(stderr, "   ---Core %d : %s\n", i,
     504                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     505                }
     506
     507                const char * proc_type;
     508                switch (global_config->process_type) {
     509                case RTE_PROC_AUTO:
     510                        proc_type = "auto";
     511                        break;
     512                case RTE_PROC_PRIMARY:
     513                        proc_type = "primary";
     514                        break;
     515                case RTE_PROC_SECONDARY:
     516                        proc_type = "secondary";
     517                        break;
     518                case RTE_PROC_INVALID:
     519                        proc_type = "invalid";
     520                        break;
     521                default:
     522                        proc_type = "something worse than invalid!!";
     523                }
     524                fprintf(stderr, "---Process Type : %s\n", proc_type);
     525        }
     526
     527}
     528#endif
     529
     530/**
     531 * Expects to be called from the master lcore and moves it to the given dpdk id
     532 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     533 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     534 *               and not already in use.
     535 * @return 0 is successful otherwise -1 on error.
     536 */
     537static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
     538        struct rte_config *cfg = rte_eal_get_configuration();
     539        cpu_set_t cpuset;
     540        int i;
     541
     542        assert (core < RTE_MAX_LCORE);
     543        assert (rte_get_master_lcore() == rte_lcore_id());
     544
     545        if (core == rte_lcore_id())
     546                return 0;
     547
     548        /* Make sure we are not overwriting someone else */
     549        assert(!rte_lcore_is_enabled(core));
     550
     551        /* Move the core */
     552        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     553        cfg->lcore_role[core] = ROLE_RTE;
     554        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     555        rte_eal_get_configuration()->master_lcore = core;
     556        RTE_PER_LCORE(_lcore_id) = core;
     557
     558        /* Now change the affinity, either mapped to a single core or all accepted */
     559        CPU_ZERO(&cpuset);
     560
     561        if (lcore_config[core].detected) {
     562                CPU_SET(core, &cpuset);
     563        } else {
     564                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     565                        if (lcore_config[i].detected)
     566                                CPU_SET(i, &cpuset);
     567                }
     568        }
     569
     570        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     571        if (i != 0) {
     572                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
     573                return -1;
     574        }
     575        return 0;
     576}
    445577
    446578/**
     
    474606static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    475607                                        char * err, int errlen) {
    476     int ret; /* Returned error codes */
    477     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
    478     char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    479     char mem_map[20] = {0}; /* The memory name */
    480     long nb_cpu; /* The number of CPUs in the system */
    481     long my_cpu; /* The CPU number we want to bind to */
     608        int ret; /* Returned error codes */
     609        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     610        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
     611        char mem_map[20] = {0}; /* The memory name */
     612        long nb_cpu; /* The number of CPUs in the system */
     613        long my_cpu; /* The CPU number we want to bind to */
     614        int i;
     615        struct rte_config *cfg = rte_eal_get_configuration();
    482616        struct saved_getopts save_opts;
    483    
    484 #if DEBUG
    485     rte_set_log_level(RTE_LOG_DEBUG);
    486 #else
    487     rte_set_log_level(RTE_LOG_WARNING);
    488 #endif
    489     /*
    490      * Using unique file prefixes mean separate memory is used, unlinking
    491      * the two processes. However be careful we still cannot access a
    492      * port that already in use.
    493      */
    494     char* argv[] = {"libtrace",
    495                     "-c", cpu_number,
    496                     "-n", "1",
    497                     "--proc-type", "auto",
    498                     "--file-prefix", mem_map,
    499                     "-m", "256",
     617
     618        /* This initialises the Environment Abstraction Layer (EAL)
     619         * If we had slave workers these are put into WAITING state
     620         *
     621         * Basically binds this thread to a fixed core, which we choose as
     622         * the last core on the machine (assuming fewer interrupts mapped here).
     623         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
     624         * "-n" the number of memory channels into the CPU (hardware specific)
     625         *      - Most likely to be half the number of ram slots in your machine.
     626         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
     627         * Controls where in memory packets are stored such that they are spread
     628         * across the channels. We just use 1 to be safe.
     629         *
     630         * Using unique file prefixes mean separate memory is used, unlinking
     631         * the two processes. However be careful we still cannot access a
     632         * port that already in use.
     633         */
     634        char* argv[] = {"libtrace",
     635                        "-c", cpu_number,
     636                        "-n", "1",
     637                        "--proc-type", "auto",
     638                        "--file-prefix", mem_map,
     639                        "-m", "512",
    500640#if DPDK_USE_LOG_LEVEL
    501641#       if DEBUG
    502                     "--log-level", "8", /* RTE_LOG_DEBUG */
     642                        "--log-level", "8", /* RTE_LOG_DEBUG */
    503643#       else
    504                     "--log-level", "5", /* RTE_LOG_WARNING */
     644                        "--log-level", "5", /* RTE_LOG_WARNING */
    505645#       endif
    506646#endif
    507                     NULL};
    508     int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    509 
    510     /* This initialises the Environment Abstraction Layer (EAL)
    511      * If we had slave workers these are put into WAITING state
    512      *
    513      * Basically binds this thread to a fixed core, which we choose as
    514      * the last core on the machine (assuming fewer interrupts mapped here).
    515      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    516      * "-n" the number of memory channels into the CPU (hardware specific)
    517      *      - Most likely to be half the number of ram slots in your machine.
    518      *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
    519      * Controls where in memory packets are stored and should spread across
    520      * the channels. We just use 1 to be safe.
    521      */
    522 
    523     /* Get the number of cpu cores in the system and use the last core */
    524     nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    525     if (nb_cpu <= 0) {
    526         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    527         nb_cpu = 1; /* fallback to the first core */
    528     }
    529     if (nb_cpu > RTE_MAX_LCORE)
    530         nb_cpu = RTE_MAX_LCORE;
    531 
    532     my_cpu = nb_cpu;
    533     /* This allows the user to specify the core - we would try to do this
    534      * automatically but it's hard to tell that this is secondary
    535      * before running rte_eal_init(...). Currently we are limited to 1
    536      * instance per core due to the way memory is allocated. */
    537     if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    538         snprintf(err, errlen, "Failed to parse URI");
    539         return -1;
    540     }
    541 
    542     snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    543                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    544 
    545     if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    546         snprintf(err, errlen,
    547           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    548           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    549         return -1;
    550     }
    551 
    552     /* Make our mask */
    553     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     647                        NULL};
     648        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
     649
     650#if DEBUG
     651        rte_set_log_level(RTE_LOG_DEBUG);
     652#else
     653        rte_set_log_level(RTE_LOG_WARNING);
     654#endif
     655
     656        /* Get the number of cpu cores in the system and use the last core
     657         * on the correct numa node */
     658        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     659        if (nb_cpu <= 0) {
     660                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     661                       " Falling back to the first core.");
     662                nb_cpu = 1; /* fallback to the first core */
     663        }
     664        if (nb_cpu > RTE_MAX_LCORE)
     665                nb_cpu = RTE_MAX_LCORE;
     666
     667        my_cpu = -1;
     668        /* This allows the user to specify the core - we would try to do this
     669         * automatically but it's hard to tell that this is secondary
     670         * before running rte_eal_init(...). Currently we are limited to 1
     671         * instance per core due to the way memory is allocated. */
     672        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
     673                snprintf(err, errlen, "Failed to parse URI");
     674                return -1;
     675        }
     676
     677#if HAVE_LIBNUMA
     678        format_data->nic_numa_node = pci_to_numa(&use_addr);
     679        if (my_cpu < 0) {
     680#if DEBUG
     681                /* If we can assign to a core on the same numa node */
     682                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
     683#endif
     684                if(format_data->nic_numa_node >= 0) {
     685                        int max_node_cpu = -1;
     686                        struct bitmask *mask = numa_allocate_cpumask();
     687                        assert(mask);
     688                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     689                        for (i = 0 ; i < nb_cpu; ++i) {
     690                                if (numa_bitmask_isbitset(mask,i))
     691                                        max_node_cpu = i+1;
     692                        }
     693                        my_cpu = max_node_cpu;
     694                }
     695        }
     696#endif
     697        if (my_cpu < 0) {
     698                my_cpu = nb_cpu;
     699        }
     700
     701
     702        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
     703                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     704
     705        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
     706                snprintf(err, errlen,
     707                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     708                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     709                return -1;
     710        }
     711
     712        /* Make our mask with all cores turned on this is so that DPDK
     713         * gets all CPU info in older versions */
     714        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     715        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    554716
    555717#if !DPDK_USE_BLACKLIST
    556     /* Black list all ports besides the one that we want to use */
    557     if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    558         snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    559                  " are you sure the address is correct?: %s", strerror(-ret));
    560         return -1;
    561     }
     718        /* Black list all ports besides the one that we want to use */
     719        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
     720                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     721                         " are you sure the address is correct?: %s", strerror(-ret));
     722                return -1;
     723        }
    562724#endif
    563725
    564726        /* Give the memory map a unique name */
    565727        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    566     /* rte_eal_init it makes a call to getopt so we need to reset the
    567     * global optind variable of getopt otherwise this fails */
     728        /* rte_eal_init it makes a call to getopt so we need to reset the
     729        * global optind variable of getopt otherwise this fails */
    568730        save_getopts(&save_opts);
    569     optind = 1;
    570     if ((ret = rte_eal_init(argc, argv)) < 0) {
    571         snprintf(err, errlen,
    572           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    573         return -1;
    574     }
     731        optind = 1;
     732        if ((ret = rte_eal_init(argc, argv)) < 0) {
     733                snprintf(err, errlen,
     734                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     735                return -1;
     736        }
    575737        restore_getopts(&save_opts);
     738        // These are still running but will never do anything with DPDK v1.7 we
     739        // should remove this XXX in the future
     740        for(i = 0; i < RTE_MAX_LCORE; ++i) {
     741                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     742                        cfg->lcore_role[i] = ROLE_OFF;
     743                        cfg->lcore_count--;
     744                }
     745        }
     746        // Only the master should be running
     747        assert(cfg->lcore_count == 1);
     748
     749        // TODO XXX TODO
     750        dpdk_move_master_lcore(NULL, my_cpu-1);
    576751
    577752#if DEBUG
    578     dump_configuration();
     753        dump_configuration();
    579754#endif
    580755
    581756#if DPDK_USE_PMD_INIT
    582     /* This registers all available NICs with Intel DPDK
    583     * These are not loaded until rte_eal_pci_probe() is called.
    584     */
    585     if ((ret = rte_pmd_init_all()) < 0) {
    586         snprintf(err, errlen,
    587           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    588         return -1;
    589     }
     757        /* This registers all available NICs with Intel DPDK
     758        * These are not loaded until rte_eal_pci_probe() is called.
     759        */
     760        if ((ret = rte_pmd_init_all()) < 0) {
     761                snprintf(err, errlen,
     762                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     763                return -1;
     764        }
    590765#endif
    591766
    592767#if DPDK_USE_BLACKLIST
    593     /* Blacklist all ports besides the one that we want to use */
     768        /* Blacklist all ports besides the one that we want to use */
    594769        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    595770                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     
    600775
    601776#if DPDK_USE_PCI_PROBE
    602     /* This loads DPDK drivers against all ports that are not blacklisted */
     777        /* This loads DPDK drivers against all ports that are not blacklisted */
    603778        if ((ret = rte_eal_pci_probe()) < 0) {
    604         snprintf(err, errlen,
    605             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    606         return -1;
    607     }
    608 #endif
    609 
    610     format_data->nb_ports = rte_eth_dev_count();
    611 
    612     if (format_data->nb_ports != 1) {
    613         snprintf(err, errlen,
    614             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    615             format_data->nb_ports);
    616         return -1;
    617     }
    618 
    619     return 0;
     779                snprintf(err, errlen,
     780                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     781                return -1;
     782        }
     783#endif
     784
     785        format_data->nb_ports = rte_eth_dev_count();
     786
     787        if (format_data->nb_ports != 1) {
     788                snprintf(err, errlen,
     789                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     790                         format_data->nb_ports);
     791                return -1;
     792        }
     793
     794        return 0;
    620795}
    621796
    622797static int dpdk_init_input (libtrace_t *libtrace) {
    623     char err[500];
    624     err[0] = 0;
    625    
    626     libtrace->format_data = (struct dpdk_format_data_t *)
    627                             malloc(sizeof(struct dpdk_format_data_t));
    628     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    629     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    630     FORMAT(libtrace)->nb_ports = 0;
    631     FORMAT(libtrace)->snaplen = 0; /* Use default */
    632     FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    633     FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
    634     FORMAT(libtrace)->promisc = -1;
    635     FORMAT(libtrace)->pktmbuf_pool = NULL;
     798        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
     799        char err[500];
     800        err[0] = 0;
     801
     802        libtrace->format_data = (struct dpdk_format_data_t *)
     803                                malloc(sizeof(struct dpdk_format_data_t));
     804        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     805        FORMAT(libtrace)->nb_ports = 0;
     806        FORMAT(libtrace)->snaplen = 0; /* Use default */
     807        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
     808        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     809        FORMAT(libtrace)->nic_numa_node = -1;
     810        FORMAT(libtrace)->promisc = -1;
     811        FORMAT(libtrace)->pktmbuf_pool = NULL;
    636812#if DPDK_USE_BLACKLIST
    637     FORMAT(libtrace)->nb_blacklist = 0;
    638 #endif
    639     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    640     FORMAT(libtrace)->mempool_name[0] = 0;
    641 #if HAS_HW_TIMESTAMPS_82580
    642     FORMAT(libtrace)->ts_first_sys = 0;
    643     FORMAT(libtrace)->ts_last_sys = 0;
    644     FORMAT(libtrace)->wrap_count = 0;
    645 #endif
    646 
    647     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    648         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    649         free(libtrace->format_data);
    650         libtrace->format_data = NULL;
    651         return -1;
    652     }
    653     return 0;
    654 };
     813        FORMAT(libtrace)->nb_blacklist = 0;
     814#endif
     815        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     816        FORMAT(libtrace)->mempool_name[0] = 0;
     817        memset(FORMAT(libtrace)->burst_pkts, 0,
     818               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     819        FORMAT(libtrace)->burst_size = 0;
     820        FORMAT(libtrace)->burst_offset = 0;
     821
     822        /* Make our first stream */
     823        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
     824        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
     825
     826        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     827                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     828                free(libtrace->format_data);
     829                libtrace->format_data = NULL;
     830                return -1;
     831        }
     832        return 0;
     833}
    655834
    656835static int dpdk_init_output(libtrace_out_t *libtrace)
    657836{
    658     char err[500];
    659     err[0] = 0;
    660    
    661     libtrace->format_data = (struct dpdk_format_data_t *)
    662                             malloc(sizeof(struct dpdk_format_data_t));
    663     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    664     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    665     FORMAT(libtrace)->nb_ports = 0;
    666     FORMAT(libtrace)->snaplen = 0; /* Use default */
    667     FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    668     FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
    669     FORMAT(libtrace)->promisc = -1;
    670     FORMAT(libtrace)->pktmbuf_pool = NULL;
     837        char err[500];
     838        err[0] = 0;
     839
     840        libtrace->format_data = (struct dpdk_format_data_t *)
     841                                malloc(sizeof(struct dpdk_format_data_t));
     842        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     843        FORMAT(libtrace)->nb_ports = 0;
     844        FORMAT(libtrace)->snaplen = 0; /* Use default */
     845        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
     846        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     847        FORMAT(libtrace)->nic_numa_node = -1;
     848        FORMAT(libtrace)->promisc = -1;
     849        FORMAT(libtrace)->pktmbuf_pool = NULL;
    671850#if DPDK_USE_BLACKLIST
    672     FORMAT(libtrace)->nb_blacklist = 0;
    673 #endif
    674     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    675     FORMAT(libtrace)->mempool_name[0] = 0;
    676 #if HAS_HW_TIMESTAMPS_82580
    677     FORMAT(libtrace)->ts_first_sys = 0;
    678     FORMAT(libtrace)->ts_last_sys = 0;
    679     FORMAT(libtrace)->wrap_count = 0;
    680 #endif
    681 
    682     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    683         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    684         free(libtrace->format_data);
    685         libtrace->format_data = NULL;
    686         return -1;
    687     }
    688     return 0;
    689 };
     851        FORMAT(libtrace)->nb_blacklist = 0;
     852#endif
     853        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     854        FORMAT(libtrace)->mempool_name[0] = 0;
     855        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     856        FORMAT(libtrace)->burst_size = 0;
     857        FORMAT(libtrace)->burst_offset = 0;
     858
     859        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     860                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     861                free(libtrace->format_data);
     862                libtrace->format_data = NULL;
     863                return -1;
     864        }
     865        return 0;
     866}
    690867
    691868/**
    692869 * Note here snaplen excludes the MAC checksum. Packets over
    693870 * the requested snaplen will be dropped. (Excluding MAC checksum)
    694  * 
     871 *
    695872 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    696873 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    697874 * is set the maximum size of the returned packet would be 1518 otherwise
    698875 * 1514 would be the largest size possibly returned.
    699  * 
     876 *
    700877 */
    701878static int dpdk_config_input (libtrace_t *libtrace,
    702                                         trace_option_t option,
    703                                         void *data) {
    704     switch (option) {
    705         case TRACE_OPTION_SNAPLEN:
    706             /* Only support changing snaplen before a call to start is
    707              * made */
    708             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    709                 FORMAT(libtrace)->snaplen=*(int*)data;
    710             else
    711                 return -1;
    712             return 0;
    713                 case TRACE_OPTION_PROMISC:
    714                         FORMAT(libtrace)->promisc=*(int*)data;
    715             return 0;
    716         case TRACE_OPTION_FILTER:
    717             /* TODO filtering */
    718             break;
    719         case TRACE_OPTION_META_FREQ:
    720             break;
    721         case TRACE_OPTION_EVENT_REALTIME:
    722             break;
    723         /* Avoid default: so that future options will cause a warning
    724          * here to remind us to implement it, or flag it as
    725          * unimplementable
    726          */
    727     }
     879                              trace_option_t option,
     880                              void *data) {
     881        switch (option) {
     882        case TRACE_OPTION_SNAPLEN:
     883                /* Only support changing snaplen before a call to start is
     884                 * made */
     885                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     886                        FORMAT(libtrace)->snaplen=*(int*)data;
     887                else
     888                        return -1;
     889                return 0;
     890        case TRACE_OPTION_PROMISC:
     891                FORMAT(libtrace)->promisc=*(int*)data;
     892                return 0;
     893        case TRACE_OPTION_HASHER:
     894                switch (*((enum hasher_types *) data))
     895                {
     896                case HASHER_BALANCE:
     897                case HASHER_UNIDIRECTIONAL:
     898                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     899                        return 0;
     900                case HASHER_BIDIRECTIONAL:
     901                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     902                        return 0;
     903                case HASHER_CUSTOM:
     904                        // We don't support these
     905                        return -1;
     906                }
     907                break;
     908        case TRACE_OPTION_FILTER:
     909                /* TODO filtering */
     910        case TRACE_OPTION_META_FREQ:
     911        case TRACE_OPTION_EVENT_REALTIME:
     912                break;
     913        /* Avoid default: so that future options will cause a warning
     914         * here to remind us to implement it, or flag it as
     915         * unimplementable
     916         */
     917        }
    728918
    729919        /* Don't set an error - trace_config will try to deal with the
    730920         * option and will set an error if it fails */
    731     return -1;
     921        return -1;
    732922}
    733923
    734924/* Can set jumbo frames/ or limit the size of a frame by setting both
    735925 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    736  * 
     926 *
    737927 */
    738928static struct rte_eth_conf port_conf = {
    739929        .rxmode = {
     930                .mq_mode = ETH_RSS,
    740931                .split_hdr_size = 0,
    741932                .header_split   = 0, /**< Header Split disabled */
     
    743934                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    744935                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    745         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     936                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    746937#if GET_MAC_CRC_CHECKSUM
    747938/* So it appears that if hw_strip_crc is turned off the driver will still
     
    756947 * always cut off the checksum in the future
    757948 */
    758         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     949                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    759950#endif
    760951        },
     
    762953                .mq_mode = ETH_DCB_NONE,
    763954        },
     955        .rx_adv_conf = {
     956                .rss_conf = {
     957                        // .rss_key = &rss_key, // We set this per format
     958                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     959                },
     960        },
     961        .intr_conf = {
     962                .lsc = 1
     963        }
    764964};
    765965
     
    770970                .wthresh = 4,/* RX_WTHRESH writeback */
    771971        },
    772     .rx_free_thresh = 0,
    773     .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
     972        .rx_free_thresh = 0,
     973        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
    774974};
    775975
    776976static const struct rte_eth_txconf tx_conf = {
    777977        .tx_thresh = {
    778         /**
    779         * TX_PTHRESH prefetch
    780         * Set on the NIC, if the number of unprocessed descriptors to queued on
    781         * the card fall below this try grab at least hthresh more unprocessed
    782         * descriptors.
    783         */
     978                /*
     979                * TX_PTHRESH prefetch
     980                * Set on the NIC, if the number of unprocessed descriptors to queued on
     981                * the card fall below this try grab at least hthresh more unprocessed
     982                * descriptors.
     983                */
    784984                .pthresh = 36,
    785985
    786         /* TX_HTHRESH host
    787         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    788         */
     986                /* TX_HTHRESH host
     987                * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     988                */
    789989                .hthresh = 0,
    790        
    791         /* TX_WTHRESH writeback
    792         * Set on the NIC, the number of sent descriptors before writing back
    793         * status to confirm the transmission. This is done more efficiently as
    794         * a bulk DMA-transfer rather than writing one at a time.
    795         * Similar to tx_free_thresh however this is applied to the NIC, where
    796         * as tx_free_thresh is when DPDK will check these. This is extended
    797         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    798         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    799         */
     990
     991                /* TX_WTHRESH writeback
     992                * Set on the NIC, the number of sent descriptors before writing back
     993                * status to confirm the transmission. This is done more efficiently as
     994                * a bulk DMA-transfer rather than writing one at a time.
     995                * Similar to tx_free_thresh however this is applied to the NIC, where
     996                * as tx_free_thresh is when DPDK will check these. This is extended
     997                * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     998                * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     999                */
    8001000                .wthresh = 4,
    8011001        },
    8021002
    803     /* Used internally by DPDK rather than passed to the NIC. The number of
    804     * packet descriptors to send before checking for any responses written
    805     * back (to confirm the transmission). Default = 32 if set to 0)
    806     */
     1003        /* Used internally by DPDK rather than passed to the NIC. The number of
     1004        * packet descriptors to send before checking for any responses written
     1005        * back (to confirm the transmission). Default = 32 if set to 0)
     1006        */
    8071007        .tx_free_thresh = 0,
    8081008
    809     /* This is the Report Status threshold, used by 10Gbit cards,
    810      * This signals the card to only write back status (such as
    811     * transmission successful) after this minimum number of transmit
    812     * descriptors are seen. The default is 32 (if set to 0) however if set
    813     * to greater than 1 TX wthresh must be set to zero, because this is kindof
    814     * a replacement. See the dpdk programmers guide for more restrictions.
    815     */
     1009        /* This is the Report Status threshold, used by 10Gbit cards,
     1010         * This signals the card to only write back status (such as
     1011        * transmission successful) after this minimum number of transmit
     1012        * descriptors are seen. The default is 32 (if set to 0) however if set
     1013        * to greater than 1 TX wthresh must be set to zero, because this is kindof
     1014        * a replacement. See the dpdk programmers guide for more restrictions.
     1015        */
    8161016        .tx_rs_thresh = 1,
    8171017};
    8181018
    819 /* Attach memory to the port and start the port or restart the port.
    820  */
    821 static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
    822     int ret; /* Check return values for errors */
    823     struct rte_eth_link link_info; /* Wait for link */
    824    
    825     /* Already started */
    826     if (format_data->paused == DPDK_RUNNING)
    827         return 0;
    828 
    829     /* First time started we need to alloc our memory, doing this here
    830      * rather than in environment setup because we don't have snaplen then */
    831     if (format_data->paused == DPDK_NEVER_STARTED) {
    832         if (format_data->snaplen == 0) {
    833             format_data->snaplen = RX_MBUF_SIZE;
    834             port_conf.rxmode.jumbo_frame = 0;
    835             port_conf.rxmode.max_rx_pkt_len = 0;
    836         } else {
    837             /* Use jumbo frames */
    838             port_conf.rxmode.jumbo_frame = 1;
    839             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    840         }
    841 
    842         /* This is additional overhead so make sure we allow space for this */
     1019/**
     1020 * A callback for a link state change (LSC).
     1021 *
     1022 * Packets may be received before this notification. In fact the DPDK IGXBE
     1023 * driver likes to put a delay upto 5sec before sending this.
     1024 *
     1025 * We use this to ensure the link speed is correct for our timestamp
     1026 * calculations. Because packets might be received before the link up we still
     1027 * update this when the packet is received.
     1028 *
     1029 * @param port The DPDK port
     1030 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
     1031 * @param cb_arg The dpdk_format_data_t structure associated with the format
     1032 */
     1033static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
     1034                              void *cb_arg) {
     1035        struct dpdk_format_data_t * format_data = cb_arg;
     1036        struct rte_eth_link link_info;
     1037        assert(event == RTE_ETH_EVENT_INTR_LSC);
     1038        assert(port == format_data->port);
     1039
     1040        rte_eth_link_get_nowait(port, &link_info);
     1041
     1042        if (link_info.link_status)
     1043                format_data->link_speed = link_info.link_speed;
     1044        else
     1045                format_data->link_speed = 0;
     1046
     1047#if DEBUG
     1048        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
     1049                link_info.link_status ? "up" : "down",
     1050                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
     1051                                          "full-duplex" : "half-duplex",
     1052                (int) link_info.link_speed);
     1053#endif
     1054
     1055        /* Turns out DPDK drivers might not come back up if the link speed
     1056         * changes. So we reset the autoneg procedure. This is very unsafe
     1057         * we have have threads reading packets and we stop the port. */
     1058#if 0
     1059        if (!link_info.link_status) {
     1060                int ret;
     1061                rte_eth_dev_stop(port);
     1062                ret = rte_eth_dev_start(port);
     1063                if (ret < 0) {
     1064                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
     1065                                strerror(-ret));
     1066                }
     1067        }
     1068#endif
     1069}
     1070
     1071/** Reserve a DPDK lcore ID for a thread globally.
     1072 *
     1073 * @param real If true allocate a real lcore, otherwise allocate a core which
     1074 * does not exist on the local machine.
     1075 * @param socket the prefered NUMA socket - only used if a real core is requested
     1076 * @return a valid core, which can later be used with dpdk_register_lcore() or a
     1077 * -1 if have run out of cores.
     1078 *
     1079 * If any thread is reading or freeing packets we need to register it here
     1080 * due to TLS caches in the memory pools.
     1081 */
     1082static int dpdk_reserve_lcore(bool real, int socket) {
     1083        int new_id = -1;
     1084        int i;
     1085        struct rte_config *cfg = rte_eal_get_configuration();
     1086
     1087        pthread_mutex_lock(&dpdk_lock);
     1088        /* If 'reading packets' fill in cores from 0 up and bind affinity
     1089         * otherwise start from the MAX core (which is also the master) and work backwards
     1090         * in this case physical cores on the system will not exist so we don't bind
     1091         * these to any particular physical core */
     1092        if (real) {
     1093#if HAVE_LIBNUMA
     1094                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1095                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
     1096                                new_id = i;
     1097                                if (!lcore_config[i].detected)
     1098                                        new_id = -1;
     1099                                break;
     1100                        }
     1101                }
     1102#endif
     1103                /* Retry without the the numa restriction */
     1104                if (new_id == -1) {
     1105                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1106                                if (!rte_lcore_is_enabled(i)) {
     1107                                        new_id = i;
     1108                                        if (!lcore_config[i].detected)
     1109                                                fprintf(stderr, "Warning the"
     1110                                                        " number of 'reading' "
     1111                                                        "threads exceed cores\n");
     1112                                        break;
     1113                                }
     1114                        }
     1115                }
     1116        } else {
     1117                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1118                        if (!rte_lcore_is_enabled(i)) {
     1119                                new_id = i;
     1120                                break;
     1121                        }
     1122                }
     1123        }
     1124
     1125        if (new_id != -1) {
     1126                /* Enable the core in global DPDK structs */
     1127                cfg->lcore_role[new_id] = ROLE_RTE;
     1128                cfg->lcore_count++;
     1129        }
     1130
     1131        pthread_mutex_unlock(&dpdk_lock);
     1132        return new_id;
     1133}
     1134
     1135/** Register a thread as a lcore
     1136 * @param libtrace any error is set against libtrace on exit
     1137 * @param real If this is a true lcore we will bind its affinty to the
     1138 * requested core.
     1139 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
     1140 * @return 0, if successful otherwise -1 if an error occured (details are stored
     1141 * in libtrace)
     1142 *
     1143 * @note This must be called from the thread being registered.
     1144 */
     1145static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
     1146        int ret;
     1147        RTE_PER_LCORE(_lcore_id) = lcore;
     1148
     1149        /* Set affinity bind to corresponding core */
     1150        if (real) {
     1151                cpu_set_t cpuset;
     1152                CPU_ZERO(&cpuset);
     1153                CPU_SET(rte_lcore_id(), &cpuset);
     1154                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1155                if (ret != 0) {
     1156                        trace_set_err(libtrace, errno, "Warning "
     1157                                      "pthread_setaffinity_np failed");
     1158                        return -1;
     1159                }
     1160        }
     1161
     1162        return 0;
     1163}
     1164
     1165/** Allocates a new dpdk packet buffer memory pool.
     1166 *
     1167 * @param n The number of threads
     1168 * @param pkt_size The packet size we need ot store
     1169 * @param socket_id The NUMA socket id
     1170 * @param A new mempool, if NULL query the DPDK library for the error code
     1171 * see rte_mempool_create() documentation.
     1172 *
     1173 * This allocates a new pool or recycles an existing memory pool.
     1174 * Call dpdk_free_memory() to free the memory.
     1175 * We cannot delete memory so instead we store the pools, allowing them to be
     1176 * re-used.
     1177 */
     1178static struct rte_mempool *dpdk_alloc_memory(unsigned n,
     1179                                             unsigned pkt_size,
     1180                                             int socket_id) {
     1181        struct rte_mempool *ret;
     1182        size_t j,k;
     1183        char name[MEMPOOL_NAME_LEN];
     1184
     1185        /* Add on packet size overheads */
     1186        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1187
     1188        pthread_mutex_lock(&dpdk_lock);
     1189
     1190        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
     1191                /* Best guess go for zero */
     1192                socket_id = 0;
     1193        }
     1194
     1195        /* Find a valid pool */
     1196        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
     1197                if (mem_pools[socket_id][j]->size >= n &&
     1198                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
     1199                        break;
     1200                }
     1201        }
     1202
     1203        /* Find the end (+1) of the list */
     1204        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
     1205
     1206        if (mem_pools[socket_id][j]) {
     1207                ret = mem_pools[socket_id][j];
     1208                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
     1209                mem_pools[socket_id][k-1] = NULL;
     1210                mem_pools[socket_id][j] = NULL;
     1211        } else {
     1212                static uint32_t test = 10;
     1213                test++;
     1214                snprintf(name, MEMPOOL_NAME_LEN,
     1215                         "libtrace_pool_%"PRIu32, test);
     1216
     1217                ret = rte_mempool_create(name, n, pkt_size,
     1218                                         128, sizeof(struct rte_pktmbuf_pool_private),
     1219                                         rte_pktmbuf_pool_init, NULL,
     1220                                         rte_pktmbuf_init, NULL,
     1221                                         socket_id, 0);
     1222        }
     1223
     1224        pthread_mutex_unlock(&dpdk_lock);
     1225        return ret;
     1226}
     1227
     1228/** Stores the memory against the DPDK library.
     1229 *
     1230 * @param mempool The mempool to free
     1231 * @param socket_id The NUMA socket this mempool was allocated upon.
     1232 *
     1233 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
     1234 * store the memory shared globally against the format.
     1235 */
     1236static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
     1237        size_t i;
     1238        pthread_mutex_lock(&dpdk_lock);
     1239
     1240        /* We should have all entries back in the mempool */
     1241        rte_mempool_audit(mempool);
     1242        if (!rte_mempool_full(mempool)) {
     1243                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
     1244                        "free all packets before finishing a trace\n",
     1245                        rte_mempool_count(mempool), mempool->size);
     1246        }
     1247
     1248        /* Find the end (+1) of the list */
     1249        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
     1250
     1251        if (i >= RTE_MAX_LCORE) {
     1252                fprintf(stderr, "Too many memory pools, dropping this one\n");
     1253        } else {
     1254                mem_pools[socket_id][i] = mempool;
     1255        }
     1256
     1257        pthread_mutex_unlock(&dpdk_lock);
     1258}
     1259
     1260/* Attach memory to the port and start (or restart) the port/s.
     1261 */
     1262static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
     1263                              char *err, int errlen, uint16_t rx_queues) {
     1264        int ret, i;
     1265        struct rte_eth_link link_info; /* Wait for link */
     1266        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
     1267
     1268        /* Already started */
     1269        if (format_data->paused == DPDK_RUNNING)
     1270                return 0;
     1271
     1272        /* First time started we need to alloc our memory, doing this here
     1273         * rather than in environment setup because we don't have snaplen then */
     1274        if (format_data->paused == DPDK_NEVER_STARTED) {
     1275                if (format_data->snaplen == 0) {
     1276                        format_data->snaplen = RX_MBUF_SIZE;
     1277                        port_conf.rxmode.jumbo_frame = 0;
     1278                        port_conf.rxmode.max_rx_pkt_len = 0;
     1279                } else {
     1280                        /* Use jumbo frames */
     1281                        port_conf.rxmode.jumbo_frame = 1;
     1282                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1283                }
     1284
    8431285#if GET_MAC_CRC_CHECKSUM
    844         format_data->snaplen += ETHER_CRC_LEN;
     1286                /* This is additional overhead so make sure we allow space for this */
     1287                format_data->snaplen += ETHER_CRC_LEN;
    8451288#endif
    8461289#if HAS_HW_TIMESTAMPS_82580
    847         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    848 #endif
    849 
    850         /* Create the mbuf pool, which is the place our packets are allocated
    851          * from - TODO figure out if there is is a free function (I cannot see one)
    852          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    853          * allocate however that extra 1 packet is not used.
    854         * (I assume <= vs < error some where in DPDK code)
    855          * TX requires nb_tx_buffers + 1 in the case the queue is full
    856         * so that will fill the new buffer and wait until slots in the
    857         * ring become available.
    858         */
     1290                format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1291#endif
     1292
     1293                /* Create the mbuf pool, which is the place packets are allocated
     1294                 * from - There is no free function (I cannot see one).
     1295                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1296                 * allocate however that extra 1 packet is not used.
     1297                * (I assume <= vs < error some where in DPDK code)
     1298                 * TX requires nb_tx_buffers + 1 in the case the queue is full
     1299                * so that will fill the new buffer and wait until slots in the
     1300                * ring become available.
     1301                */
    8591302#if DEBUG
    860     fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    861 #endif
    862         format_data->pktmbuf_pool =
    863             rte_mempool_create(format_data->mempool_name,
    864                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    865                        format_data->snaplen + sizeof(struct rte_mbuf)
    866                                         + RTE_PKTMBUF_HEADROOM,
    867                        8, sizeof(struct rte_pktmbuf_pool_private),
    868                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    869                        rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    870 
    871         if (format_data->pktmbuf_pool == NULL) {
    872             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    873                         "pool failed: %s", strerror(rte_errno));
    874             return -1;
    875         }
    876     }
    877    
    878     /* ----------- Now do the setup for the port mapping ------------ */
    879     /* Order of calls must be
    880      * rte_eth_dev_configure()
    881      * rte_eth_tx_queue_setup()
    882      * rte_eth_rx_queue_setup()
    883      * rte_eth_dev_start()
    884      * other rte_eth calls
    885      */
    886    
    887     /* This must be called first before another *eth* function
    888      * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    889     ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    890     if (ret < 0) {
    891         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    892                             " %"PRIu8" : %s", format_data->port,
    893                             strerror(-ret));
    894         return -1;
    895     }
    896     /* Initialise the TX queue a minimum value if using this port for
    897      * receiving. Otherwise a larger size if writing packets.
    898      */
    899     ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    900                         format_data->nb_tx_buf, rte_socket_id(), &tx_conf);
    901     if (ret < 0) {
    902         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    903                             " %"PRIu8" : %s", format_data->port,
    904                             strerror(-ret));
    905         return -1;
    906     }
    907     /* Initialise the RX queue with some packets from memory */
    908     ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    909                             format_data->nb_rx_buf, rte_socket_id(),
    910                             &rx_conf, format_data->pktmbuf_pool);
    911     if (ret < 0) {
    912         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    913                     " %"PRIu8" : %s", format_data->port,
    914                     strerror(-ret));
    915         return -1;
    916     }
    917    
    918     /* Start device */
    919     ret = rte_eth_dev_start(format_data->port);
    920     if (ret < 0) {
    921         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    922                     strerror(-ret));
    923         return -1;
    924     }
    925 
    926     /* Default promiscuous to on */
    927     if (format_data->promisc == -1)
    928         format_data->promisc = 1;
    929    
    930     if (format_data->promisc == 1)
    931         rte_eth_promiscuous_enable(format_data->port);
    932     else
    933         rte_eth_promiscuous_disable(format_data->port);
    934    
    935     /* Wait for the link to come up */
    936     rte_eth_link_get(format_data->port, &link_info);
     1303                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
     1304#endif
     1305                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
     1306                                                              format_data->snaplen,
     1307                                                              format_data->nic_numa_node);
     1308
     1309                if (format_data->pktmbuf_pool == NULL) {
     1310                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1311                                 "pool failed: %s", strerror(rte_errno));
     1312                        return -1;
     1313                }
     1314        }
     1315
     1316        /* ----------- Now do the setup for the port mapping ------------ */
     1317        /* Order of calls must be
     1318         * rte_eth_dev_configure()
     1319         * rte_eth_tx_queue_setup()
     1320         * rte_eth_rx_queue_setup()
     1321         * rte_eth_dev_start()
     1322         * other rte_eth calls
     1323         */
     1324
     1325        /* This must be called first before another *eth* function
     1326         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
     1327        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1328        if (ret < 0) {
     1329                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1330                         " %"PRIu8" : %s", format_data->port,
     1331                         strerror(-ret));
     1332                return -1;
     1333        }
    9371334#if DEBUG
    938     fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    939             (int) link_info.link_duplex, (int) link_info.link_speed);
    940 #endif
    941 
    942     /* We have now successfully started/unpaused */
    943     format_data->paused = DPDK_RUNNING;
    944    
    945     return 0;
     1335        fprintf(stderr, "Doing dev configure\n");
     1336#endif
     1337        /* Initialise the TX queue a minimum value if using this port for
     1338         * receiving. Otherwise a larger size if writing packets.
     1339         */
     1340        ret = rte_eth_tx_queue_setup(format_data->port,
     1341                                     0 /* queue XXX */,
     1342                                     format_data->nb_tx_buf,
     1343                                     SOCKET_ID_ANY,
     1344                                     &tx_conf);
     1345        if (ret < 0) {
     1346                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
     1347                         " on port %"PRIu8" : %s", format_data->port,
     1348                         strerror(-ret));
     1349                return -1;
     1350        }
     1351
     1352        /* Attach memory to our RX queues */
     1353        for (i=0; i < rx_queues; i++) {
     1354                dpdk_per_stream_t *stream;
     1355#if DEBUG
     1356                fprintf(stderr, "Configuring queue %d\n", i);
     1357#endif
     1358
     1359                /* Add storage for the stream */
     1360                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
     1361                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
     1362                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
     1363                stream->queue_id = i;
     1364
     1365                if (stream->lcore == -1)
     1366                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
     1367
     1368                if (stream->lcore == -1) {
     1369                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
     1370                                 ". Too many threads?");
     1371                        return -1;
     1372                }
     1373
     1374                if (stream->mempool == NULL) {
     1375                        stream->mempool = dpdk_alloc_memory(
     1376                                                  format_data->nb_rx_buf*2,
     1377                                                  format_data->snaplen,
     1378                                                  rte_lcore_to_socket_id(stream->lcore));
     1379
     1380                        if (stream->mempool == NULL) {
     1381                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1382                                         "pool failed: %s", strerror(rte_errno));
     1383                                return -1;
     1384                        }
     1385                }
     1386
     1387                /* Initialise the RX queue with some packets from memory */
     1388                ret = rte_eth_rx_queue_setup(format_data->port,
     1389                                             stream->queue_id,
     1390                                             format_data->nb_rx_buf,
     1391                                             format_data->nic_numa_node,
     1392                                             &rx_conf,
     1393                                             stream->mempool);
     1394                if (ret < 0) {
     1395                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
     1396                                 " RX queue on port %"PRIu8" : %s",
     1397                                 format_data->port,
     1398                                 strerror(-ret));
     1399                        return -1;
     1400                }
     1401        }
     1402
     1403#if DEBUG
     1404        fprintf(stderr, "Doing start device\n");
     1405#endif
     1406        rte_eth_stats_reset(format_data->port);
     1407        /* Start device */
     1408        ret = rte_eth_dev_start(format_data->port);
     1409        if (ret < 0) {
     1410                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1411                         strerror(-ret));
     1412                return -1;
     1413        }
     1414
     1415        /* Default promiscuous to on */
     1416        if (format_data->promisc == -1)
     1417                format_data->promisc = 1;
     1418
     1419        if (format_data->promisc == 1)
     1420                rte_eth_promiscuous_enable(format_data->port);
     1421        else
     1422                rte_eth_promiscuous_disable(format_data->port);
     1423
     1424        /* We have now successfully started/unpased */
     1425        format_data->paused = DPDK_RUNNING;
     1426
     1427
     1428        /* Register a callback for link state changes */
     1429        ret = rte_eth_dev_callback_register(format_data->port,
     1430                                            RTE_ETH_EVENT_INTR_LSC,
     1431                                            dpdk_lsc_callback,
     1432                                            format_data);
     1433#if DEBUG
     1434        if (ret)
     1435                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1436                        ret, strerror(-ret));
     1437#endif
     1438
     1439        /* Get the current link status */
     1440        rte_eth_link_get_nowait(format_data->port, &link_info);
     1441        format_data->link_speed = link_info.link_speed;
     1442#if DEBUG
     1443        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1444                (int) link_info.link_duplex, (int) link_info.link_speed);
     1445#endif
     1446
     1447        return 0;
    9461448}
    9471449
    9481450static int dpdk_start_input (libtrace_t *libtrace) {
    949     char err[500];
    950     err[0] = 0;
    951 
    952     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    953         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    954         free(libtrace->format_data);
    955         libtrace->format_data = NULL;
    956         return -1;
    957     }
    958     return 0;
     1451        char err[500];
     1452        err[0] = 0;
     1453
     1454        /* Make sure we don't reserve an extra thread for this */
     1455        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
     1456
     1457        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1458                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1459                free(libtrace->format_data);
     1460                libtrace->format_data = NULL;
     1461                return -1;
     1462        }
     1463        return 0;
     1464}
     1465
     1466static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1467        struct rte_eth_dev_info dev_info;
     1468        rte_eth_dev_info_get(port_id, &dev_info);
     1469        return dev_info.max_rx_queues;
     1470}
     1471
     1472static inline size_t dpdk_processor_count () {
     1473        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1474        if (nb_cpu <= 0)
     1475                return 1;
     1476        else
     1477                return (size_t) nb_cpu;
     1478}
     1479
     1480static int dpdk_pstart_input (libtrace_t *libtrace) {
     1481        char err[500];
     1482        int i=0, phys_cores=0;
     1483        int tot = libtrace->perpkt_thread_count;
     1484        libtrace_list_node_t *n;
     1485        err[0] = 0;
     1486
     1487        if (rte_lcore_id() != rte_get_master_lcore())
     1488                fprintf(stderr, "Warning dpdk_pstart_input should be called"
     1489                        " from the master DPDK thread!\n");
     1490
     1491        /* If the master is not on the last thread we move it there */
     1492        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1493                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
     1494                        return -1;
     1495        }
     1496
     1497        /* Don't exceed the number of cores in the system/detected by dpdk
     1498         * We don't have to force this but performance wont be good if we don't */
     1499        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1500                if (lcore_config[i].detected) {
     1501                        if (rte_lcore_is_enabled(i)) {
     1502#if DEBUG
     1503                                fprintf(stderr, "Found core %d already in use!\n", i);
     1504#endif
     1505                        } else {
     1506                                phys_cores++;
     1507                        }
     1508                }
     1509        }
     1510        /* If we are restarting we have already allocated some threads as such
     1511         * we add these back to the count for this calculation */
     1512        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
     1513                dpdk_per_stream_t * stream = n->data;
     1514                if (stream->lcore != -1)
     1515                        phys_cores++;
     1516        }
     1517
     1518        tot = MIN(libtrace->perpkt_thread_count,
     1519                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1520        tot = MIN(tot, phys_cores);
     1521
     1522#if DEBUG
     1523        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
     1524                libtrace->perpkt_thread_count, phys_cores);
     1525#endif
     1526
     1527        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1528                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1529                free(libtrace->format_data);
     1530                libtrace->format_data = NULL;
     1531                return -1;
     1532        }
     1533
     1534        /* Make sure we only start the number that we should */
     1535        libtrace->perpkt_thread_count = tot;
     1536        return 0;
     1537}
     1538
     1539/**
     1540 * Register a thread with the DPDK system,
     1541 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1542 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1543 * gives it.
     1544 *
     1545 * We then allow a mapper thread to be started on every real core as DPDK would,
     1546 * we also bind these to the corresponding CPU cores.
     1547 *
     1548 * @param libtrace A pointer to the trace
     1549 * @param reading True if the thread will be used to read packets, i.e. will
     1550 *                call pread_packet(), false if thread used to process packet
     1551 *                in any other manner including statistics functions.
     1552 */
     1553static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1554{
     1555#if DEBUG
     1556        char name[99];
     1557        name[0] = 0;
     1558#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
     1559        pthread_getname_np(pthread_self(),
     1560                           name, sizeof(name));
     1561#endif
     1562#endif
     1563        if (reading) {
     1564                dpdk_per_stream_t *stream;
     1565                /* Attach our thread */
     1566                if(t->type == THREAD_PERPKT) {
     1567                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
     1568                        if (t->format_data == NULL) {
     1569                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1570                                              "Too many threads registered");
     1571                                return -1;
     1572                        }
     1573                } else {
     1574                        t->format_data = FORMAT_DATA_FIRST(libtrace);
     1575                }
     1576                stream = t->format_data;
     1577#if DEBUG
     1578                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
     1579#endif
     1580                return dpdk_register_lcore(libtrace, true, stream->lcore);
     1581        } else {
     1582                int lcore = dpdk_reserve_lcore(reading, 0);
     1583                if (lcore == -1) {
     1584                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
     1585                                      " for DPDK");
     1586                        return -1;
     1587                }
     1588#if DEBUG
     1589                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
     1590#endif
     1591                return dpdk_register_lcore(libtrace, false, lcore);
     1592        }
     1593
     1594        return 0;
     1595}
     1596
     1597/**
     1598 * Unregister a thread with the DPDK system.
     1599 *
     1600 * Only previously registered threads should be calling this just before
     1601 * they are destroyed.
     1602 */
     1603static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
     1604{
     1605        struct rte_config *cfg = rte_eal_get_configuration();
     1606
     1607        assert(rte_lcore_id() < RTE_MAX_LCORE);
     1608        pthread_mutex_lock(&dpdk_lock);
     1609        /* Skip if master */
     1610        if (rte_lcore_id() == rte_get_master_lcore()) {
     1611                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1612                pthread_mutex_unlock(&dpdk_lock);
     1613                return;
     1614        }
     1615
     1616        /* Disable this core in global DPDK structs */
     1617        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1618        cfg->lcore_count--;
     1619        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1620        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1621        pthread_mutex_unlock(&dpdk_lock);
     1622        return;
    9591623}
    9601624
    9611625static int dpdk_start_output(libtrace_out_t *libtrace)
    9621626{
    963     char err[500];
    964     err[0] = 0;
    965    
    966     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    967         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    968         free(libtrace->format_data);
    969         libtrace->format_data = NULL;
    970         return -1;
    971     }
    972     return 0;
    973 }
    974 
    975 static int dpdk_pause_input(libtrace_t * libtrace){
    976     /* This stops the device, but can be restarted using rte_eth_dev_start() */
    977     if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    978 #if DEBUG     
    979         fprintf(stderr, "Pausing port\n");
    980 #endif
    981         rte_eth_dev_stop(FORMAT(libtrace)->port);
    982         FORMAT(libtrace)->paused = DPDK_PAUSED;
    983         /* If we pause it the driver will be reset and likely our counter */
     1627        char err[500];
     1628        err[0] = 0;
     1629
     1630        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1631                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1632                free(libtrace->format_data);
     1633                libtrace->format_data = NULL;
     1634                return -1;
     1635        }
     1636        return 0;
     1637}
     1638
     1639static int dpdk_pause_input(libtrace_t * libtrace) {
     1640        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
     1641        /* This stops the device, but can be restarted using rte_eth_dev_start() */
     1642        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
     1643#if DEBUG
     1644                fprintf(stderr, "Pausing DPDK port\n");
     1645#endif
     1646                rte_eth_dev_stop(FORMAT(libtrace)->port);
     1647                FORMAT(libtrace)->paused = DPDK_PAUSED;
     1648                /* Empty the queue of packets */
     1649                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1650                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1651                }
     1652                FORMAT(libtrace)->burst_offset = 0;
     1653                FORMAT(libtrace)->burst_size = 0;
     1654
     1655                for (; tmp != NULL; tmp = tmp->next) {
     1656                        dpdk_per_stream_t *stream = tmp->data;
     1657                        stream->ts_last_sys = 0;
    9841658#if HAS_HW_TIMESTAMPS_82580
    985         FORMAT(libtrace)->ts_first_sys = 0;
    986         FORMAT(libtrace)->ts_last_sys = 0;
    987 #endif
    988     }
    989     return 0;
    990 }
    991 
    992 static int dpdk_write_packet(libtrace_out_t *trace,
    993                 libtrace_packet_t *packet){
    994     struct rte_mbuf* m_buff[1];
    995    
    996     int wirelen = trace_get_wire_length(packet);
    997     int caplen = trace_get_capture_length(packet);
    998    
    999     /* Check for a checksum and remove it */
    1000     if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1001                                             wirelen == caplen)
    1002         caplen -= ETHER_CRC_LEN;
    1003 
    1004     m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    1005     if (m_buff[0] == NULL) {
    1006         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1007         return -1;
    1008     } else {
    1009         int ret;
    1010         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1011         do {
    1012             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1013         } while (ret != 1);
    1014     }
    1015 
    1016     return 0;
     1659                        stream->ts_first_sys = 0;
     1660#endif
     1661                }
     1662
     1663        }
     1664        return 0;
     1665}
     1666
     1667static int dpdk_write_packet(libtrace_out_t *trace,
     1668                             libtrace_packet_t *packet){
     1669        struct rte_mbuf* m_buff[1];
     1670
     1671        int wirelen = trace_get_wire_length(packet);
     1672        int caplen = trace_get_capture_length(packet);
     1673
     1674        /* Check for a checksum and remove it */
     1675        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
     1676            wirelen == caplen)
     1677                caplen -= ETHER_CRC_LEN;
     1678
     1679        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
     1680        if (m_buff[0] == NULL) {
     1681                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1682                return -1;
     1683        } else {
     1684                int ret;
     1685                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1686                do {
     1687                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
     1688                } while (ret != 1);
     1689        }
     1690
     1691        return 0;
    10171692}
    10181693
    10191694static int dpdk_fin_input(libtrace_t * libtrace) {
    1020     /* Free our memory structures */
    1021     if (libtrace->format_data != NULL) {
    1022         /* Close the device completely, device cannot be restarted */
    1023         if (FORMAT(libtrace)->port != 0xFF)
    1024             rte_eth_dev_close(FORMAT(libtrace)->port);
    1025         /* filter here if we used it */
     1695        libtrace_list_node_t * n;
     1696        /* Free our memory structures */
     1697        if (libtrace->format_data != NULL) {
     1698
     1699                if (FORMAT(libtrace)->port != 0xFF)
     1700                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1701                                                        RTE_ETH_EVENT_INTR_LSC,
     1702                                                        dpdk_lsc_callback,
     1703                                                        FORMAT(libtrace));
     1704                /* Close the device completely, device cannot be restarted */
     1705                rte_eth_dev_close(FORMAT(libtrace)->port);
     1706
     1707                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
     1708                                 FORMAT(libtrace)->nic_numa_node);
     1709
     1710                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
     1711                        dpdk_per_stream_t * stream = n->data;
     1712                        if (stream->mempool)
     1713                                dpdk_free_memory(stream->mempool,
     1714                                                 rte_lcore_to_socket_id(stream->lcore));
     1715                }
     1716
     1717                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1718                /* filter here if we used it */
    10261719                free(libtrace->format_data);
    10271720        }
    10281721
    1029     /* Revert to the original PCI drivers */
    1030     /* No longer in DPDK
    1031     rte_eal_pci_exit(); */
    1032     return 0;
     1722        return 0;
    10331723}
    10341724
    10351725
    10361726static int dpdk_fin_output(libtrace_out_t * libtrace) {
    1037     /* Free our memory structures */
    1038     if (libtrace->format_data != NULL) {
    1039         /* Close the device completely, device cannot be restarted */
    1040         if (FORMAT(libtrace)->port != 0xFF)
    1041             rte_eth_dev_close(FORMAT(libtrace)->port);
    1042         /* filter here if we used it */
     1727        /* Free our memory structures */
     1728        if (libtrace->format_data != NULL) {
     1729                /* Close the device completely, device cannot be restarted */
     1730                if (FORMAT(libtrace)->port != 0xFF)
     1731                        rte_eth_dev_close(FORMAT(libtrace)->port);
     1732                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1733                /* filter here if we used it */
    10431734                free(libtrace->format_data);
    10441735        }
    10451736
    1046     /* Revert to the original PCI drivers */
    1047     /* No longer in DPDK
    1048     rte_eal_pci_exit(); */
    1049     return 0;
    1050 }
    1051 
    1052 /**
    1053  * Get the start of additional header that we added to a packet.
     1737        return 0;
     1738}
     1739
     1740/**
     1741 * Get the start of the additional header that we added to a packet.
    10541742 */
    10551743static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1056     uint8_t *hdrsize;
    1057     assert(packet);
    1058     assert(packet->buffer);
    1059     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1060     /* The byte before the original packet data denotes the size in bytes
    1061      * of our additional header that we added sits before the 'size byte' */
    1062     hdrsize--;
    1063     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1744        assert(packet);
     1745        assert(packet->buffer);
     1746        /* Our header sits straight after the mbuf header */
     1747        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    10641748}
    10651749
    10661750static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
    1067     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1068     return hdr->cap_len;
     1751        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1752        return hdr->cap_len;
    10691753}
    10701754
    10711755static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
    1072     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1073     if (size > hdr->cap_len) {
    1074         /* Cannot make a packet bigger */
     1756        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1757        if (size > hdr->cap_len) {
     1758                /* Cannot make a packet bigger */
    10751759                return trace_get_capture_length(packet);
    10761760        }
    10771761
    1078     /* Reset the cached capture length first*/
    1079     packet->capture_length = -1;
    1080     hdr->cap_len = (uint32_t) size;
     1762        /* Reset the cached capture length first*/
     1763        packet->capture_length = -1;
     1764        hdr->cap_len = (uint32_t) size;
    10811765        return trace_get_capture_length(packet);
    10821766}
    10831767
    10841768static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
    1085     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1086     int org_cap_size; /* The original capture size */
    1087     if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1088         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1089                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1090                             sizeof(struct hw_timestamp_82580);
    1091     } else {
    1092         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1093                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
    1094     }
    1095     if (hdr->flags & INCLUDES_CHECKSUM) {
    1096         return org_cap_size;
    1097     } else {
    1098         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1099         return org_cap_size + ETHER_CRC_LEN;
    1100     }
    1101 }
     1769        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1770        int org_cap_size; /* The original capture size */
     1771        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
     1772                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1773                               sizeof(struct hw_timestamp_82580);
     1774        } else {
     1775                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
     1776        }
     1777        if (hdr->flags & INCLUDES_CHECKSUM) {
     1778                return org_cap_size;
     1779        } else {
     1780                /* DPDK packets are always TRACE_TYPE_ETH packets */
     1781                return org_cap_size + ETHER_CRC_LEN;
     1782        }
     1783}
     1784
    11021785static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
    1103     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1104     if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1105         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1106                 sizeof(struct hw_timestamp_82580);
    1107     else
    1108         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1786        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1787        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
     1788                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1789                                sizeof(struct hw_timestamp_82580);
     1790        else
     1791                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    11091792}
    11101793
    11111794static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
    1112                 libtrace_packet_t *packet, void *buffer,
    1113                 libtrace_rt_types_t rt_type, uint32_t flags) {
    1114     assert(packet);
    1115     if (packet->buffer != buffer &&
    1116         packet->buf_control == TRACE_CTRL_PACKET) {
    1117         free(packet->buffer);
    1118     }
    1119 
    1120     if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1121         packet->buf_control = TRACE_CTRL_PACKET;
    1122     } else
    1123         packet->buf_control = TRACE_CTRL_EXTERNAL;
    1124 
    1125     packet->buffer = buffer;
    1126     packet->header = buffer;
    1127 
    1128     /* Don't use pktmbuf_mtod will fail if the packet is a copy */
    1129     packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
    1130     packet->type = rt_type;
    1131     return 0;
    1132 }
    1133 
    1134 /*
    1135  * Does any extra preperation to a captured packet.
    1136  * This includes adding our extra header to it with the timestamp
    1137  */
    1138 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1139                                                         struct rte_mbuf* pkt){
    1140     uint8_t * hdr_size;
    1141     struct dpdk_addt_hdr *hdr;
     1795                               libtrace_packet_t *packet, void *buffer,
     1796                               libtrace_rt_types_t rt_type, uint32_t flags) {
     1797        assert(packet);
     1798        if (packet->buffer != buffer &&
     1799            packet->buf_control == TRACE_CTRL_PACKET) {
     1800                free(packet->buffer);
     1801        }
     1802
     1803        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
     1804                packet->buf_control = TRACE_CTRL_PACKET;
     1805        else
     1806                packet->buf_control = TRACE_CTRL_EXTERNAL;
     1807
     1808        packet->buffer = buffer;
     1809        packet->header = buffer;
     1810
     1811        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
     1812        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
     1813        packet->type = rt_type;
     1814        return 0;
     1815}
     1816
     1817/**
     1818 * Given a packet size and a link speed, computes the
     1819 * time to transmit in nanoseconds.
     1820 *
     1821 * @param format_data The dpdk format data from which we get the link speed
     1822 *        and if unset updates it in a thread safe manner
     1823 * @param pkt_size The size of the packet in bytes
     1824 * @return The wire time in nanoseconds
     1825 */
     1826static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1827        uint32_t wire_time;
     1828        /* 20 extra bytes of interframe gap and preamble */
     1829# if GET_MAC_CRC_CHECKSUM
     1830        wire_time = ((pkt_size + 20) * 8000);
     1831# else
     1832        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1833# endif
     1834
     1835        /* Division is really slow and introduces a pipeline stall
     1836         * The compiler will optimise this into magical multiplication and shifting
     1837         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1838         */
     1839retry_calc_wiretime:
     1840        switch (format_data->link_speed) {
     1841        case ETH_LINK_SPEED_40G:
     1842                wire_time /=  ETH_LINK_SPEED_40G;
     1843                break;
     1844        case ETH_LINK_SPEED_20G:
     1845                wire_time /= ETH_LINK_SPEED_20G;
     1846                break;
     1847        case ETH_LINK_SPEED_10G:
     1848                wire_time /= ETH_LINK_SPEED_10G;
     1849                break;
     1850        case ETH_LINK_SPEED_1000:
     1851                wire_time /= ETH_LINK_SPEED_1000;
     1852                break;
     1853        case 0:
     1854                {
     1855                /* Maybe the link was down originally, but now it should be up */
     1856                struct rte_eth_link link = {0};
     1857                rte_eth_link_get_nowait(format_data->port, &link);
     1858                if (link.link_status && link.link_speed) {
     1859                        format_data->link_speed = link.link_speed;
     1860#ifdef DEBUG
     1861                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1862#endif
     1863                        goto retry_calc_wiretime;
     1864                }
     1865                /* We don't know the link speed, make sure numbers are counting up */
     1866                wire_time = 1;
     1867                break;
     1868                }
     1869        default:
     1870                wire_time /= format_data->link_speed;
     1871        }
     1872        return wire_time;
     1873}
     1874
     1875/**
     1876 * Does any extra preperation to all captured packets
     1877 * This includes adding our extra header to it with the timestamp,
     1878 * and any snapping
     1879 *
     1880 * @param format_data The DPDK format data
     1881 * @param plc The DPDK per lcore format data
     1882 * @param pkts An array of size nb_pkts of DPDK packets
     1883 */
     1884static inline void dpdk_ready_pkts(libtrace_t *libtrace,
     1885                                   struct dpdk_per_stream_t *plc,
     1886                                   struct rte_mbuf **pkts,
     1887                                   size_t nb_pkts) {
     1888        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
     1889        struct dpdk_addt_hdr *hdr;
     1890        size_t i;
     1891        uint64_t cur_sys_time_ns;
    11421892#if HAS_HW_TIMESTAMPS_82580
    1143     struct hw_timestamp_82580 *hw_ts;
    1144     struct timeval cur_sys_time;
    1145     uint64_t cur_sys_time_ns;
    1146     uint64_t estimated_wraps;
    1147    
    1148     /* Using gettimeofday because it's most likely to be a vsyscall
    1149      * We don't want to slow down anything with systemcalls we dont need
    1150      * accauracy */
    1151     gettimeofday(&cur_sys_time, NULL);
     1893        struct hw_timestamp_82580 *hw_ts;
     1894        uint64_t estimated_wraps;
    11521895#else
    1153 # if USE_CLOCK_GETTIME
    1154     struct timespec cur_sys_time;
    1155    
    1156     /* This looks terrible and I feel bad doing it. But it's OK
    1157      * on new kernels, because this is a vsyscall */
    1158     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1159 # else
    1160     struct timeval cur_sys_time;
    1161     /* Should be a vsyscall */
    1162     gettimeofday(&cur_sys_time, NULL);
    1163 # endif
    1164 #endif
    1165 
    1166     /* Record the size of our header */
    1167     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1168     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1169     /* Now put our header in front of that size */
    1170     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1171     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1172    
     1896
     1897#endif
     1898
     1899#if USE_CLOCK_GETTIME
     1900        struct timespec cur_sys_time = {0};
     1901        /* This looks terrible and I feel bad doing it. But it's OK
     1902         * on new kernels, because this is a fast vsyscall */
     1903        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1904        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1905#else
     1906        struct timeval cur_sys_time = {0};
     1907        /* Also a fast vsyscall */
     1908        gettimeofday(&cur_sys_time, NULL);
     1909        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1910#endif
     1911
     1912        /* The system clock is not perfect so when running
     1913         * at linerate we could timestamp a packet in the past.
     1914         * To avoid this we munge the timestamp to appear 1ns
     1915         * after the previous packet. We should eventually catch up
     1916         * to system time since a 64byte packet on a 10G link takes 67ns.
     1917         *
     1918         * Note with parallel readers timestamping packets
     1919         * with duplicate stamps or out of order is unavoidable without
     1920         * hardware timestamping from the NIC.
     1921         */
     1922#if !HAS_HW_TIMESTAMPS_82580
     1923        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1924                cur_sys_time_ns = plc->ts_last_sys + 1;
     1925        }
     1926#endif
     1927
     1928        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
     1929        for (i = 0 ; i < nb_pkts ; ++i) {
     1930
     1931                /* We put our header straight after the dpdk header */
     1932                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1933                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1934
    11731935#if GET_MAC_CRC_CHECKSUM
    1174     /* Add back in the CRC sum */
    1175     rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
    1176     rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    1177     hdr->flags |= INCLUDES_CHECKSUM;
    1178 #endif
     1936                /* Add back in the CRC sum */
     1937                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
     1938                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
     1939                hdr->flags |= INCLUDES_CHECKSUM;
     1940#endif
     1941
     1942                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    11791943
    11801944#if HAS_HW_TIMESTAMPS_82580
    1181     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1182      *
    1183      *        +----------+---+   +--------------+
    1184      *  82580 |    24    | 8 |   |      32      |
    1185      *        +----------+---+   +--------------+
    1186      *          reserved  \______ 40 bits _____/
    1187      *
    1188      * The 40 bit 82580 SYSTIM overflows every
    1189      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1190      *
    1191      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1192      * Endian (for the full 64 bits) i.e. picture is mirrored
    1193      */
    1194    
    1195     /* The timestamp is sitting before our packet and is included in pkt_len */
    1196     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1197     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1198    
    1199     /* Despite what the documentation says this is in Little
    1200      * Endian byteorder. Mask the reserved section out.
    1201      */
    1202     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1203                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1204                
    1205     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1206     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1207         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1208         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1209     }
    1210    
    1211     /* This will have serious problems if packets aren't read quickly
    1212      * that is within a couple of seconds because our clock cycles every
    1213      * 18 seconds */
    1214     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1215                             / (1ull<<TS_NBITS_82580);
    1216    
    1217     /* Estimated_wraps gives the number of times the counter should have
    1218      * wrapped (however depending on value last time it could have wrapped
    1219      * twice more (if hw clock is close to its max value) or once less (allowing
    1220      * for a bit of variance between hw and sys clock). But if the clock
    1221      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1222     if (unlikely(estimated_wraps >= 2)) {
    1223         /* 2 or more wrap arounds add all but the very last wrap */
    1224         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1225     }
    1226    
    1227     /* Set the timestamp to the lowest possible value we're considering */
    1228     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1229                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1230    
    1231     /* In most runs only the first if() will need evaluating - i.e our
    1232      * estimate is correct. */
    1233     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1234                                 hdr->timestamp, MAXSKEW_82580))) {
    1235         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1236         FORMAT(libtrace)->wrap_count++;
    1237         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1238         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1239                                 hdr->timestamp, MAXSKEW_82580)) {
    1240             /* Failed to match estimated_wraps */
    1241             FORMAT(libtrace)->wrap_count++;
    1242             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1243             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1244                                 hdr->timestamp, MAXSKEW_82580)) {
    1245                 if (estimated_wraps == 0) {
    1246                     /* 0 case Failed to match estimated_wraps+2 */
    1247                     printf("WARNING - Hardware Timestamp failed to"
    1248                                             " match using systemtime!\n");
    1249                     hdr->timestamp = cur_sys_time_ns;
    1250                 } else {
    1251                     /* Failed to match estimated_wraps+1 */
    1252                     FORMAT(libtrace)->wrap_count++;
    1253                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1254                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1255                                 hdr->timestamp, MAXSKEW_82580)) {
    1256                         /* Failed to match estimated_wraps+2 */
    1257                         printf("WARNING - Hardware Timestamp failed to"
    1258                                             " match using systemtime!!\n");
    1259                     }
    1260                 }
    1261             }
    1262         }
    1263     }
    1264 
    1265     /* Log our previous for the next loop */
    1266     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1267 
     1945                /* The timestamp is sitting before our packet and is included in pkt_len */
     1946                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1947                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1948                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1949
     1950                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1951                 *
     1952                 *        +----------+---+   +--------------+
     1953                 *  82580 |    24    | 8 |   |      32      |
     1954                 *        +----------+---+   +--------------+
     1955                 *          reserved  \______ 40 bits _____/
     1956                 *
     1957                 * The 40 bit 82580 SYSTIM overflows every
     1958                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1959                 *
     1960                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1961                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1962                 */
     1963
     1964                /* Despite what the documentation says this is in Little
     1965                 * Endian byteorder. Mask the reserved section out.
     1966                 */
     1967                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1968                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1969
     1970                if (unlikely(plc->ts_first_sys == 0)) {
     1971                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1972                        plc->ts_last_sys = plc->ts_first_sys;
     1973                }
     1974
     1975                /* This will have serious problems if packets aren't read quickly
     1976                 * that is within a couple of seconds because our clock cycles every
     1977                 * 18 seconds */
     1978                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1979                                  / (1ull<<TS_NBITS_82580);
     1980
     1981                /* Estimated_wraps gives the number of times the counter should have
     1982                 * wrapped (however depending on value last time it could have wrapped
     1983                 * twice more (if hw clock is close to its max value) or once less (allowing
     1984                 * for a bit of variance between hw and sys clock). But if the clock
     1985                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1986                if (unlikely(estimated_wraps >= 2)) {
     1987                        /* 2 or more wrap arounds add all but the very last wrap */
     1988                        plc->wrap_count += estimated_wraps - 1;
     1989                }
     1990
     1991                /* Set the timestamp to the lowest possible value we're considering */
     1992                hdr->timestamp += plc->ts_first_sys +
     1993                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     1994
     1995                /* In most runs only the first if() will need evaluating - i.e our
     1996                 * estimate is correct. */
     1997                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     1998                                              hdr->timestamp, MAXSKEW_82580))) {
     1999                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     2000                        plc->wrap_count++;
     2001                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     2002                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2003                                             hdr->timestamp, MAXSKEW_82580)) {
     2004                                /* Failed to match estimated_wraps */
     2005                                plc->wrap_count++;
     2006                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2007                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2008                                                     hdr->timestamp, MAXSKEW_82580)) {
     2009                                        if (estimated_wraps == 0) {
     2010                                                /* 0 case Failed to match estimated_wraps+2 */
     2011                                                printf("WARNING - Hardware Timestamp failed to"
     2012                                                       " match using systemtime!\n");
     2013                                                hdr->timestamp = cur_sys_time_ns;
     2014                                        } else {
     2015                                                /* Failed to match estimated_wraps+1 */
     2016                                                plc->wrap_count++;
     2017                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2018                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2019                                                                     hdr->timestamp, MAXSKEW_82580)) {
     2020                                                        /* Failed to match estimated_wraps+2 */
     2021                                                        printf("WARNING - Hardware Timestamp failed to"
     2022                                                               " match using systemtime!!\n");
     2023                                                }
     2024                                        }
     2025                                }
     2026                        }
     2027                }
    12682028#else
    1269 # if USE_CLOCK_GETTIME
    1270     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1271 # else
    1272     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1273 # endif
    1274 #endif
    1275 
    1276     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1277      * idea and do the same */
    1278     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1279     packet->buffer = pkt;
    1280     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1281 
    1282     /* Set our capture length for the first time */
    1283     hdr->cap_len = dpdk_get_wire_length(packet);
    1284     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1285         hdr->cap_len -= ETHER_CRC_LEN;
    1286     }
    1287    
    1288 
    1289     return dpdk_get_framing_length(packet) +
    1290                         dpdk_get_capture_length(packet);
     2029
     2030                hdr->timestamp = cur_sys_time_ns;
     2031                /* Offset the next packet by the wire time of previous */
     2032                calculate_wire_time(format_data, hdr->cap_len);
     2033
     2034#endif
     2035        }
     2036
     2037        plc->ts_last_sys = cur_sys_time_ns;
     2038        return;
     2039}
     2040
     2041
     2042static void dpdk_fin_packet(libtrace_packet_t *packet)
     2043{
     2044        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2045                rte_pktmbuf_free(packet->buffer);
     2046                packet->buffer = NULL;
     2047        }
     2048}
     2049
     2050/** Reads at least one packet or returns an error
     2051 */
     2052static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
     2053                                           dpdk_per_stream_t *stream,
     2054                                           libtrace_message_queue_t *mesg,
     2055                                           struct rte_mbuf* pkts_burst[],
     2056                                           size_t nb_packets) {
     2057        size_t nb_rx; /* Number of rx packets we've recevied */
     2058        while (1) {
     2059                /* Poll for a batch of packets */
     2060                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     2061                                         stream->queue_id, pkts_burst, nb_packets);
     2062                if (nb_rx > 0) {
     2063                        /* Got some packets - otherwise we keep spining */
     2064                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
     2065                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     2066                        return nb_rx;
     2067                }
     2068                /* Check the message queue this could be less than 0 */
     2069                if (mesg && libtrace_message_queue_count(mesg) > 0)
     2070                        return READ_MESSAGE;
     2071                if (libtrace_halt)
     2072                        return READ_EOF;
     2073                /* Wait a while, polling on memory degrades performance
     2074                 * This relieves the pressure on memory allowing the NIC to DMA */
     2075                rte_delay_us(10);
     2076        }
     2077
     2078        /* We'll never get here - but if we did it would be bad */
     2079        return READ_ERROR;
     2080}
     2081
     2082static int dpdk_pread_packets (libtrace_t *libtrace,
     2083                                    libtrace_thread_t *t,
     2084                                    libtrace_packet_t **packets,
     2085                                    size_t nb_packets) {
     2086        int nb_rx; /* Number of rx packets we've recevied */
     2087        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     2088        int i;
     2089        dpdk_per_stream_t *stream = t->format_data;
     2090
     2091        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
     2092                                         pkts_burst, nb_packets);
     2093
     2094        if (nb_rx > 0) {
     2095                for (i = 0; i < nb_rx; ++i) {
     2096                        if (packets[i]->buffer != NULL) {
     2097                                /* The packet should always be finished */
     2098                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
     2099                                free(packets[i]->buffer);
     2100                        }
     2101                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     2102                        packets[i]->type = TRACE_RT_DATA_DPDK;
     2103                        packets[i]->buffer = pkts_burst[i];
     2104                        packets[i]->trace = libtrace;
     2105                        packets[i]->error = 1;
     2106                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
     2107                }
     2108        }
     2109
     2110        return nb_rx;
    12912111}
    12922112
    12932113static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1294     int nb_rx; /* Number of rx packets we've recevied */
    1295     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
    1296 
    1297     /* Free the last packet buffer */
    1298     if (packet->buffer != NULL) {
    1299         /* Buffer is owned by DPDK */
    1300         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1301             rte_pktmbuf_free(packet->buffer);
    1302             packet->buffer = NULL;
    1303         } else
    1304         /* Buffer is owned by packet i.e. has been malloc'd */
    1305         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1306             free(packet->buffer);
    1307             packet->buffer = NULL;
    1308         }
    1309     }
    1310    
    1311     packet->buf_control = TRACE_CTRL_EXTERNAL;
    1312     packet->type = TRACE_RT_DATA_DPDK;
    1313    
    1314     /* Wait for a packet */
    1315     while (1) {
    1316         /* Poll for a single packet */
    1317         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1318                             FORMAT(libtrace)->queue_id, pkts_burst, 1);
    1319         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1320             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1321         }
    1322         if (libtrace_halt) {
    1323             return 0;
    1324         }
    1325     }
    1326    
    1327     /* We'll never get here - but if we did it would be bad */
    1328     return -1;
     2114        int nb_rx; /* Number of rx packets we've received */
     2115        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
     2116
     2117        /* Free the last packet buffer */
     2118        if (packet->buffer != NULL) {
     2119                /* The packet should always be finished */
     2120                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2121                free(packet->buffer);
     2122                packet->buffer = NULL;
     2123        }
     2124
     2125        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2126        packet->type = TRACE_RT_DATA_DPDK;
     2127
     2128        /* Check if we already have some packets buffered */
     2129        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     2130                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     2131                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2132                return 1; // TODO should be bytes read, which essentially useless anyway
     2133        }
     2134
     2135        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
     2136                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     2137
     2138        if (nb_rx > 0) {
     2139                FORMAT(libtrace)->burst_size = nb_rx;
     2140                FORMAT(libtrace)->burst_offset = 1;
     2141                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     2142                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2143                return 1;
     2144        }
     2145        return nb_rx;
    13292146}
    13302147
    13312148static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
    1332     struct timeval tv;
    1333     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1334    
    1335     tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1336     tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
    1337     return tv;
     2149        struct timeval tv;
     2150        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2151
     2152        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2153        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     2154        return tv;
    13382155}
    13392156
    13402157static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
    1341     struct timespec ts;
    1342     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1343    
    1344     ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1345     ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
    1346     return ts;
     2158        struct timespec ts;
     2159        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2160
     2161        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2162        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     2163        return ts;
    13472164}
    13482165
    13492166static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
    1350     return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
     2167        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
    13512168}
    13522169
    13532170static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
    1354     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1355     return (libtrace_direction_t) hdr->direction;
     2171        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2172        return (libtrace_direction_t) hdr->direction;
    13562173}
    13572174
    13582175static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
    1359     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1360     hdr->direction = (uint8_t) direction;
    1361     return (libtrace_direction_t) hdr->direction;
    1362 }
    1363 
    1364 /*
    1365  * NOTE: Drops could occur for other reasons than running out of buffer
    1366  * space. Such as failed MAC checksums and oversized packets.
    1367  */
    1368 static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    1369     struct rte_eth_stats stats = {0};
    1370    
    1371     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1372         return UINT64_MAX;
    1373     /* Grab the current stats */
    1374     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1375    
    1376     /* Get the drop counter */
    1377     return (uint64_t) stats.ierrors;
    1378 }
    1379 
    1380 static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
    1381     struct rte_eth_stats stats = {0};
    1382    
    1383     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1384         return UINT64_MAX;
    1385     /* Grab the current stats */
    1386     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1387    
    1388     /* Get the drop counter */
    1389     return (uint64_t) stats.ipackets;
    1390 }
    1391 
    1392 /*
    1393  * This is the number of packets filtered by the NIC
    1394  * and maybe ahead of number read using libtrace.
    1395  *
    1396  * XXX we are yet to implement any filtering, but if it was this should
    1397  * get the result. So this will just return 0 for now.
    1398  */
    1399 static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    1400     struct rte_eth_stats stats = {0};
    1401    
    1402     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1403         return UINT64_MAX;
    1404     /* Grab the current stats */
    1405     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1406    
    1407     /* Get the drop counter */
    1408     return (uint64_t) stats.fdirmiss;
     2176        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2177        hdr->direction = (uint8_t) direction;
     2178        return (libtrace_direction_t) hdr->direction;
     2179}
     2180
     2181static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
     2182        struct rte_eth_stats dev_stats = {0};
     2183
     2184        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
     2185                return;
     2186
     2187        /* Grab the current stats */
     2188        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
     2189
     2190        stats->captured_valid = true;
     2191        stats->captured = dev_stats.ipackets;
     2192
     2193        /* Not that we support adding filters but if we did this
     2194         * would work */
     2195        stats->filtered += dev_stats.fdirmiss;
     2196
     2197        stats->dropped_valid = true;
     2198        stats->dropped = dev_stats.imissed;
     2199
     2200        /* DPDK errors includes drops */
     2201        stats->errors_valid = true;
     2202        stats->errors = dev_stats.ierrors - dev_stats.imissed;
     2203
     2204        stats->received_valid = true;
     2205        stats->received = dev_stats.ipackets + dev_stats.imissed;
     2206
    14092207}
    14102208
     
    14142212 */
    14152213static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    1416                                         libtrace_packet_t *packet) {
    1417     libtrace_eventobj_t event = {0,0,0.0,0};
    1418     int nb_rx; /* Number of receive packets we've read */
    1419     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    1420    
    1421     do {
    1422    
    1423         /* See if we already have a packet waiting */
    1424         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    1425                         FORMAT(trace)->queue_id, pkts_burst, 1);
    1426        
    1427         if (nb_rx > 0) {
    1428             /* Free the last packet buffer */
    1429             if (packet->buffer != NULL) {
    1430                 /* Buffer is owned by DPDK */
    1431                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1432                     rte_pktmbuf_free(packet->buffer);
    1433                     packet->buffer = NULL;
    1434                 } else
    1435                 /* Buffer is owned by packet i.e. has been malloc'd */
    1436                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    1437                     free(packet->buffer);
    1438                     packet->buffer = NULL;
    1439                 }
    1440             }
    1441            
    1442             packet->buf_control = TRACE_CTRL_EXTERNAL;
    1443             packet->type = TRACE_RT_DATA_DPDK;
    1444             event.type = TRACE_EVENT_PACKET;
    1445             event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
    1446            
    1447             /* XXX - Check this passes the filter trace_read_packet normally
    1448              * does this for us but this wont */
    1449             if (trace->filter) {
    1450                 if (!trace_apply_filter(trace->filter, packet)) {
    1451                     /* Failed the filter so we loop for another packet */
    1452                     trace->filtered_packets ++;
    1453                     continue;
    1454                 }
    1455             }
    1456             trace->accepted_packets ++;
    1457         } else {
    1458             /* We only want to sleep for a very short time - we are non-blocking */
    1459             event.type = TRACE_EVENT_SLEEP;
    1460             event.seconds = 0.0001;
    1461             event.size = 0;
    1462         }
    1463        
    1464         /* If we get here we have our event */
    1465         break;
    1466     } while (1);
    1467 
    1468     return event;
    1469 }
    1470 
     2214                                            libtrace_packet_t *packet) {
     2215        libtrace_eventobj_t event = {0,0,0.0,0};
     2216        int nb_rx; /* Number of receive packets we've read */
     2217        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
     2218
     2219        do {
     2220
     2221                /* See if we already have a packet waiting */
     2222                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2223                                         FORMAT_DATA_FIRST(trace)->queue_id,
     2224                                         pkts_burst, 1);
     2225
     2226                if (nb_rx > 0) {
     2227                        /* Free the last packet buffer */
     2228                        if (packet->buffer != NULL) {
     2229                                /* The packet should always be finished */
     2230                                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2231                                free(packet->buffer);
     2232                                packet->buffer = NULL;
     2233                        }
     2234
     2235                        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2236                        packet->type = TRACE_RT_DATA_DPDK;
     2237                        event.type = TRACE_EVENT_PACKET;
     2238                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
     2239                        packet->buffer = FORMAT(trace)->burst_pkts[0];
     2240                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
     2241                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2242
     2243                        /* XXX - Check this passes the filter trace_read_packet normally
     2244                         * does this for us but this wont */
     2245                        if (trace->filter) {
     2246                                if (!trace_apply_filter(trace->filter, packet)) {
     2247                                        /* Failed the filter so we loop for another packet */
     2248                                        trace->filtered_packets ++;
     2249                                        continue;
     2250                                }
     2251                        }
     2252                        trace->accepted_packets ++;
     2253                } else {
     2254                        /* We only want to sleep for a very short time - we are non-blocking */
     2255                        event.type = TRACE_EVENT_SLEEP;
     2256                        event.seconds = 0.0001;
     2257                        event.size = 0;
     2258                }
     2259
     2260                /* If we get here we have our event */
     2261                break;
     2262        } while (1);
     2263
     2264        return event;
     2265}
    14712266
    14722267static void dpdk_help(void) {
    1473     printf("dpdk format module: $Revision: 1752 $\n");
    1474     printf("Supported input URIs:\n");
    1475     printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
    1476     printf("\tThe -<coreid> is optional \n");
    1477     printf("\t e.g. dpdk:0000:01:00.1\n");
    1478     printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
    1479     printf("\t By default the last CPU core is used if not otherwise specified.\n");
    1480     printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
    1481     printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
    1482     printf("\n");
    1483     printf("Supported output URIs:\n");
    1484     printf("\tSame format as the input URI.\n");
    1485     printf("\t e.g. dpdk:0000:01:00.1\n");
    1486     printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
    1487     printf("\n");
    1488 }
    1489 
    1490  static struct libtrace_format_t dpdk = {
     2268        printf("dpdk format module: $Revision: 1752 $\n");
     2269        printf("Supported input URIs:\n");
     2270        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
     2271        printf("\tThe -<coreid> is optional \n");
     2272        printf("\t e.g. dpdk:0000:01:00.1\n");
     2273        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
     2274        printf("\t By default the last CPU core is used if not otherwise specified.\n");
     2275        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
     2276        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
     2277        printf("\n");
     2278        printf("Supported output URIs:\n");
     2279        printf("\tSame format as the input URI.\n");
     2280        printf("\t e.g. dpdk:0000:01:00.1\n");
     2281        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
     2282        printf("\n");
     2283}
     2284
     2285static struct libtrace_format_t dpdk = {
    14912286        "dpdk",
    1492         "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     2287        "$Id$",
    14932288        TRACE_FORMAT_DPDK,
    1494         NULL,                   /* probe filename */
    1495         NULL,                               /* probe magic */
    1496         dpdk_init_input,            /* init_input */
    1497         dpdk_config_input,          /* config_input */
    1498         dpdk_start_input,           /* start_input */
    1499         dpdk_pause_input,           /* pause_input */
    1500         dpdk_init_output,           /* init_output */
    1501         NULL,                               /* config_output */
    1502         dpdk_start_output,          /* start_ouput */
    1503         dpdk_fin_input,             /* fin_input */
    1504         dpdk_fin_output,        /* fin_output */
    1505         dpdk_read_packet,           /* read_packet */
    1506         dpdk_prepare_packet,    /* prepare_packet */
    1507         NULL,                               /* fin_packet */
    1508         dpdk_write_packet,          /* write_packet */
    1509         dpdk_get_link_type,         /* get_link_type */
    1510         dpdk_get_direction,         /* get_direction */
    1511         dpdk_set_direction,         /* set_direction */
    1512         NULL,                               /* get_erf_timestamp */
    1513         dpdk_get_timeval,           /* get_timeval */
    1514         dpdk_get_timespec,          /* get_timespec */
    1515         NULL,                               /* get_seconds */
    1516         NULL,                               /* seek_erf */
    1517         NULL,                               /* seek_timeval */
    1518         NULL,                               /* seek_seconds */
    1519         dpdk_get_capture_length,/* get_capture_length */
    1520         dpdk_get_wire_length,   /* get_wire_length */
    1521         dpdk_get_framing_length,/* get_framing_length */
    1522         dpdk_set_capture_length,/* set_capture_length */
    1523         NULL,                               /* get_received_packets */
    1524         dpdk_get_filtered_packets,/* get_filtered_packets */
    1525         dpdk_get_dropped_packets,/* get_dropped_packets */
    1526     dpdk_get_captured_packets,/* get_captured_packets */
    1527         NULL,                       /* get_fd */
    1528         dpdk_trace_event,               /* trace_event */
    1529     dpdk_help,              /* help */
    1530         NULL
     2289        NULL,                               /* probe filename */
     2290        NULL,                               /* probe magic */
     2291        dpdk_init_input,                    /* init_input */
     2292        dpdk_config_input,                  /* config_input */
     2293        dpdk_start_input,                   /* start_input */
     2294        dpdk_pause_input,                   /* pause_input */
     2295        dpdk_init_output,                   /* init_output */
     2296        NULL,                               /* config_output */
     2297        dpdk_start_output,                  /* start_ouput */
     2298        dpdk_fin_input,                     /* fin_input */
     2299        dpdk_fin_output,                    /* fin_output */
     2300        dpdk_read_packet,                   /* read_packet */
     2301        dpdk_prepare_packet,                /* prepare_packet */
     2302        dpdk_fin_packet,                    /* fin_packet */
     2303        dpdk_write_packet,                  /* write_packet */
     2304        dpdk_get_link_type,                 /* get_link_type */
     2305        dpdk_get_direction,                 /* get_direction */
     2306        dpdk_set_direction,                 /* set_direction */
     2307        NULL,                               /* get_erf_timestamp */
     2308        dpdk_get_timeval,                   /* get_timeval */
     2309        dpdk_get_timespec,                  /* get_timespec */
     2310        NULL,                               /* get_seconds */
     2311        NULL,                               /* seek_erf */
     2312        NULL,                               /* seek_timeval */
     2313        NULL,                               /* seek_seconds */
     2314        dpdk_get_capture_length,            /* get_capture_length */
     2315        dpdk_get_wire_length,               /* get_wire_length */
     2316        dpdk_get_framing_length,            /* get_framing_length */
     2317        dpdk_set_capture_length,            /* set_capture_length */
     2318        NULL,                               /* get_received_packets */
     2319        NULL,                               /* get_filtered_packets */
     2320        NULL,                               /* get_dropped_packets */
     2321        dpdk_get_stats,                     /* get_statistics */
     2322        NULL,                               /* get_fd */
     2323        dpdk_trace_event,                   /* trace_event */
     2324        dpdk_help,                          /* help */
     2325        NULL,                               /* next pointer */
     2326        {true, 8},                          /* Live, NICs typically have 8 threads */
     2327        dpdk_pstart_input,                  /* pstart_input */
     2328        dpdk_pread_packets,                 /* pread_packets */
     2329        dpdk_pause_input,                   /* ppause */
     2330        dpdk_fin_input,                     /* p_fin */
     2331        dpdk_pregister_thread,              /* pregister_thread */
     2332        dpdk_punregister_thread,            /* punregister_thread */
     2333        NULL                                /* get thread stats */
    15312334};
    15322335
  • lib/format_duck.c

    rc5ac872 r5ab626a  
    362362        NULL,                           /* get_filtered_packets */
    363363        NULL,                           /* get_dropped_packets */
    364         NULL,                           /* get_captured_packets */
     364        NULL,                           /* get_statistics */
    365365        NULL,                           /* get_fd */
    366366        NULL,                           /* trace_event */
    367367        duck_help,                      /* help */
    368         NULL                            /* next pointer */
     368        NULL,                            /* next pointer */
     369        NON_PARALLEL(false)
    369370};
    370371
  • lib/format_erf.c

    rb13caea rd420777  
    856856        NULL,                           /* get_filtered_packets */
    857857        erf_get_dropped_packets,        /* get_dropped_packets */
    858         NULL,                           /* get_captured_packets */
     858        NULL,                           /* get_statistics */
    859859        NULL,                           /* get_fd */
    860860        erf_event,                      /* trace_event */
    861861        erf_help,                       /* help */
    862         NULL                            /* next pointer */
     862        NULL,                           /* next pointer */
     863        NON_PARALLEL(false)
    863864};
    864865
     
    899900        NULL,                           /* get_filtered_packets */
    900901        erf_get_dropped_packets,        /* get_dropped_packets */
    901         NULL,                           /* get_captured_packets */
     902        NULL,                           /* get_statistics */
    902903        NULL,                           /* get_fd */
    903904        erf_event,                      /* trace_event */
    904905        erf_help,                       /* help */
    905         NULL                            /* next pointer */
     906        NULL,                           /* next pointer */
     907        NON_PARALLEL(false)
    906908};
    907909
  • lib/format_legacy.c

    r1ca603b r5ab626a  
    548548        NULL,                           /* get_filtered_packets */
    549549        NULL,                           /* get_dropped_packets */
    550         NULL,                           /* get_captured_packets */
     550        NULL,                           /* get_statistics */
    551551        NULL,                           /* get_fd */
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    591592        NULL,                           /* get_filtered_packets */
    592593        NULL,                           /* get_dropped_packets */
    593         NULL,                           /* get_captured_packets */
     594        NULL,                           /* get_statistics */
    594595        NULL,                           /* get_fd */
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    634636        NULL,                           /* get_filtered_packets */
    635637        NULL,                           /* get_dropped_packets */
    636         NULL,                           /* get_captured_packets */
     638        NULL,                           /* get_statistics */
    637639        NULL,                           /* get_fd */
    638640        trace_event_trace,              /* trace_event */
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    677680        NULL,                           /* get_filtered_packets */
    678681        NULL,                           /* get_dropped_packets */
    679         NULL,                           /* get_captured_packets */
     682        NULL,                           /* get_statistics */
    680683        NULL,                           /* get_fd */
    681684        trace_event_trace,              /* trace_event */
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_pcap.c

    r4649fea r5ab626a  
    830830        NULL,                           /* get_filtered_packets */
    831831        NULL,                           /* get_dropped_packets */
    832         NULL,                           /* get_captured_packets */
     832        NULL,                           /* get_statistics */
    833833        NULL,                           /* get_fd */
    834834        trace_event_trace,              /* trace_event */
    835835        pcap_help,                      /* help */
    836         NULL                            /* next pointer */
     836        NULL,                   /* next pointer */
     837        NON_PARALLEL(false)
    837838};
    838839
     
    873874        NULL,                           /* get_filtered_packets */
    874875        pcap_get_dropped_packets,       /* get_dropped_packets */
    875         NULL,                           /* get_captured_packets */
     876        NULL,                           /* get_statistics */
    876877        pcap_get_fd,                    /* get_fd */
    877878        trace_event_device,             /* trace_event */
    878879        pcapint_help,                   /* help */
    879         NULL                            /* next pointer */
     880        NULL,                   /* next pointer */
     881        NON_PARALLEL(true)