Changes in / [584d907:e0be8382]


Ignore:
Files:
43 added
1 deleted
28 edited

Legend:

Unmodified
Added
Removed
  • README

    r3e5518a r3e5518a  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.22
    27
  • configure.in

    r3e5518a r6c09048  
    392392fi
    393393
     394# If we use DPDK we might be able to use libnuma
     395AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
     396
    394397# Checks for various "optional" libraries
    395398AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
     
    411414AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
    412415LIBS=
     416
     417if test "$have_numa" = 1; then
     418        LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
     419        AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is supported])
     420        with_numa=yes
     421else
     422        AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is supported])
     423        with_numa=no
     424fi
    413425
    414426if test "$dlfound" = 0; then
     
    445457
    446458if test "$have_clock_gettime" = 1; then
    447     LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     459        LIBTRACE_LIBS="$LIBTRACE_LIBS -lrt"
     460        AC_DEFINE(HAVE_CLOCK_GETTIME, 1, [Set to 1 if clock_gettime is supported])
     461        with_clock_gettime=yes
     462else
     463        AC_DEFINE(HAVE_CLOCK_GETTIME, 0, [Set to 1 if clock_gettime is supported])
     464        with_clock_gettime=no
    448465fi
    449466
     
    687704
    688705if test x"$libtrace_dpdk" = xtrue; then
    689     AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     706        AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     707        reportopt "Compiled with DPDK trace NUMA support" $with_numa
     708        reportopt "Compiled with clock_gettime support" $with_clock_gettime
    690709elif test x"$want_dpdk" != "xno"; then
    691710#   We don't officially support DPDK so only report failure if the user
    692711#   explicitly asked for DPDK. That way, we can hopefully keep it hidden
    693712#   from most users for now...
    694        
    695     AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
    696     AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
     713
     714        AC_MSG_NOTICE([Compiled with DPDK live capture support: No])
     715        AC_MSG_NOTICE([Note: Requires DPDK v1.5 or newer])
    697716fi
    698717reportopt "Compiled with LLVM BPF JIT support" $JIT
  • lib/Makefile.am

    r3fc3267 r6cf3ca0  
    22include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
    8 NATIVEFORMATS=format_linux.c
     8NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
    99BPFFORMATS=format_bpf.c
    1010
     
    2929NATIVEFORMATS+= format_dpdk.c
    3030# So we also make libtrace.mk in dpdk otherwise automake tries to expand
    31 # it to early which I cannot seem to stop unless we use a path that
     31# it too early which I cannot seem to stop unless we use a path that
    3232# doesn't exist currently
    3333export RTE_SDK=@RTE_SDK@
     
    4444endif
    4545
    46 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4747                format_erf.c format_pcap.c format_legacy.c \
    4848                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5757                $(DAGSOURCE) format_erf.h \
    5858                $(BPFJITSOURCE) \
    59                 libtrace_arphrd.h
     59                libtrace_arphrd.h \
     60                data-struct/ring_buffer.c data-struct/vector.c \
     61                data-struct/message_queue.c data-struct/deque.c \
     62                data-struct/sliding_window.c data-struct/object_cache.c \
     63                data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
     64                combiner_sorted.c combiner_unordered.c
    6065
    6166if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 r5ab626a  
    227227        NULL,                           /* get_filtered_packets */
    228228        NULL,                           /* get_dropped_packets */
    229         NULL,                           /* get_captured_packets */
     229        NULL,                           /* get_statistics */
    230230        NULL,                           /* get_fd */
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r08f5060 r5ab626a  
    610610        NULL,                   /* get_filtered_packets */
    611611        bpf_get_dropped_packets,/* get_dropped_packets */
    612         NULL,                   /* get_captured_packets */
     612        NULL,                   /* get_statistics */
    613613        bpf_get_fd,             /* get_fd */
    614614        trace_event_device,     /* trace_event */
    615615        bpf_help,               /* help */
    616         NULL
     616        NULL,                   /* next pointer */
     617        NON_PARALLEL(true)
    617618};
    618619#else   /* HAVE_DECL_BIOCSETIF */
     
    656657        bpf_get_framing_length, /* get_framing_length */
    657658        NULL,                   /* set_capture_length */
    658         NULL,/* get_received_packets */
     659        NULL,                   /* get_received_packets */
    659660        NULL,                   /* get_filtered_packets */
    660         NULL,/* get_dropped_packets */
    661         NULL,                   /* get_captured_packets */
     661        NULL,                   /* get_dropped_packets */
     662        NULL,                   /* get_statistics */
    662663        NULL,                   /* get_fd */
    663664        NULL,                   /* trace_event */
    664665        bpf_help,               /* help */
    665         NULL
     666        NULL,                   /* next pointer */
     667        NON_PARALLEL(true)
    666668};
    667669#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f r5ab626a  
    554554        NULL,                           /* get_filtered_packets */
    555555        dag_get_dropped_packets,        /* get_dropped_packets */
    556         NULL,                           /* get_captured_packets */
     556        NULL,                           /* get_statistics */
    557557        NULL,                           /* get_fd */
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    rc5ac872 r6b98325  
    7979 */
    8080
    81 
    8281#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8382#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8484
    8585#define FORMAT_DATA DATA(libtrace)
     
    8787
    8888#define DUCK FORMAT_DATA->duck
     89
     90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     92
    8993static struct libtrace_format_t dag;
    9094
     
    104108/* "Global" data that is stored for each DAG output trace */
    105109struct dag_format_data_out_t {
    106         /* String containing the DAG device name */
    107         char *device_name;
    108110        /* The DAG device being used for writing */
    109111        struct dag_dev_t *device;
     
    118120};
    119121
     122/* Data that is stored against each input stream */
     123struct dag_per_stream_t {
     124        /* DAG stream number */
     125        uint16_t dagstream;
     126        /* Pointer to the last unread byte in the DAG memory */
     127        uint8_t *top;
     128        /* Pointer to the first unread byte in the DAG memory */
     129        uint8_t *bottom;
     130        /* Amount of data processed from the bottom pointer */
     131        uint32_t processed;
     132        /* Number of packets seen by the stream */
     133        uint64_t pkt_count;
     134        /* Drop count for this particular stream */
     135        uint64_t drops;
     136        /* Boolean values to indicate if a particular interface has been seen
     137         * or not. This is limited to four interfaces, which is enough to
     138         * support all current DAG cards */
     139        uint8_t seeninterface[4];
     140};
     141
    120142/* "Global" data that is stored for each DAG input trace */
    121143struct dag_format_data_t {
    122 
    123         /* Data required for regular DUCK reporting */
     144        /* DAG device */
     145        struct dag_dev_t *device;
     146        /* Boolean flag indicating whether the trace is currently attached */
     147        int stream_attached;
     148        /* Data stored against each DAG input stream */
     149        libtrace_list_t *per_stream;
     150
     151        /* Data required for regular DUCK reporting.
     152         * We put this on a new cache line otherwise we have a lot of false
     153         * sharing caused by updating the last_pkt.
     154         * This should only ever be accessed by the first thread stream,
     155         * that includes both read and write operations.
     156         */
    124157        struct {
    125158                /* Timestamp of the last DUCK report */
    126159                uint32_t last_duck;
    127160                /* The number of seconds between each DUCK report */
    128                 uint32_t duck_freq;
     161                uint32_t duck_freq;
    129162                /* Timestamp of the last packet read from the DAG card */
    130                 uint32_t last_pkt;
     163                uint32_t last_pkt;
    131164                /* Dummy trace to ensure DUCK packets are dealt with using the
    132165                 * DUCK format functions */
    133                 libtrace_t *dummy_duck;
    134         } duck;
    135 
    136         /* String containing the DAG device name */
    137         char *device_name;
    138         /* The DAG device that we are reading from */
    139         struct dag_dev_t *device;
    140         /* The DAG stream that we are reading from */
    141         unsigned int dagstream;
    142         /* Boolean flag indicating whether the stream is currently attached */
    143         int stream_attached;
    144         /* Pointer to the first unread byte in the DAG memory hole */
    145         uint8_t *bottom;
    146         /* Pointer to the last unread byte in the DAG memory hole */
    147         uint8_t *top;
    148         /* The amount of data processed thus far from the bottom pointer */
    149         uint32_t processed;
    150         /* The number of packets that have been dropped */
    151         uint64_t drops;
    152 
    153         uint8_t seeninterface[4];
     166                libtrace_t *dummy_duck;
     167        } duck ALIGN_STRUCT(CACHE_LINE_SIZE);
    154168};
    155169
     
    207221
    208222/* Initialises the DAG output data structure */
    209 static void dag_init_format_out_data(libtrace_out_t *libtrace) {
    210         libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     223static void dag_init_format_out_data(libtrace_out_t *libtrace)
     224{
     225        libtrace->format_data = (struct dag_format_data_out_t *)
     226                malloc(sizeof(struct dag_format_data_out_t));
    211227        // no DUCK on output
    212228        FORMAT_DATA_OUT->stream_attached = 0;
    213229        FORMAT_DATA_OUT->device = NULL;
    214         FORMAT_DATA_OUT->device_name = NULL;
    215230        FORMAT_DATA_OUT->dagstream = 0;
    216231        FORMAT_DATA_OUT->waiting = 0;
     
    219234
    220235/* Initialises the DAG input data structure */
    221 static void dag_init_format_data(libtrace_t *libtrace) {
     236static void dag_init_format_data(libtrace_t *libtrace)
     237{
     238        struct dag_per_stream_t stream_data;
     239
    222240        libtrace->format_data = (struct dag_format_data_t *)
    223241                malloc(sizeof(struct dag_format_data_t));
    224242        DUCK.last_duck = 0;
    225         DUCK.duck_freq = 0;
    226         DUCK.last_pkt = 0;
    227         DUCK.dummy_duck = NULL;
    228         FORMAT_DATA->stream_attached = 0;
    229         FORMAT_DATA->drops = 0;
    230         FORMAT_DATA->device_name = NULL;
    231         FORMAT_DATA->device = NULL;
    232         FORMAT_DATA->dagstream = 0;
    233         FORMAT_DATA->processed = 0;
    234         FORMAT_DATA->bottom = NULL;
    235         FORMAT_DATA->top = NULL;
    236         memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
     243        DUCK.duck_freq = 0;
     244        DUCK.last_pkt = 0;
     245        DUCK.dummy_duck = NULL;
     246
     247        FORMAT_DATA->per_stream =
     248                libtrace_list_init(sizeof(stream_data));
     249        assert(FORMAT_DATA->per_stream != NULL);
     250
     251        /* We'll start with just one instance of stream_data, and we'll
     252         * add more later if we need them */
     253        memset(&stream_data, 0, sizeof(stream_data));
     254        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    237255}
    238256
     
    241259 *
    242260 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    243 static struct dag_dev_t *dag_find_open_device(char *dev_name) {
     261static struct dag_dev_t *dag_find_open_device(char *dev_name)
     262{
    244263        struct dag_dev_t *dag_dev;
    245264
     
    252271                        dag_dev->ref_count ++;
    253272                        return dag_dev;
    254 
    255273                }
    256274                dag_dev = dag_dev->next;
    257275        }
    258276        return NULL;
    259 
    260 
    261277}
    262278
     
    267283 *
    268284 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    269 static void dag_close_device(struct dag_dev_t *dev) {
     285static void dag_close_device(struct dag_dev_t *dev)
     286{
    270287        /* Need to remove from the device list */
    271 
    272288        assert(dev->ref_count == 0);
    273289
     
    292308 *
    293309 * NOTE: this function should only be called when opening a DAG device for
    294  * writing - there is little practical difference between this and the 
     310 * writing - there is little practical difference between this and the
    295311 * function below that covers the reading case, but we need the output trace
    296  * object to report errors properly so the two functions take slightly 
     312 * object to report errors properly so the two functions take slightly
    297313 * different arguments. This is really lame and there should be a much better
    298314 * way of doing this.
    299315 *
    300  * NOTE: This function assumes the open_dag_mutex is held by the caller 
     316 * NOTE: This function assumes the open_dag_mutex is held by the caller
    301317 */
    302 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     318static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
     319                                                char *dev_name)
     320{
    303321        struct stat buf;
    304322        int fd;
     
    309327                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
    310328                return NULL;
    311 }
     329        }
    312330
    313331        /* Make sure it is the appropriate type of device */
     
    346364 *
    347365 * NOTE: this function should only be called when opening a DAG device for
    348  * reading - there is little practical difference between this and the 
     366 * reading - there is little practical difference between this and the
    349367 * function above that covers the writing case, but we need the input trace
    350  * object to report errors properly so the two functions take slightly 
     368 * object to report errors properly so the two functions take slightly
    351369 * different arguments. This is really lame and there should be a much better
    352370 * way of doing this.
     
    359377
    360378        /* Make sure the device exists */
    361         if (stat(dev_name, &buf) == -1) {
    362                 trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    363                 return NULL;
    364         }
     379        if (stat(dev_name, &buf) == -1) {
     380                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
     381                return NULL;
     382        }
    365383
    366384        /* Make sure it is the appropriate type of device */
     
    368386                /* Try opening the DAG device */
    369387                if((fd = dag_open(dev_name)) < 0) {
    370                         trace_set_err(libtrace,errno,"Cannot open DAG %s",
    371                                         dev_name);
    372                         return NULL;
    373                 }
     388                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
     389                                      dev_name);
     390                        return NULL;
     391                }
    374392        } else {
    375393                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
    376                                 dev_name);
    377                 return NULL;
    378         }
     394                              dev_name);
     395                return NULL;
     396        }
    379397
    380398        /* Add the device to our device list - it is just a doubly linked
     
    397415
    398416/* Creates and initialises a DAG output trace */
    399 static int dag_init_output(libtrace_out_t *libtrace) {
     417static int dag_init_output(libtrace_out_t *libtrace)
     418{
     419        /* Upon successful creation, the device name is stored against the
     420         * device and free when it is free()d */
     421        char *dag_dev_name = NULL;
    400422        char *scan = NULL;
    401423        struct dag_dev_t *dag_device = NULL;
    402424        int stream = 1;
    403        
     425
    404426        /* XXX I don't know if this is important or not, but this function
    405427         * isn't present in all of the driver releases that this code is
     
    411433
    412434        dag_init_format_out_data(libtrace);
    413         /* Grab the mutex while we're likely to be messing with the device 
     435        /* Grab the mutex while we're likely to be messing with the device
    414436         * list */
    415437        pthread_mutex_lock(&open_dag_mutex);
    416        
     438
    417439        /* Specific streams are signified using a comma in the libtrace URI,
    418440         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
     
    420442         * If no stream is specified, we will write using stream 1 */
    421443        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    422                 FORMAT_DATA_OUT->device_name = strdup(libtrace->uridata);
     444                dag_dev_name = strdup(libtrace->uridata);
    423445        } else {
    424                 FORMAT_DATA_OUT->device_name =
    425                                 (char *)strndup(libtrace->uridata,
     446                dag_dev_name = (char *)strndup(libtrace->uridata,
    426447                                (size_t)(scan - libtrace->uridata));
    427448                stream = atoi(++scan);
     
    430451
    431452        /* See if our DAG device is already open */
    432         dag_device = dag_find_open_device(FORMAT_DATA_OUT->device_name);
     453        dag_device = dag_find_open_device(dag_dev_name);
    433454
    434455        if (dag_device == NULL) {
    435456                /* Device not yet opened - open it ourselves */
    436                 dag_device = dag_open_output_device(libtrace,
    437                                 FORMAT_DATA_OUT->device_name);
     457                dag_device = dag_open_output_device(libtrace, dag_dev_name);
     458        } else {
     459                /* Otherwise, just use the existing one */
     460                free(dag_dev_name);
     461                dag_dev_name = NULL;
    438462        }
    439463
    440464        /* Make sure we have successfully opened a DAG device */
    441465        if (dag_device == NULL) {
    442                 if (FORMAT_DATA_OUT->device_name) {
    443                         free(FORMAT_DATA_OUT->device_name);
    444                         FORMAT_DATA_OUT->device_name = NULL;
     466                if (dag_dev_name) {
     467                        free(dag_dev_name);
    445468                }
    446469                pthread_mutex_unlock(&open_dag_mutex);
     
    455478/* Creates and initialises a DAG input trace */
    456479static int dag_init_input(libtrace_t *libtrace) {
     480        /* Upon successful creation, the device name is stored against the
     481         * device and free when it is free()d */
     482        char *dag_dev_name = NULL;
    457483        char *scan = NULL;
    458484        int stream = 0;
     
    460486
    461487        dag_init_format_data(libtrace);
    462         /* Grab the mutex while we're likely to be messing with the device 
     488        /* Grab the mutex while we're likely to be messing with the device
    463489         * list */
    464490        pthread_mutex_lock(&open_dag_mutex);
    465        
    466        
    467         /* Specific streams are signified using a comma in the libtrace URI,
     491
     492
     493        /* DAG cards support multiple streams. In a single threaded capture,
     494         * these are specified using a comma in the libtrace URI,
    468495         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    469496         *
    470          * If no stream is specified, we will read from stream 0 */
     497         * If no stream is specified, we will read from stream 0 with
     498         * one thread
     499         */
    471500        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    472                 FORMAT_DATA->device_name = strdup(libtrace->uridata);
     501                dag_dev_name = strdup(libtrace->uridata);
    473502        } else {
    474                 FORMAT_DATA->device_name = (char *)strndup(libtrace->uridata,
     503                dag_dev_name = (char *)strndup(libtrace->uridata,
    475504                                (size_t)(scan - libtrace->uridata));
    476505                stream = atoi(++scan);
    477506        }
    478507
    479         FORMAT_DATA->dagstream = stream;
     508        FORMAT_DATA_FIRST->dagstream = stream;
    480509
    481510        /* See if our DAG device is already open */
    482         dag_device = dag_find_open_device(FORMAT_DATA->device_name);
     511        dag_device = dag_find_open_device(dag_dev_name);
    483512
    484513        if (dag_device == NULL) {
    485514                /* Device not yet opened - open it ourselves */
    486                 dag_device=dag_open_device(libtrace, FORMAT_DATA->device_name);
     515                dag_device = dag_open_device(libtrace, dag_dev_name);
     516        } else {
     517                /* Otherwise, just use the existing one */
     518                free(dag_dev_name);
     519                dag_dev_name = NULL;
    487520        }
    488521
    489522        /* Make sure we have successfully opened a DAG device */
    490523        if (dag_device == NULL) {
    491                 if (FORMAT_DATA->device_name)
    492                         free(FORMAT_DATA->device_name);
    493                 FORMAT_DATA->device_name = NULL;
     524                if (dag_dev_name)
     525                        free(dag_dev_name);
     526                dag_dev_name = NULL;
    494527                pthread_mutex_unlock(&open_dag_mutex);
    495528                return -1;
     
    498531        FORMAT_DATA->device = dag_device;
    499532
    500         /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
    501         /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
    502  
    503         *  The symptom of the port being disabled is that libtrace will appear to hang.
    504         */
     533        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     534           Dag Documentation */
     535        /* Check kBooleanAttributeActive is true -- no point capturing
     536         * on an interface that's disabled
     537         *
     538         * The symptom of the port being disabled is that libtrace
     539         * will appear to hang. */
    505540        /* Check kBooleanAttributeFault is false */
    506541        /* Check kBooleanAttributeLocalFault is false */
     
    508543        /* Check kBooleanAttributePeerLink ? */
    509544
    510         /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
     545        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
     546           on libtrace promisc attribute?*/
    511547        /* Set kUint32AttributeSnapLength to the snaplength */
    512548
    513549        pthread_mutex_unlock(&open_dag_mutex);
    514         return 0;
     550        return 0;
    515551}
    516552
    517553/* Configures a DAG input trace */
    518554static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    519                                 void *data) {
    520         char conf_str[4096];
     555                            void *data)
     556{
     557        char conf_str[4096];
    521558        switch(option) {
    522                 case TRACE_OPTION_META_FREQ:
    523                         /* This option is used to specify the frequency of DUCK
    524                          * updates */
    525                         DUCK.duck_freq = *(int *)data;
    526                         return 0;
    527                 case TRACE_OPTION_SNAPLEN:
    528                         /* Tell the card our new snap length */
    529                         snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    530                         if (dag_configure(FORMAT_DATA->device->fd,
    531                                                 conf_str) != 0) {
    532                                 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
    533                                 return -1;
    534                         }
    535                         return 0;
    536                 case TRACE_OPTION_PROMISC:
    537                         /* DAG already operates in a promisc fashion */
    538                         return -1;
    539                 case TRACE_OPTION_FILTER:
    540                         /* We don't yet support pushing filters into DAG
    541                          * cards */
    542                         return -1;
    543                 case TRACE_OPTION_EVENT_REALTIME:
    544                         /* Live capture is always going to be realtime */
     559        case TRACE_OPTION_META_FREQ:
     560                /* This option is used to specify the frequency of DUCK
     561                 * updates */
     562                DUCK.duck_freq = *(int *)data;
     563                return 0;
     564        case TRACE_OPTION_SNAPLEN:
     565                /* Tell the card our new snap length */
     566                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
     567                if (dag_configure(FORMAT_DATA->device->fd,
     568                                  conf_str) != 0) {
     569                        trace_set_err(libtrace, errno, "Failed to configure "
     570                                      "snaplen on DAG card: %s",
     571                                      libtrace->uridata);
    545572                        return -1;
    546         }
     573                }
     574                return 0;
     575        case TRACE_OPTION_PROMISC:
     576                /* DAG already operates in a promisc fashion */
     577                return -1;
     578        case TRACE_OPTION_FILTER:
     579                /* We don't yet support pushing filters into DAG
     580                 * cards */
     581                return -1;
     582        case TRACE_OPTION_EVENT_REALTIME:
     583                /* Live capture is always going to be realtime */
     584                return -1;
     585        case TRACE_OPTION_HASHER:
     586                /* Lets just say we did this, it's currently still up to
     587                 * the user to configure this correctly. */
     588                return 0;
     589        }
    547590        return -1;
    548591}
    549592
    550593/* Starts a DAG output trace */
    551 static int dag_start_output(libtrace_out_t *libtrace) {
     594static int dag_start_output(libtrace_out_t *libtrace)
     595{
    552596        struct timeval zero, nopoll;
    553597
     
    557601
    558602        /* Attach and start the DAG stream */
    559 
    560603        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
    561604                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
     
    572615
    573616        /* We don't want the dag card to do any sleeping */
    574 
    575617        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
    576618                        FORMAT_DATA_OUT->dagstream, 0, &zero,
     
    580622}
    581623
    582 /* Starts a DAG input trace */
    583 static int dag_start_input(libtrace_t *libtrace) {
    584         struct timeval zero, nopoll;
    585         uint8_t *top, *bottom, *starttop;
     624static int dag_start_input_stream(libtrace_t *libtrace,
     625                                  struct dag_per_stream_t * stream) {
     626        struct timeval zero, nopoll;
     627        uint8_t *top, *bottom, *starttop;
    586628        top = bottom = NULL;
    587629
    588630        zero.tv_sec = 0;
    589         zero.tv_usec = 10000;
    590         nopoll = zero;
     631        zero.tv_usec = 10000;
     632        nopoll = zero;
    591633
    592634        /* Attach and start the DAG stream */
    593635        if (dag_attach_stream(FORMAT_DATA->device->fd,
    594                                 FORMAT_DATA->dagstream, 0, 0) < 0) {
    595                 trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    596                 return -1;
    597         }
     636                              stream->dagstream, 0, 0) < 0) {
     637                trace_set_err(libtrace, errno, "Cannot attach DAG stream #%u",
     638                              stream->dagstream);
     639                return -1;
     640        }
    598641
    599642        if (dag_start_stream(FORMAT_DATA->device->fd,
    600                                 FORMAT_DATA->dagstream) < 0) {
    601                 trace_set_err(libtrace, errno, "Cannot start DAG stream");
    602                 return -1;
    603         }
     643                             stream->dagstream) < 0) {
     644                trace_set_err(libtrace, errno, "Cannot start DAG stream #%u",
     645                              stream->dagstream);
     646                return -1;
     647        }
    604648        FORMAT_DATA->stream_attached = 1;
    605        
     649
    606650        /* We don't want the dag card to do any sleeping */
    607         dag_set_stream_poll(FORMAT_DATA->device->fd,
    608                                 FORMAT_DATA->dagstream, 0, &zero,
    609                                 &nopoll);
     651        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     652                            stream->dagstream, 0, &zero,
     653                            &nopoll) < 0) {
     654                trace_set_err(libtrace, errno,
     655                              "dag_set_stream_poll failed!");
     656                return -1;
     657        }
    610658
    611659        starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    612                                         FORMAT_DATA->dagstream,
    613                                         &bottom);
     660                                      stream->dagstream,
     661                                      &bottom);
    614662
    615663        /* Should probably flush the memory hole now */
     
    618666                bottom += (starttop - bottom);
    619667                top = dag_advance_stream(FORMAT_DATA->device->fd,
    620                                         FORMAT_DATA->dagstream,
    621                                         &bottom);
    622         }
    623         FORMAT_DATA->top = top;
    624         FORMAT_DATA->bottom = bottom;
    625         FORMAT_DATA->processed = 0;
    626         FORMAT_DATA->drops = 0;
     668                                         stream->dagstream,
     669                                         &bottom);
     670        }
     671        stream->top = top;
     672        stream->bottom = bottom;
     673        stream->processed = 0;
     674        stream->drops = 0;
    627675
    628676        return 0;
     677
     678}
     679
     680/* Starts a DAG input trace */
     681static int dag_start_input(libtrace_t *libtrace)
     682{
     683        return dag_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     684}
     685
     686static int dag_pstart_input(libtrace_t *libtrace)
     687{
     688        char *scan, *tok;
     689        uint16_t stream_count = 0, max_streams;
     690        int iserror = 0;
     691        struct dag_per_stream_t stream_data;
     692
     693        /* Check we aren't trying to create more threads than the DAG card can
     694         * handle */
     695        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     696        if (libtrace->perpkt_thread_count > max_streams) {
     697                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     698                              "trying to create too many threads (max is %u)",
     699                              max_streams);
     700                iserror = 1;
     701                goto cleanup;
     702        }
     703
     704        /* Get the stream names from the uri */
     705        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     706                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     707                              "format uri doesn't specify the DAG streams");
     708                iserror = 1;
     709                goto cleanup;
     710        }
     711
     712        scan++;
     713
     714        tok = strtok(scan, ",");
     715        while (tok != NULL) {
     716                /* Ensure we haven't specified too many streams */
     717                if (stream_count >= libtrace->perpkt_thread_count) {
     718                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     719                                      "format uri specifies too many streams. "
     720                                      "Max is %u", max_streams);
     721                        iserror = 1;
     722                        goto cleanup;
     723                }
     724
     725                /* Save the stream details */
     726                if (stream_count == 0) {
     727                        /* Special case where we update the existing stream
     728                         * data structure */
     729                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     730                } else {
     731                        memset(&stream_data, 0, sizeof(stream_data));
     732                        stream_data.dagstream = (uint16_t)atoi(tok);
     733                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     734                                                &stream_data);
     735                }
     736
     737                stream_count++;
     738                tok = strtok(NULL, ",");
     739        }
     740
     741        FORMAT_DATA->stream_attached = 1;
     742
     743 cleanup:
     744        if (iserror) {
     745                return -1;
     746        } else {
     747                return 0;
     748        }
    629749}
    630750
    631751/* Pauses a DAG output trace */
    632 static int dag_pause_output(libtrace_out_t *libtrace) {
    633 
     752static int dag_pause_output(libtrace_out_t *libtrace)
     753{
    634754        /* Stop and detach the stream */
    635755        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
    636                         FORMAT_DATA_OUT->dagstream) < 0) {
     756                            FORMAT_DATA_OUT->dagstream) < 0) {
    637757                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
    638758                return -1;
    639759        }
    640760        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
    641                         FORMAT_DATA_OUT->dagstream) < 0) {
    642                 trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     761                              FORMAT_DATA_OUT->dagstream) < 0) {
     762                trace_set_err_out(libtrace, errno,
     763                                  "Could not detach DAG stream");
    643764                return -1;
    644765        }
     
    648769
    649770/* Pauses a DAG input trace */
    650 static int dag_pause_input(libtrace_t *libtrace) {
    651 
    652         /* Stop and detach the stream */
    653         if (dag_stop_stream(FORMAT_DATA->device->fd,
    654                                 FORMAT_DATA->dagstream) < 0) {
    655                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    656                 return -1;
    657         }
    658         if (dag_detach_stream(FORMAT_DATA->device->fd,
    659                                 FORMAT_DATA->dagstream) < 0) {
    660                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    661                 return -1;
    662         }
     771static int dag_pause_input(libtrace_t *libtrace)
     772{
     773        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     774
     775        /* Stop and detach each stream */
     776        while (tmp != NULL) {
     777                if (dag_stop_stream(FORMAT_DATA->device->fd,
     778                                    STREAM_DATA(tmp)->dagstream) < 0) {
     779                        trace_set_err(libtrace, errno,
     780                                      "Could not stop DAG stream");
     781                        printf("Count not stop DAG stream\n");
     782                        return -1;
     783                }
     784                if (dag_detach_stream(FORMAT_DATA->device->fd,
     785                                      STREAM_DATA(tmp)->dagstream) < 0) {
     786                        trace_set_err(libtrace, errno,
     787                                      "Could not detach DAG stream");
     788                        printf("Count not detach DAG stream\n");
     789                        return -1;
     790                }
     791
     792                tmp = tmp->next;
     793        }
     794
    663795        FORMAT_DATA->stream_attached = 0;
    664796        return 0;
    665797}
    666798
     799
     800
    667801/* Closes a DAG input trace */
    668 static int dag_fin_input(libtrace_t *libtrace) {
     802static int dag_fin_input(libtrace_t *libtrace)
     803{
    669804        /* Need the lock, since we're going to be handling the device list */
    670805        pthread_mutex_lock(&open_dag_mutex);
    671        
     806
    672807        /* Detach the stream if we are not paused */
    673808        if (FORMAT_DATA->stream_attached)
    674809                dag_pause_input(libtrace);
    675         FORMAT_DATA->device->ref_count --;
     810        FORMAT_DATA->device->ref_count--;
    676811
    677812        /* Close the DAG device if there are no more references to it */
    678813        if (FORMAT_DATA->device->ref_count == 0)
    679814                dag_close_device(FORMAT_DATA->device);
     815
    680816        if (DUCK.dummy_duck)
    681817                trace_destroy_dead(DUCK.dummy_duck);
    682         if (FORMAT_DATA->device_name)
    683                 free(FORMAT_DATA->device_name);
     818
     819        /* Clear the list */
     820        libtrace_list_deinit(FORMAT_DATA->per_stream);
    684821        free(libtrace->format_data);
    685822        pthread_mutex_unlock(&open_dag_mutex);
    686         return 0; /* success */
     823        return 0; /* success */
    687824}
    688825
    689826/* Closes a DAG output trace */
    690 static int dag_fin_output(libtrace_out_t *libtrace) {
    691        
     827static int dag_fin_output(libtrace_out_t *libtrace)
     828{
     829
    692830        /* Commit any outstanding traffic in the txbuffer */
    693831        if (FORMAT_DATA_OUT->waiting) {
    694                 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    695                                 FORMAT_DATA_OUT->waiting );
    696         }
    697 
    698         /* Wait until the buffer is nearly clear before exiting the program,
     832                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     833                                           FORMAT_DATA_OUT->dagstream,
     834                                           FORMAT_DATA_OUT->waiting );
     835        }
     836
     837        /* Wait until the buffer is nearly clear before exiting the program,
    699838         * as we will lose packets otherwise */
    700         dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    701                         FORMAT_DATA_OUT->dagstream,
    702                         dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
    703                                         FORMAT_DATA_OUT->dagstream) - 8
    704                         );
     839        dag_tx_get_stream_space
     840                (FORMAT_DATA_OUT->device->fd,
     841                 FORMAT_DATA_OUT->dagstream,
     842                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     843                                            FORMAT_DATA_OUT->dagstream) - 8);
    705844
    706845        /* Need the lock, since we're going to be handling the device list */
     
    715854        if (FORMAT_DATA_OUT->device->ref_count == 0)
    716855                dag_close_device(FORMAT_DATA_OUT->device);
    717         if (FORMAT_DATA_OUT->device_name)
    718                 free(FORMAT_DATA_OUT->device_name);
    719856        free(libtrace->format_data);
    720857        pthread_mutex_unlock(&open_dag_mutex);
     
    752889
    753890        /* Allocate memory for the DUCK data */
    754         if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
    755                         !packet->buffer) {
    756                 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
    757                 packet->buf_control = TRACE_CTRL_PACKET;
    758                 if (!packet->buffer) {
    759                         trace_set_err(libtrace, errno,
    760                                         "Cannot allocate packet buffer");
    761                         return -1;
    762                 }
    763         }
     891        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
     892            !packet->buffer) {
     893                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
     894                packet->buf_control = TRACE_CTRL_PACKET;
     895                if (!packet->buffer) {
     896                        trace_set_err(libtrace, errno,
     897                                      "Cannot allocate packet buffer");
     898                        return -1;
     899                }
     900        }
    764901
    765902        /* DUCK doesn't have a format header */
    766         packet->header = 0;
    767         packet->payload = packet->buffer;
    768 
    769         /* No need to check if we can get DUCK or not - we're modern
    770         * enough so just grab the DUCK info */
    771         if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
    772                                         (duckinf_t *)packet->payload) < 0)) {
    773                 trace_set_err(libtrace, errno, "Error using DUCK ioctl");
     903        packet->header = 0;
     904        packet->payload = packet->buffer;
     905
     906        /* No need to check if we can get DUCK or not - we're modern
     907        * enough so just grab the DUCK info */
     908        if ((ioctl(FORMAT_DATA->device->fd, LIBTRACE_DUCK_IOCTL,
     909                   (duckinf_t *)packet->payload) < 0)) {
     910                trace_set_err(libtrace, errno, "Error using DUCK ioctl");
    774911                DUCK.duck_freq = 0;
    775                 return -1;
    776         }
    777 
    778         packet->type = LIBTRACE_DUCK_VERSION;
     912                return -1;
     913        }
     914
     915        packet->type = LIBTRACE_DUCK_VERSION;
    779916
    780917        /* Set the packet's trace to point at a DUCK trace, so that the
    781918         * DUCK format functions will be called on the packet rather than the
    782919         * DAG ones */
    783         if (!DUCK.dummy_duck)
    784                 DUCK.dummy_duck = trace_create_dead("duck:dummy");
    785         packet->trace = DUCK.dummy_duck;
    786         DUCK.last_duck = DUCK.last_pkt;
    787         return sizeof(duckinf_t);
     920        if (!DUCK.dummy_duck)
     921                DUCK.dummy_duck = trace_create_dead("duck:dummy");
     922        packet->trace = DUCK.dummy_duck;
     923        DUCK.last_duck = DUCK.last_pkt;
     924        packet->error = sizeof(duckinf_t);
     925        return sizeof(duckinf_t);
    788926}
    789927
    790928/* Determines the amount of data available to read from the DAG card */
    791 static int dag_available(libtrace_t *libtrace) {
    792         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     929static int dag_available(libtrace_t *libtrace,
     930                         struct dag_per_stream_t *stream_data)
     931{
     932        uint32_t diff = stream_data->top - stream_data->bottom;
    793933
    794934        /* If we've processed more than 4MB of data since we last called
    795935         * dag_advance_stream, then we should call it again to allow the
    796936         * space occupied by that 4MB to be released */
    797         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     937        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    798938                return diff;
    799        
     939
    800940        /* Update the top and bottom pointers */
    801         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    802                         FORMAT_DATA->dagstream,
    803                         &(FORMAT_DATA->bottom));
    804        
    805         if (FORMAT_DATA->top == NULL) {
     941        stream_data->top = dag_advance_stream(FORMAT_DATA->device->fd,
     942                                              stream_data->dagstream,
     943                                              &(stream_data->bottom));
     944
     945        if (stream_data->top == NULL) {
    806946                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    807947                return -1;
    808948        }
    809         FORMAT_DATA->processed = 0;
    810         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     949        stream_data->processed = 0;
     950        diff = stream_data->top - stream_data->bottom;
    811951        return diff;
    812952}
    813953
    814954/* Returns a pointer to the start of the next complete ERF record */
    815 static dag_record_t *dag_get_record(libtrace_t *libtrace) {
    816         dag_record_t *erfptr = NULL;
    817         uint16_t size;
    818         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     955static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
     956{
     957        dag_record_t *erfptr = NULL;
     958        uint16_t size;
     959
     960        erfptr = (dag_record_t *)stream_data->bottom;
    819961        if (!erfptr)
    820                 return NULL;
    821         size = ntohs(erfptr->rlen);
    822         assert( size >= dag_record_size );
     962                return NULL;
     963
     964        size = ntohs(erfptr->rlen);
     965        assert( size >= dag_record_size );
     966
    823967        /* Make certain we have the full packet available */
    824         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     968        if (size > (stream_data->top - stream_data->bottom))
    825969                return NULL;
    826         FORMAT_DATA->bottom += size;
    827         FORMAT_DATA->processed += size;
     970
     971        stream_data->bottom += size;
     972        stream_data->processed += size;
    828973        return erfptr;
    829974}
     
    831976/* Converts a buffer containing a recently read DAG packet record into a
    832977 * libtrace packet */
    833 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    834                 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    835 
     978static int dag_prepare_packet_stream(libtrace_t *libtrace,
     979                                     struct dag_per_stream_t *stream_data,
     980                                     libtrace_packet_t *packet,
     981                                     void *buffer, libtrace_rt_types_t rt_type,
     982                                     uint32_t flags)
     983{
    836984        dag_record_t *erfptr;
    837        
     985
    838986        /* If the packet previously owned a buffer that is not the buffer
    839         * that contains the new packet data, we're going to need to free the
    840         * old one to avoid memory leaks */
     987        * that contains the new packet data, we're going to need to free the
     988        * old one to avoid memory leaks */
    841989        if (packet->buffer != buffer &&
    842                         packet->buf_control == TRACE_CTRL_PACKET) {
     990            packet->buf_control == TRACE_CTRL_PACKET) {
    843991                free(packet->buffer);
    844992        }
     
    8531001        erfptr = (dag_record_t *)buffer;
    8541002        packet->buffer = erfptr;
    855         packet->header = erfptr;
    856         packet->type = rt_type;
     1003        packet->header = erfptr;
     1004        packet->type = rt_type;
    8571005
    8581006        if (erfptr->flags.rxerror == 1) {
    859                 /* rxerror means the payload is corrupt - drop the payload
    860                 * by tweaking rlen */
    861                 packet->payload = NULL;
    862                 erfptr->rlen = htons(erf_get_framing_length(packet));
    863         } else {
    864                 packet->payload = (char*)packet->buffer
    865                         + erf_get_framing_length(packet);
    866         }
     1007                /* rxerror means the payload is corrupt - drop the payload
     1008                * by tweaking rlen */
     1009                packet->payload = NULL;
     1010                erfptr->rlen = htons(erf_get_framing_length(packet));
     1011        } else {
     1012                packet->payload = (char*)packet->buffer
     1013                        + erf_get_framing_length(packet);
     1014        }
    8671015
    8681016        if (libtrace->format_data == NULL) {
     
    8711019
    8721020        /* Update the dropped packets counter */
    873 
    874         /* No loss counter for DSM coloured records - have to use
    875          * some other API */
     1021        /* No loss counter for DSM coloured records - have to use some
     1022         * other API */
    8761023        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    8771024                /* TODO */
    8781025        } else {
    8791026                /* Use the ERF loss counter */
    880                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    881                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     1027                if (stream_data->seeninterface[erfptr->flags.iface]
     1028                    == 0) {
     1029                        stream_data->seeninterface[erfptr->flags.iface]
     1030                                = 1;
    8821031                } else {
    883                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
     1032                        stream_data->drops += ntohs(erfptr->lctr);
    8841033                }
    8851034        }
    8861035
    8871036        return 0;
     1037}
     1038
     1039static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1040                              void *buffer, libtrace_rt_types_t rt_type,
     1041                              uint32_t flags)
     1042{
     1043        return dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1044                                       buffer, rt_type, flags);
    8881045}
    8891046
     
    8981055/* Pushes an ERF record onto the transmit stream */
    8991056static int dag_dump_packet(libtrace_out_t *libtrace,
    900                 dag_record_t *erfptr, unsigned int pad, void *buffer) {
     1057                           dag_record_t *erfptr, unsigned int pad,
     1058                           void *buffer)
     1059{
    9011060        int size;
    9021061
    9031062        /*
    904          * If we've got 0 bytes waiting in the txqueue, assume that we haven't
    905          * requested any space yet, and request some, storing the pointer at
    906          * FORMAT_DATA_OUT->txbuffer.
     1063         * If we've got 0 bytes waiting in the txqueue, assume that we
     1064         * haven't requested any space yet, and request some, storing
     1065         * the pointer at FORMAT_DATA_OUT->txbuffer.
    9071066         *
    9081067         * The amount to request is slightly magical at the moment - it's
     
    9111070         */
    9121071        if (FORMAT_DATA_OUT->waiting == 0) {
    913                 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    914                                 FORMAT_DATA_OUT->dagstream, 16908288);
     1072                FORMAT_DATA_OUT->txbuffer =
     1073                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     1074                                                FORMAT_DATA_OUT->dagstream,
     1075                                                16908288);
    9151076        }
    9161077
     
    9191080         * are in contiguous memory
    9201081         */
    921         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     1082        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
     1083               (dag_record_size + pad));
    9221084        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
    923 
    924 
    9251085
    9261086        /*
     
    9291089         */
    9301090        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
    931         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     1091        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
     1092               size);
    9321093        FORMAT_DATA_OUT->waiting += size;
    9331094
     
    9381099         * case there is still data in the buffer at program exit.
    9391100         */
    940 
    9411101        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
    942                 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    943                         FORMAT_DATA_OUT->waiting );
     1102                FORMAT_DATA_OUT->txbuffer =
     1103                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     1104                                                   FORMAT_DATA_OUT->dagstream,
     1105                                                   FORMAT_DATA_OUT->waiting);
    9441106                FORMAT_DATA_OUT->waiting = 0;
    9451107        }
    9461108
    9471109        return size + pad + dag_record_size;
    948 
    9491110}
    9501111
     
    9521113 * if one is found, false otherwise */
    9531114static bool find_compatible_linktype(libtrace_out_t *libtrace,
    954                                 libtrace_packet_t *packet, char *type)
    955 {
    956          // Keep trying to simplify the packet until we can find
    957          //something we can do with it
     1115                                     libtrace_packet_t *packet, char *type)
     1116{
     1117        /* Keep trying to simplify the packet until we can find
     1118         * something we can do with it */
    9581119
    9591120        do {
    960                 *type=libtrace_to_erf_type(trace_get_link_type(packet));
    961 
    962                 // Success
     1121                *type = libtrace_to_erf_type(trace_get_link_type(packet));
     1122
     1123                /* Success */
    9631124                if (*type != (char)-1)
    9641125                        return true;
     
    9661127                if (!demote_packet(packet)) {
    9671128                        trace_set_err_out(libtrace,
    968                                         TRACE_ERR_NO_CONVERSION,
    969                                         "No erf type for packet (%i)",
    970                                         trace_get_link_type(packet));
     1129                                          TRACE_ERR_NO_CONVERSION,
     1130                                          "No erf type for packet (%i)",
     1131                                          trace_get_link_type(packet));
    9711132                        return false;
    9721133                }
     
    9781139
    9791140/* Writes a packet to the provided DAG output trace */
    980 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    981         /*
    982          * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
    983          * sucks, sorry about that.
     1141static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
     1142{
     1143        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
     1144         * coding sucks, sorry about that.
    9841145         */
    9851146        unsigned int pad = 0;
     
    9901151
    9911152        if(!packet->header) {
    992                 /* No header, probably an RT packet. Lifted from 
     1153                /* No header, probably an RT packet. Lifted from
    9931154                 * erf_write_packet(). */
    9941155                return -1;
     
    10091170
    10101171        if (packet->type == TRACE_RT_DATA_ERF) {
    1011                 numbytes = dag_dump_packet(libtrace,
    1012                                 header,
    1013                                 pad,
    1014                                 payload
    1015                                 );
    1016 
     1172                numbytes = dag_dump_packet(libtrace, header, pad, payload);
    10171173        } else {
    10181174                /* Build up a new packet header from the existing header */
    10191175
    1020                 /* Simplify the packet first - if we can't do this, break 
     1176                /* Simplify the packet first - if we can't do this, break
    10211177                 * early */
    10221178                if (!find_compatible_linktype(libtrace,packet,&erf_type))
     
    10371193
    10381194                /* Packet length (rlen includes format overhead) */
    1039                 assert(trace_get_capture_length(packet)>0
    1040                                 && trace_get_capture_length(packet)<=65536);
    1041                 assert(erf_get_framing_length(packet)>0
    1042                                 && trace_get_framing_length(packet)<=65536);
    1043                 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
    1044                       &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     1195                assert(trace_get_capture_length(packet) > 0
     1196                       && trace_get_capture_length(packet) <= 65536);
     1197                assert(erf_get_framing_length(packet) > 0
     1198                       && trace_get_framing_length(packet) <= 65536);
     1199                assert(trace_get_capture_length(packet) +
     1200                       erf_get_framing_length(packet) > 0
     1201                       && trace_get_capture_length(packet) +
     1202                       erf_get_framing_length(packet) <= 65536);
    10451203
    10461204                erfhdr.rlen = htons(trace_get_capture_length(packet)
    1047                         + erf_get_framing_length(packet));
     1205                                    + erf_get_framing_length(packet));
    10481206
    10491207
     
    10541212
    10551213                /* Write it out */
    1056                 numbytes = dag_dump_packet(libtrace,
    1057                                 &erfhdr,
    1058                                 pad,
    1059                                 payload);
    1060 
     1214                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
    10611215        }
    10621216
     
    10681222 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10691223 */
    1070 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1071         int size = 0;
    1072         struct timeval tv;
    1073         dag_record_t *erfptr = NULL;
     1224static int dag_read_packet_stream(libtrace_t *libtrace,
     1225                                struct dag_per_stream_t *stream_data,
     1226                                libtrace_thread_t *t, /* Optional */
     1227                                libtrace_packet_t *packet)
     1228{
     1229        int size = 0;
     1230        dag_record_t *erfptr = NULL;
     1231        struct timeval tv;
    10741232        int numbytes = 0;
    10751233        uint32_t flags = 0;
    1076         struct timeval maxwait;
    1077         struct timeval pollwait;
     1234        struct timeval maxwait, pollwait;
    10781235
    10791236        pollwait.tv_sec = 0;
     
    10821239        maxwait.tv_usec = 250000;
    10831240
    1084         /* Check if we're due for a DUCK report */
    1085         size = dag_get_duckinfo(libtrace, packet);
    1086 
    1087         if (size != 0)
    1088                 return size;
     1241        /* Check if we're due for a DUCK report - only report on the first thread */
     1242        if (stream_data == FORMAT_DATA_FIRST) {
     1243                size = dag_get_duckinfo(libtrace, packet);
     1244                if (size != 0)
     1245                        return size;
     1246        }
    10891247
    10901248
     
    10941252        /* If the packet buffer is currently owned by libtrace, free it so
    10951253         * that we can set the packet to point into the DAG memory hole */
    1096         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1097                 free(packet->buffer);
    1098                 packet->buffer = 0;
    1099         }
    1100        
    1101         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1102                         FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait,
    1103                         &pollwait) == -1)
    1104         {
     1254        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1255                free(packet->buffer);
     1256                packet->buffer = 0;
     1257        }
     1258
     1259        if (dag_set_stream_poll(FORMAT_DATA->device->fd, stream_data->dagstream,
     1260                                sizeof(dag_record_t), &maxwait,
     1261                                &pollwait) == -1) {
    11051262                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11061263                return -1;
    11071264        }
    1108 
    11091265
    11101266        /* Grab a full ERF record */
    11111267        do {
    1112                 numbytes = dag_available(libtrace);
     1268                numbytes = dag_available(libtrace, stream_data);
    11131269                if (numbytes < 0)
    11141270                        return numbytes;
    11151271                if (numbytes < dag_record_size) {
     1272                        /* Check the message queue if we have one to check */
     1273                        if (t != NULL &&
     1274                            libtrace_message_queue_count(&t->messages) > 0)
     1275                                return -2;
     1276
    11161277                        if (libtrace_halt)
    11171278                                return 0;
     
    11191280                        continue;
    11201281                }
    1121                 erfptr = dag_get_record(libtrace);
     1282                erfptr = dag_get_record(stream_data);
    11221283        } while (erfptr == NULL);
    11231284
     1285        packet->trace = libtrace;
    11241286        /* Prepare the libtrace packet */
    1125         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1126                                 flags))
    1127                 return -1;
    1128 
    1129         /* Update the DUCK timer */
    1130         tv = trace_get_timeval(packet);
    1131         DUCK.last_pkt = tv.tv_sec;
    1132 
    1133         return packet->payload ? htons(erfptr->rlen) :
    1134                                 erf_get_framing_length(packet);
     1287        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
     1288                                    TRACE_RT_DATA_ERF, flags))
     1289                return -1;
     1290
     1291        /* Update the DUCK timer - don't re-order this check (false-sharing) */
     1292        if (stream_data == FORMAT_DATA_FIRST && DUCK.duck_freq != 0) {
     1293                tv = trace_get_timeval(packet);
     1294                DUCK.last_pkt = tv.tv_sec;
     1295        }
     1296
     1297        packet->error = packet->payload ? htons(erfptr->rlen) :
     1298                                          erf_get_framing_length(packet);
     1299
     1300        return packet->error;
     1301}
     1302
     1303static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1304{
     1305        return dag_read_packet_stream(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1306}
     1307
     1308static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1309                             libtrace_packet_t **packets, size_t nb_packets)
     1310{
     1311        int ret;
     1312        size_t read_packets = 0;
     1313        int numbytes = 0;
     1314
     1315        struct dag_per_stream_t *stream_data =
     1316                (struct dag_per_stream_t *)t->format_data;
     1317
     1318        /* Read as many packets as we can, but read atleast one packet */
     1319        do {
     1320                ret = dag_read_packet_stream(libtrace, stream_data, t,
     1321                                           packets[read_packets]);
     1322                if (ret < 0)
     1323                        return ret;
     1324
     1325                read_packets++;
     1326
     1327                /* Make sure we don't read too many packets..! */
     1328                if (read_packets >= nb_packets)
     1329                        break;
     1330
     1331                numbytes = dag_available(libtrace, stream_data);
     1332        } while (numbytes >= dag_record_size);
     1333
     1334        return read_packets;
    11351335}
    11361336
     
    11401340 */
    11411341static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
    1142                                         libtrace_packet_t *packet) {
    1143         libtrace_eventobj_t event = {0,0,0.0,0};
     1342                                           libtrace_packet_t *packet)
     1343{
     1344        libtrace_eventobj_t event = {0,0,0.0,0};
    11441345        dag_record_t *erfptr = NULL;
    11451346        int numbytes;
     
    11601361        }
    11611362       
    1162         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1163                         FORMAT_DATA->dagstream, 0, &minwait,
    1164                         &minwait) == -1)
    1165         {
     1363        if (dag_set_stream_poll(FORMAT_DATA->device->fd,
     1364                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
     1365                                &minwait) == -1) {
    11661366                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11671367                event.type = TRACE_EVENT_TERMINATE;
     
    11721372                erfptr = NULL;
    11731373                numbytes = 0;
    1174        
     1374
    11751375                /* Need to call dag_available so that the top pointer will get
    11761376                 * updated, otherwise we'll never see any data! */
    1177                 numbytes = dag_available(libtrace);
    1178 
    1179                 /* May as well not bother calling dag_get_record if 
     1377                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
     1378
     1379                /* May as well not bother calling dag_get_record if
    11801380                 * dag_available suggests that there's no data */
    11811381                if (numbytes != 0)
    1182                         erfptr = dag_get_record(libtrace);
     1382                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    11831383                if (erfptr == NULL) {
    11841384                        /* No packet available - sleep for a very short time */
    11851385                        if (libtrace_halt) {
    11861386                                event.type = TRACE_EVENT_TERMINATE;
    1187                         } else {                       
     1387                        } else {
    11881388                                event.type = TRACE_EVENT_SLEEP;
    11891389                                event.seconds = 0.0001;
     
    11911391                        break;
    11921392                }
    1193                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1194                                         TRACE_RT_DATA_ERF, flags)) {
     1393                if (dag_prepare_packet_stream(libtrace, FORMAT_DATA_FIRST, packet,
     1394                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    11951395                        event.type = TRACE_EVENT_TERMINATE;
    11961396                        break;
     
    11981398
    11991399
    1200                 event.size = trace_get_capture_length(packet) + 
    1201                                 trace_get_framing_length(packet);
    1202                
     1400                event.size = trace_get_capture_length(packet) +
     1401                        trace_get_framing_length(packet);
     1402
    12031403                /* XXX trace_read_packet() normally applies the following
    12041404                 * config options for us, but this function is called via
     
    12061406
    12071407                if (libtrace->filter) {
    1208                         int filtret = trace_apply_filter(libtrace->filter, 
    1209                                         packet);
     1408                        int filtret = trace_apply_filter(libtrace->filter,
     1409                                                         packet);
    12101410                        if (filtret == -1) {
    12111411                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
    1212                                                 "Bad BPF Filter");
     1412                                              "Bad BPF Filter");
    12131413                                event.type = TRACE_EVENT_TERMINATE;
    12141414                                break;
     
    12211421                                 * a sleep event in this case, like we used to
    12221422                                 * do! */
    1223                                 libtrace->filtered_packets ++;
     1423                                libtrace->filtered_packets ++;
    12241424                                trace_clear_cache(packet);
    12251425                                continue;
    12261426                        }
    1227                                
     1427
    12281428                        event.type = TRACE_EVENT_PACKET;
    12291429                } else {
     
    12381438                        trace_set_capture_length(packet, libtrace->snaplen);
    12391439                }
    1240                 libtrace->accepted_packets ++;
     1440                libtrace->accepted_packets ++;
    12411441                break;
    1242         } while (1);
     1442        } while(1);
    12431443
    12441444        return event;
    12451445}
    12461446
    1247 /* Gets the number of dropped packets */
    1248 static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1249         if (trace->format_data == NULL)
    1250                 return (uint64_t)-1;
    1251         return DATA(trace)->drops;
     1447static void dag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat)
     1448{
     1449        libtrace_list_node_t *tmp;
     1450        assert(stat && libtrace);
     1451        tmp = FORMAT_DATA_HEAD;
     1452
     1453        /* Dropped packets */
     1454        stat->dropped_valid = 1;
     1455        stat->dropped = 0;
     1456        while (tmp != NULL) {
     1457                stat->dropped += STREAM_DATA(tmp)->drops;
     1458                tmp = tmp->next;
     1459        }
     1460
     1461}
     1462
     1463static void dag_get_thread_statisitics(libtrace_t *libtrace, libtrace_thread_t *t,
     1464                                       libtrace_stat_t *stat) {
     1465        struct dag_per_stream_t *stream_data = t->format_data;
     1466        assert(stat && libtrace);
     1467
     1468        stat->dropped_valid = 1;
     1469        stat->dropped = stream_data->drops;
     1470
     1471        stat->filtered_valid = 1;
     1472        stat->filtered = 0;
    12521473}
    12531474
    12541475/* Prints some semi-useful help text about the DAG format module */
    12551476static void dag_help(void) {
    1256         printf("dag format module: $Revision: 1755 $\n");
    1257         printf("Supported input URIs:\n");
    1258         printf("\tdag:/dev/dagn\n");
    1259         printf("\n");
    1260         printf("\te.g.: dag:/dev/dag0\n");
    1261         printf("\n");
    1262         printf("Supported output URIs:\n");
    1263         printf("\tnone\n");
    1264         printf("\n");
     1477        printf("dag format module: $Revision: 1755 $\n");
     1478        printf("Supported input URIs:\n");
     1479        printf("\tdag:/dev/dagn\n");
     1480        printf("\n");
     1481        printf("\te.g.: dag:/dev/dag0\n");
     1482        printf("\n");
     1483        printf("Supported output URIs:\n");
     1484        printf("\tnone\n");
     1485        printf("\n");
     1486}
     1487
     1488static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1489                                bool reader)
     1490{
     1491        struct dag_per_stream_t *stream_data;
     1492        libtrace_list_node_t *node;
     1493
     1494        if (reader && t->type == THREAD_PERPKT) {
     1495                fprintf(stderr, "t%u: registered reader thread", t->perpkt_num);
     1496                node = libtrace_list_get_index(FORMAT_DATA->per_stream,
     1497                                                t->perpkt_num);
     1498                if (node == NULL) {
     1499                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1500                                      "Too few streams supplied for the"
     1501                                      " number of threads lanuched");
     1502                        return -1;
     1503                }
     1504                stream_data = node->data;
     1505
     1506                /* Pass the per thread data to the thread */
     1507                t->format_data = stream_data;
     1508
     1509                /* Attach and start the DAG stream */
     1510                printf("t%u: starting and attaching stream #%u\n",
     1511                       t->perpkt_num, stream_data->dagstream);
     1512                if (dag_start_input_stream(libtrace, stream_data) < 0)
     1513                        return -1;
     1514        }
     1515
     1516        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1517
     1518        return 0;
    12651519}
    12661520
    12671521static struct libtrace_format_t dag = {
    1268         "dag",
    1269         "$Id$",
    1270         TRACE_FORMAT_ERF,
     1522        "dag",
     1523        "$Id$",
     1524        TRACE_FORMAT_ERF,
    12711525        dag_probe_filename,             /* probe filename */
    12721526        NULL,                           /* probe magic */
    1273         dag_init_input,                 /* init_input */
    1274         dag_config_input,               /* config_input */
    1275         dag_start_input,                /* start_input */
    1276         dag_pause_input,                /* pause_input */
     1527        dag_init_input,                 /* init_input */
     1528        dag_config_input,               /* config_input */
     1529        dag_start_input,                /* start_input */
     1530        dag_pause_input,                /* pause_input */
    12771531        dag_init_output,                /* init_output */
    1278         NULL,                           /* config_output */
     1532        NULL,                           /* config_output */
    12791533        dag_start_output,               /* start_output */
    1280         dag_fin_input,                  /* fin_input */
     1534        dag_fin_input,                  /* fin_input */
    12811535        dag_fin_output,                 /* fin_output */
    1282         dag_read_packet,                /* read_packet */
    1283         dag_prepare_packet,             /* prepare_packet */
     1536        dag_read_packet,                /* read_packet */
     1537        dag_prepare_packet,             /* prepare_packet */
    12841538        NULL,                           /* fin_packet */
    12851539        dag_write_packet,               /* write_packet */
    1286         erf_get_link_type,              /* get_link_type */
    1287         erf_get_direction,              /* get_direction */
    1288         erf_set_direction,              /* set_direction */
    1289         erf_get_erf_timestamp,          /* get_erf_timestamp */
    1290         NULL,                           /* get_timeval */
    1291         NULL,                           /* get_seconds */
     1540        erf_get_link_type,              /* get_link_type */
     1541        erf_get_direction,              /* get_direction */
     1542        erf_set_direction,              /* set_direction */
     1543        erf_get_erf_timestamp,          /* get_erf_timestamp */
     1544        NULL,                           /* get_timeval */
     1545        NULL,                           /* get_seconds */
    12921546        NULL,                           /* get_timespec */
    1293         NULL,                           /* seek_erf */
    1294         NULL,                           /* seek_timeval */
    1295         NULL,                           /* seek_seconds */
    1296         erf_get_capture_length,         /* get_capture_length */
    1297         erf_get_wire_length,            /* get_wire_length */
    1298         erf_get_framing_length,         /* get_framing_length */
    1299         erf_set_capture_length,         /* set_capture_length */
     1547        NULL,                           /* seek_erf */
     1548        NULL,                           /* seek_timeval */
     1549        NULL,                           /* seek_seconds */
     1550        erf_get_capture_length,         /* get_capture_length */
     1551        erf_get_wire_length,            /* get_wire_length */
     1552        erf_get_framing_length,         /* get_framing_length */
     1553        erf_set_capture_length,         /* set_capture_length */
    13001554        NULL,                           /* get_received_packets */
    13011555        NULL,                           /* get_filtered_packets */
    1302         dag_get_dropped_packets,        /* get_dropped_packets */
    1303         NULL,                           /* get_captured_packets */
    1304         NULL,                           /* get_fd */
    1305         trace_event_dag,                /* trace_event */
    1306         dag_help,                       /* help */
    1307         NULL                            /* next pointer */
     1556        NULL,                           /* get_dropped_packets */
     1557        dag_get_statistics,             /* get_statistics */
     1558        NULL,                           /* get_fd */
     1559        trace_event_dag,                /* trace_event */
     1560        dag_help,                       /* help */
     1561        NULL,                            /* next pointer */
     1562        {true, 0}, /* live packet capture, thread limit TBD */
     1563        dag_pstart_input,
     1564        dag_pread_packets,
     1565        dag_pause_input,
     1566        NULL,
     1567        dag_pregister_thread,
     1568        NULL,
     1569        dag_get_thread_statisitics      /* get thread stats */
    13081570};
    13091571
    1310 void dag_constructor(void) {
     1572void dag_constructor(void)
     1573{
    13111574        register_format(&dag);
    13121575}
  • lib/format_dpdk.c

    rb585975 r773d5e2  
    33 * This file is part of libtrace
    44 *
    5  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 
     5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
    66 * New Zealand.
    77 *
    8  * Author: Richard Sanger 
    9  *         
     8 * Author: Richard Sanger
     9 *
    1010 * All rights reserved.
    1111 *
    12  * This code has been developed by the University of Waikato WAND 
     12 * This code has been developed by the University of Waikato WAND
    1313 * research group. For further information please see http://www.wand.net.nz/
    1414 *
     
    3636 * Intel Data Plane Development Kit is a LIVE capture format.
    3737 *
    38  * This format also supports writing which will write packets out to the 
    39  * network as a form of packet replay. This should not be confused with the 
    40  * RT protocol which is intended to transfer captured packet records between 
     38 * This format also supports writing which will write packets out to the
     39 * network as a form of packet replay. This should not be confused with the
     40 * RT protocol which is intended to transfer captured packet records between
    4141 * RT-speaking programs.
    4242 */
     43
     44#define _GNU_SOURCE
    4345
    4446#include "config.h"
     
    4749#include "format_helper.h"
    4850#include "libtrace_arphrd.h"
     51#include "hash_toeplitz.h"
    4952
    5053#ifdef HAVE_INTTYPES_H
     
    5962#include <endian.h>
    6063#include <string.h>
     64
     65#if HAVE_LIBNUMA
     66#include <numa.h>
     67#endif
    6168
    6269/* We can deal with any minor differences by checking the RTE VERSION
     
    151158#include <rte_mempool.h>
    152159#include <rte_mbuf.h>
    153 
    154 /* The default size of memory buffers to use - This is the max size of standard
     160#include <rte_launch.h>
     161#include <rte_lcore.h>
     162#include <rte_per_lcore.h>
     163#include <rte_cycles.h>
     164#include <pthread.h>
     165#ifdef __FreeBSD__
     166#include <pthread_np.h>
     167#endif
     168
     169
     170/* The default size of memory buffers to use - This is the max size of standard
    155171 * ethernet packet less the size of the MAC CHECKSUM */
    156172#define RX_MBUF_SIZE 1514
    157173
    158 /* The minimum number of memory buffers per queue tx or rx. Search for 
     174/* The minimum number of memory buffers per queue tx or rx. Search for
    159175 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    160176 */
     
    174190#define NB_TX_MBUF 1024
    175191
    176 /* The size of the PCI blacklist needs to be big enough to contain 
     192/* The size of the PCI blacklist needs to be big enough to contain
    177193 * every PCI device address (listed by lspci every bus:device.function tuple).
    178194 */
     
    181197/* The maximum number of characters the mempool name can be */
    182198#define MEMPOOL_NAME_LEN 20
     199
     200/* For single threaded libtrace we read packets as a batch/burst
     201 * this is the maximum size of said burst */
     202#define BURST_SIZE 50
    183203
    184204#define MBUF(x) ((struct rte_mbuf *) x)
     
    186206#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    187207#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     208#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     209
     210#define FORMAT_DATA_HEAD(x) FORMAT(x)->per_stream->head
     211#define FORMAT_DATA_FIRST(x) ((dpdk_per_stream_t *)FORMAT_DATA_HEAD(x)->data)
     212
    188213#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    189                         (uint64_t) tv.tv_usec*1000ull)
     214                        (uint64_t) tv.tv_usec*1000ull)
    190215#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    191                         (uint64_t) ts.tv_nsec)
     216                        (uint64_t) ts.tv_nsec)
    192217
    193218#if RTE_PKTMBUF_HEADROOM != 128
    194219#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    195         "any libtrace instance processing these packet must be have the" \
    196         "same RTE_PKTMBUF_HEADROOM set"
     220        "any libtrace instance processing these packet must be have the" \
     221        "same RTE_PKTMBUF_HEADROOM set"
    197222#endif
    198223
    199224/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    200  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    201  * 
     225 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     226 *
    202227 * Make sure you understand what these are doing before enabling them.
    203228 * They might make traces incompatable with other builds etc.
    204  * 
     229 *
    205230 * These are also included to show how to do somethings which aren't
    206231 * obvious in the DPDK documentation.
    207232 */
    208233
    209 /* Print verbose messages to stdout */
     234/* Print verbose messages to stderr */
    210235#define DEBUG 0
    211236
    212 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    213  * only turn on if you know clock_gettime is a vsyscall on your system 
     237/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     238 * only turn on if you know clock_gettime is a vsyscall on your system
    214239 * overwise could be a large overhead. Again gettimeofday() should be
    215240 * vsyscall also if it's not you should seriously consider updating your
    216241 * kernel.
    217242 */
    218 #ifdef HAVE_LIBRT
     243#ifdef HAVE_CLOCK_GETTIME
    219244/* You can turn this on (set to 1) to prefer clock_gettime */
    220 #define USE_CLOCK_GETTIME 0
     245#define USE_CLOCK_GETTIME 1
    221246#else
    222 /* DONT CHANGE THIS !!! */
     247/* DON'T CHANGE THIS !!! */
    223248#define USE_CLOCK_GETTIME 0
    224249#endif
     
    229254 * hence writing out a port such as int: ring: and dpdk: assumes there
    230255 * is no checksum and will attempt to write the checksum as part of the
    231  * packet 
     256 * packet
    232257 */
    233258#define GET_MAC_CRC_CHECKSUM 0
    234259
    235260/* This requires a modification of the pmd drivers (inside Intel DPDK)
     261 * TODO this requires updating (packet sizes are wrong TS most likely also)
    236262 */
    237263#define HAS_HW_TIMESTAMPS_82580 0
     
    244270#endif
    245271
     272static pthread_mutex_t dpdk_lock = PTHREAD_MUTEX_INITIALIZER;
     273/* Memory pools Per NUMA node */
     274static struct rte_mempool * mem_pools[4][RTE_MAX_LCORE] = {{0}};
     275
    246276/* As per Intel 82580 specification - mismatch in 82580 datasheet
    247277 * it states ts is stored in Big Endian, however its actually Little */
    248278struct hw_timestamp_82580 {
    249     uint64_t reserved;
    250     uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
     279        uint64_t reserved;
     280        uint64_t timestamp; /* Little Endian only lower 40 bits are valid */
    251281};
    252282
    253283enum paused_state {
    254     DPDK_NEVER_STARTED,
    255     DPDK_RUNNING,
    256     DPDK_PAUSED,
     284        DPDK_NEVER_STARTED,
     285        DPDK_RUNNING,
     286        DPDK_PAUSED,
    257287};
     288
     289struct dpdk_per_stream_t
     290{
     291        uint16_t queue_id;
     292        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     293        struct rte_mempool *mempool;
     294        int lcore;
     295#if HAS_HW_TIMESTAMPS_82580
     296        /* Timestamping only relevent to RX */
     297        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     298        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     299#endif
     300} ALIGN_STRUCT(CACHE_LINE_SIZE);
     301
     302#if HAS_HW_TIMESTAMPS_82580
     303#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1, 0, 0}
     304#else
     305#define DPDK_EMPTY_STREAM {-1, 0, NULL, -1}
     306#endif
     307
     308typedef struct dpdk_per_stream_t dpdk_per_stream_t;
    258309
    259310/* Used by both input and output however some fields are not used
    260311 * for output */
    261312struct dpdk_format_data_t {
    262     int8_t promisc; /* promiscuous mode - RX only */
    263     uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    264     uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    265     uint8_t paused; /* See paused_state */
    266     uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
    267     int snaplen; /* The snap length for the capture - RX only */
    268     /* We always have to setup both rx and tx queues even if we don't want them */
    269     int nb_rx_buf; /* The number of packet buffers in the rx ring */
    270     int nb_tx_buf; /* The number of packet buffers in the tx ring */
    271     struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
     313        int8_t promisc; /* promiscuous mode - RX only */
     314        uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
     315        uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
     316        uint8_t paused; /* See paused_state */
     317        uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
     318        int snaplen; /* The snap length for the capture - RX only */
     319        /* We always have to setup both rx and tx queues even if we don't want them */
     320        int nb_rx_buf; /* The number of packet buffers in the rx ring */
     321        int nb_tx_buf; /* The number of packet buffers in the tx ring */
     322        int nic_numa_node; /* The NUMA node that the NIC is attached to */
     323        struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    272324#if DPDK_USE_BLACKLIST
    273     struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
     325        struct rte_pci_addr blacklist[BLACK_LIST_SIZE]; /* Holds our device blacklist */
    274326        unsigned int nb_blacklist; /* Number of blacklist items in are valid */
    275327#endif
    276     char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    277 #if HAS_HW_TIMESTAMPS_82580
    278     /* Timestamping only relevent to RX */
    279     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    280     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    281     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    282 #endif
     328        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
     329        uint8_t rss_key[40]; // This is the RSS KEY
     330        /* To improve single-threaded performance we always batch reading
     331         * packets, in a burst, otherwise the parallel library does this for us */
     332        struct rte_mbuf* burst_pkts[BURST_SIZE];
     333        int burst_size; /* The total number read in the burst */
     334        int burst_offset; /* The offset we are into the burst */
     335
     336        /* Our parallel streams */
     337        libtrace_list_t *per_stream;
    283338};
    284339
    285340enum dpdk_addt_hdr_flags {
    286     INCLUDES_CHECKSUM = 0x1,
    287     INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
     341        INCLUDES_CHECKSUM = 0x1,
     342        INCLUDES_HW_TIMESTAMP = 0x2, /* Used with 82580 driver */
    288343};
    289344
    290 /** 
     345/**
    291346 * A structure placed in front of the packet where we can store
    292347 * additional information about the given packet.
     
    294349 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    295350 * +--------------------------+
    296  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    297  * +--------------------------+
    298351 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    299352 * +--------------------------+
    300  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    301  * +--------------------------+ 
     353 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     354 * +--------------------------+
    302355 * *   hw_timestamp_82580     * 16 bytes Optional
    303356 * +--------------------------+
     
    306359 */
    307360struct dpdk_addt_hdr {
    308     uint64_t timestamp;
    309     uint8_t flags;
    310     uint8_t direction;
    311     uint8_t reserved1;
    312     uint8_t reserved2;
    313     uint32_t cap_len; /* The size to say the capture is */
     361        uint64_t timestamp;
     362        uint8_t flags;
     363        uint8_t direction;
     364        uint8_t reserved1;
     365        uint8_t reserved2;
     366        uint32_t cap_len; /* The size to say the capture is */
    314367};
    315368
     
    317370 * We want to blacklist all devices except those on the whitelist
    318371 * (I say list, but yes it is only the one).
    319  * 
     372 *
    320373 * The default behaviour of rte_pci_probe() will map every possible device
    321374 * to its DPDK driver. The DPDK driver will take the ethernet device
    322375 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    323  * 
    324  * So blacklist all devices except the one that we wish to use so that 
     376 *
     377 * So blacklist all devices except the one that we wish to use so that
    325378 * the others can still be used as standard ethernet ports.
    326379 *
     
    336389
    337390        TAILQ_FOREACH(dev, &device_list, next) {
    338         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    339             && whitelist->bus == dev->addr.bus
    340             && whitelist->devid == dev->addr.devid
    341             && whitelist->function == dev->addr.function)
    342             continue;
     391        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     392            && whitelist->bus == dev->addr.bus
     393            && whitelist->devid == dev->addr.devid
     394            && whitelist->function == dev->addr.function)
     395            continue;
    343396                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    344                                 / sizeof (format_data->blacklist[0])) {
    345                         printf("Warning: too many devices to blacklist consider"
    346                                         " increasing BLACK_LIST_SIZE");
     397                                / sizeof (format_data->blacklist[0])) {
     398                        fprintf(stderr, "Warning: too many devices to blacklist consider"
     399                                        " increasing BLACK_LIST_SIZE");
    347400                        break;
    348401                }
     
    360413        char pci_str[20] = {0};
    361414        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    362                 whitelist->domain,
    363                 whitelist->bus,
    364                 whitelist->devid,
    365                 whitelist->function);
     415                whitelist->domain,
     416                whitelist->bus,
     417                whitelist->devid,
     418                whitelist->function);
    366419        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    367420                return -1;
     
    375428 * Fills in addr, note core is optional and is unchanged if
    376429 * a value for it is not provided.
    377  * 
     430 *
    378431 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    379  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     432 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    380433 */
    381434static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    382     int matches;
    383     assert(str);
    384     matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
    385                      &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
    386     if (matches >= 4) {
    387         return 0;
    388     } else {
    389         return -1;
    390     }
     435        int matches;
     436        assert(str);
     437        matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld",
     438                         &addr->domain, &addr->bus, &addr->devid,
     439                         &addr->function, core);
     440        if (matches >= 4) {
     441                return 0;
     442        } else {
     443                return -1;
     444        }
     445}
     446
     447/**
     448 * Convert a pci address to the numa node it is
     449 * connected to.
     450 *
     451 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     452 * so we can call it before DPDK
     453 *
     454 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     455 */
     456static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     457        char path[50] = {0};
     458        FILE *file;
     459
     460        /* Read from the system */
     461        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     462                 dev_addr->domain,
     463                 dev_addr->bus,
     464                 dev_addr->devid,
     465                 dev_addr->function);
     466
     467        if((file = fopen(path, "r")) != NULL) {
     468                int numa_node = -1;
     469                fscanf(file, "%d", &numa_node);
     470                fclose(file);
     471                return numa_node;
     472        }
     473        return -1;
    391474}
    392475
     
    395478static inline void dump_configuration()
    396479{
    397     struct rte_config * global_config;
    398     long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    399 
    400     if (nb_cpu <= 0) {
    401         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    402         nb_cpu = 1; /* fallback to just 1 core */
    403     }
    404     if (nb_cpu > RTE_MAX_LCORE)
    405         nb_cpu = RTE_MAX_LCORE;
    406 
    407     global_config = rte_eal_get_configuration();
    408 
    409     if (global_config != NULL) {
    410         int i;
    411         fprintf(stderr, "Intel DPDK setup\n"
    412                "---Version      : %s\n"
    413                "---Master LCore : %"PRIu32"\n"
    414                "---LCore Count  : %"PRIu32"\n",
    415                rte_version(),
    416                global_config->master_lcore, global_config->lcore_count);
    417 
    418         for (i = 0 ; i < nb_cpu; i++) {
    419             fprintf(stderr, "   ---Core %d : %s\n", i,
    420                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    421         }
    422 
    423         const char * proc_type;
    424         switch (global_config->process_type) {
    425             case RTE_PROC_AUTO:
    426                 proc_type = "auto";
    427                 break;
    428             case RTE_PROC_PRIMARY:
    429                 proc_type = "primary";
    430                 break;
    431             case RTE_PROC_SECONDARY:
    432                 proc_type = "secondary";
    433                 break;
    434             case RTE_PROC_INVALID:
    435                 proc_type = "invalid";
    436                 break;
    437             default:
    438                 proc_type = "something worse than invalid!!";
    439         }
    440         fprintf(stderr, "---Process Type : %s\n", proc_type);
    441     }
    442 
    443 }
    444 #endif
     480        struct rte_config * global_config;
     481        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     482
     483        if (nb_cpu <= 0) {
     484                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     485                       " Falling back to the first core.");
     486                nb_cpu = 1; /* fallback to just 1 core */
     487        }
     488        if (nb_cpu > RTE_MAX_LCORE)
     489                nb_cpu = RTE_MAX_LCORE;
     490
     491        global_config = rte_eal_get_configuration();
     492
     493        if (global_config != NULL) {
     494                int i;
     495                fprintf(stderr, "Intel DPDK setup\n"
     496                        "---Version      : %s\n"
     497                        "---Master LCore : %"PRIu32"\n"
     498                        "---LCore Count  : %"PRIu32"\n",
     499                        rte_version(),
     500                        global_config->master_lcore, global_config->lcore_count);
     501
     502                for (i = 0 ; i < nb_cpu; i++) {
     503                        fprintf(stderr, "   ---Core %d : %s\n", i,
     504                                global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     505                }
     506
     507                const char * proc_type;
     508                switch (global_config->process_type) {
     509                case RTE_PROC_AUTO:
     510                        proc_type = "auto";
     511                        break;
     512                case RTE_PROC_PRIMARY:
     513                        proc_type = "primary";
     514                        break;
     515                case RTE_PROC_SECONDARY:
     516                        proc_type = "secondary";
     517                        break;
     518                case RTE_PROC_INVALID:
     519                        proc_type = "invalid";
     520                        break;
     521                default:
     522                        proc_type = "something worse than invalid!!";
     523                }
     524                fprintf(stderr, "---Process Type : %s\n", proc_type);
     525        }
     526
     527}
     528#endif
     529
     530/**
     531 * Expects to be called from the master lcore and moves it to the given dpdk id
     532 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     533 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     534 *               and not already in use.
     535 * @return 0 is successful otherwise -1 on error.
     536 */
     537static inline int dpdk_move_master_lcore(libtrace_t *libtrace, size_t core) {
     538        struct rte_config *cfg = rte_eal_get_configuration();
     539        cpu_set_t cpuset;
     540        int i;
     541
     542        assert (core < RTE_MAX_LCORE);
     543        assert (rte_get_master_lcore() == rte_lcore_id());
     544
     545        if (core == rte_lcore_id())
     546                return 0;
     547
     548        /* Make sure we are not overwriting someone else */
     549        assert(!rte_lcore_is_enabled(core));
     550
     551        /* Move the core */
     552        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     553        cfg->lcore_role[core] = ROLE_RTE;
     554        lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     555        rte_eal_get_configuration()->master_lcore = core;
     556        RTE_PER_LCORE(_lcore_id) = core;
     557
     558        /* Now change the affinity, either mapped to a single core or all accepted */
     559        CPU_ZERO(&cpuset);
     560
     561        if (lcore_config[core].detected) {
     562                CPU_SET(core, &cpuset);
     563        } else {
     564                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     565                        if (lcore_config[i].detected)
     566                                CPU_SET(i, &cpuset);
     567                }
     568        }
     569
     570        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     571        if (i != 0) {
     572                trace_set_err(libtrace, errno, "pthread_setaffinity_np failed\n");
     573                return -1;
     574        }
     575        return 0;
     576}
    445577
    446578/**
     
    474606static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    475607                                        char * err, int errlen) {
    476     int ret; /* Returned error codes */
    477     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
    478     char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    479     char mem_map[20] = {0}; /* The memory name */
    480     long nb_cpu; /* The number of CPUs in the system */
    481     long my_cpu; /* The CPU number we want to bind to */
     608        int ret; /* Returned error codes */
     609        struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     610        char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
     611        char mem_map[20] = {0}; /* The memory name */
     612        long nb_cpu; /* The number of CPUs in the system */
     613        long my_cpu; /* The CPU number we want to bind to */
     614        int i;
     615        struct rte_config *cfg = rte_eal_get_configuration();
    482616        struct saved_getopts save_opts;
    483    
    484 #if DEBUG
    485     rte_set_log_level(RTE_LOG_DEBUG);
    486 #else
    487     rte_set_log_level(RTE_LOG_WARNING);
    488 #endif
    489     /*
    490      * Using unique file prefixes mean separate memory is used, unlinking
    491      * the two processes. However be careful we still cannot access a
    492      * port that already in use.
    493      */
    494     char* argv[] = {"libtrace",
    495                     "-c", cpu_number,
    496                     "-n", "1",
    497                     "--proc-type", "auto",
    498                     "--file-prefix", mem_map,
    499                     "-m", "256",
     617
     618        /* This initialises the Environment Abstraction Layer (EAL)
     619         * If we had slave workers these are put into WAITING state
     620         *
     621         * Basically binds this thread to a fixed core, which we choose as
     622         * the last core on the machine (assuming fewer interrupts mapped here).
     623         * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
     624         * "-n" the number of memory channels into the CPU (hardware specific)
     625         *      - Most likely to be half the number of ram slots in your machine.
     626         *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
     627         * Controls where in memory packets are stored such that they are spread
     628         * across the channels. We just use 1 to be safe.
     629         *
     630         * Using unique file prefixes mean separate memory is used, unlinking
     631         * the two processes. However be careful we still cannot access a
     632         * port that already in use.
     633         */
     634        char* argv[] = {"libtrace",
     635                        "-c", cpu_number,
     636                        "-n", "1",
     637                        "--proc-type", "auto",
     638                        "--file-prefix", mem_map,
     639                        "-m", "512",
    500640#if DPDK_USE_LOG_LEVEL
    501641#       if DEBUG
    502                     "--log-level", "8", /* RTE_LOG_DEBUG */
     642                        "--log-level", "8", /* RTE_LOG_DEBUG */
    503643#       else
    504                     "--log-level", "5", /* RTE_LOG_WARNING */
     644                        "--log-level", "5", /* RTE_LOG_WARNING */
    505645#       endif
    506646#endif
    507                     NULL};
    508     int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    509 
    510     /* This initialises the Environment Abstraction Layer (EAL)
    511      * If we had slave workers these are put into WAITING state
    512      *
    513      * Basically binds this thread to a fixed core, which we choose as
    514      * the last core on the machine (assuming fewer interrupts mapped here).
    515      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    516      * "-n" the number of memory channels into the CPU (hardware specific)
    517      *      - Most likely to be half the number of ram slots in your machine.
    518      *        We could count ram slots by "dmidecode -t 17 | grep -c 'Size:'"
    519      * Controls where in memory packets are stored and should spread across
    520      * the channels. We just use 1 to be safe.
    521      */
    522 
    523     /* Get the number of cpu cores in the system and use the last core */
    524     nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    525     if (nb_cpu <= 0) {
    526         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    527         nb_cpu = 1; /* fallback to the first core */
    528     }
    529     if (nb_cpu > RTE_MAX_LCORE)
    530         nb_cpu = RTE_MAX_LCORE;
    531 
    532     my_cpu = nb_cpu;
    533     /* This allows the user to specify the core - we would try to do this
    534      * automatically but it's hard to tell that this is secondary
    535      * before running rte_eal_init(...). Currently we are limited to 1
    536      * instance per core due to the way memory is allocated. */
    537     if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    538         snprintf(err, errlen, "Failed to parse URI");
    539         return -1;
    540     }
    541 
    542     snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    543                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    544 
    545     if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    546         snprintf(err, errlen,
    547           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    548           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    549         return -1;
    550     }
    551 
    552     /* Make our mask */
    553     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     647                        NULL};
     648        int argc = sizeof(argv) / sizeof(argv[0]) - 1;
     649
     650#if DEBUG
     651        rte_set_log_level(RTE_LOG_DEBUG);
     652#else
     653        rte_set_log_level(RTE_LOG_WARNING);
     654#endif
     655
     656        /* Get the number of cpu cores in the system and use the last core
     657         * on the correct numa node */
     658        nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     659        if (nb_cpu <= 0) {
     660                perror("sysconf(_SC_NPROCESSORS_ONLN) failed."
     661                       " Falling back to the first core.");
     662                nb_cpu = 1; /* fallback to the first core */
     663        }
     664        if (nb_cpu > RTE_MAX_LCORE)
     665                nb_cpu = RTE_MAX_LCORE;
     666
     667        my_cpu = -1;
     668        /* This allows the user to specify the core - we would try to do this
     669         * automatically but it's hard to tell that this is secondary
     670         * before running rte_eal_init(...). Currently we are limited to 1
     671         * instance per core due to the way memory is allocated. */
     672        if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
     673                snprintf(err, errlen, "Failed to parse URI");
     674                return -1;
     675        }
     676
     677#if HAVE_LIBNUMA
     678        format_data->nic_numa_node = pci_to_numa(&use_addr);
     679        if (my_cpu < 0) {
     680                /* If we can assign to a core on the same numa node */
     681                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
     682                if(format_data->nic_numa_node >= 0) {
     683                        int max_node_cpu = -1;
     684                        struct bitmask *mask = numa_allocate_cpumask();
     685                        assert(mask);
     686                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     687                        for (i = 0 ; i < nb_cpu; ++i) {
     688                                if (numa_bitmask_isbitset(mask,i))
     689                                        max_node_cpu = i+1;
     690                        }
     691                        my_cpu = max_node_cpu;
     692                }
     693        }
     694#endif
     695        if (my_cpu < 0) {
     696                my_cpu = nb_cpu;
     697        }
     698
     699
     700        snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
     701                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     702
     703        if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
     704                snprintf(err, errlen,
     705                         "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     706                         " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     707                return -1;
     708        }
     709
     710        /* Make our mask with all cores turned on this is so that DPDK to
     711         * gets CPU info older versions */
     712        snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     713        //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    554714
    555715#if !DPDK_USE_BLACKLIST
    556     /* Black list all ports besides the one that we want to use */
    557     if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    558         snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    559                  " are you sure the address is correct?: %s", strerror(-ret));
    560         return -1;
    561     }
     716        /* Black list all ports besides the one that we want to use */
     717        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
     718                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     719                         " are you sure the address is correct?: %s", strerror(-ret));
     720                return -1;
     721        }
    562722#endif
    563723
    564724        /* Give the memory map a unique name */
    565725        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    566     /* rte_eal_init it makes a call to getopt so we need to reset the
    567     * global optind variable of getopt otherwise this fails */
     726        /* rte_eal_init it makes a call to getopt so we need to reset the
     727        * global optind variable of getopt otherwise this fails */
    568728        save_getopts(&save_opts);
    569     optind = 1;
    570     if ((ret = rte_eal_init(argc, argv)) < 0) {
    571         snprintf(err, errlen,
    572           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    573         return -1;
    574     }
     729        optind = 1;
     730        if ((ret = rte_eal_init(argc, argv)) < 0) {
     731                snprintf(err, errlen,
     732                         "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     733                return -1;
     734        }
    575735        restore_getopts(&save_opts);
     736        // These are still running but will never do anything with DPDK v1.7 we
     737        // should remove this XXX in the future
     738        for(i = 0; i < RTE_MAX_LCORE; ++i) {
     739                if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     740                        cfg->lcore_role[i] = ROLE_OFF;
     741                        cfg->lcore_count--;
     742                }
     743        }
     744        // Only the master should be running
     745        assert(cfg->lcore_count == 1);
     746
     747        // TODO XXX TODO
     748        dpdk_move_master_lcore(NULL, my_cpu-1);
    576749
    577750#if DEBUG
    578     dump_configuration();
     751        dump_configuration();
    579752#endif
    580753
    581754#if DPDK_USE_PMD_INIT
    582     /* This registers all available NICs with Intel DPDK
    583     * These are not loaded until rte_eal_pci_probe() is called.
    584     */
    585     if ((ret = rte_pmd_init_all()) < 0) {
    586         snprintf(err, errlen,
    587           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    588         return -1;
    589     }
     755        /* This registers all available NICs with Intel DPDK
     756        * These are not loaded until rte_eal_pci_probe() is called.
     757        */
     758        if ((ret = rte_pmd_init_all()) < 0) {
     759                snprintf(err, errlen,
     760                         "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     761                return -1;
     762        }
    590763#endif
    591764
    592765#if DPDK_USE_BLACKLIST
    593     /* Blacklist all ports besides the one that we want to use */
     766        /* Blacklist all ports besides the one that we want to use */
    594767        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    595768                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     
    600773
    601774#if DPDK_USE_PCI_PROBE
    602     /* This loads DPDK drivers against all ports that are not blacklisted */
     775        /* This loads DPDK drivers against all ports that are not blacklisted */
    603776        if ((ret = rte_eal_pci_probe()) < 0) {
    604         snprintf(err, errlen,
    605             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    606         return -1;
    607     }
    608 #endif
    609 
    610     format_data->nb_ports = rte_eth_dev_count();
    611 
    612     if (format_data->nb_ports != 1) {
    613         snprintf(err, errlen,
    614             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    615             format_data->nb_ports);
    616         return -1;
    617     }
    618 
    619     return 0;
     777                snprintf(err, errlen,
     778                         "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     779                return -1;
     780        }
     781#endif
     782
     783        format_data->nb_ports = rte_eth_dev_count();
     784
     785        if (format_data->nb_ports != 1) {
     786                snprintf(err, errlen,
     787                         "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     788                         format_data->nb_ports);
     789                return -1;
     790        }
     791
     792        struct rte_eth_dev_info dev_info;
     793        rte_eth_dev_info_get(0, &dev_info);
     794        fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     795                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
     796
     797        return 0;
    620798}
    621799
    622800static int dpdk_init_input (libtrace_t *libtrace) {
    623     char err[500];
    624     err[0] = 0;
    625    
    626     libtrace->format_data = (struct dpdk_format_data_t *)
    627                             malloc(sizeof(struct dpdk_format_data_t));
    628     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    629     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    630     FORMAT(libtrace)->nb_ports = 0;
    631     FORMAT(libtrace)->snaplen = 0; /* Use default */
    632     FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    633     FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
    634     FORMAT(libtrace)->promisc = -1;
    635     FORMAT(libtrace)->pktmbuf_pool = NULL;
     801        dpdk_per_stream_t stream = DPDK_EMPTY_STREAM;
     802        char err[500];
     803        err[0] = 0;
     804
     805        libtrace->format_data = (struct dpdk_format_data_t *)
     806                                malloc(sizeof(struct dpdk_format_data_t));
     807        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     808        FORMAT(libtrace)->nb_ports = 0;
     809        FORMAT(libtrace)->snaplen = 0; /* Use default */
     810        FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
     811        FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     812        FORMAT(libtrace)->nic_numa_node = -1;
     813        FORMAT(libtrace)->promisc = -1;
     814        FORMAT(libtrace)->pktmbuf_pool = NULL;
    636815#if DPDK_USE_BLACKLIST
    637     FORMAT(libtrace)->nb_blacklist = 0;
    638 #endif
    639     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    640     FORMAT(libtrace)->mempool_name[0] = 0;
    641 #if HAS_HW_TIMESTAMPS_82580
    642     FORMAT(libtrace)->ts_first_sys = 0;
    643     FORMAT(libtrace)->ts_last_sys = 0;
    644     FORMAT(libtrace)->wrap_count = 0;
    645 #endif
    646 
    647     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    648         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    649         free(libtrace->format_data);
    650         libtrace->format_data = NULL;
    651         return -1;
    652     }
    653     return 0;
    654 };
     816        FORMAT(libtrace)->nb_blacklist = 0;
     817#endif
     818        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     819        FORMAT(libtrace)->mempool_name[0] = 0;
     820        memset(FORMAT(libtrace)->burst_pkts, 0,
     821               sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     822        FORMAT(libtrace)->burst_size = 0;
     823        FORMAT(libtrace)->burst_offset = 0;
     824
     825        /* Make our first stream */
     826        FORMAT(libtrace)->per_stream = libtrace_list_init(sizeof(struct dpdk_per_stream_t));
     827        libtrace_list_push_back(FORMAT(libtrace)->per_stream, &stream);
     828
     829        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     830                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     831                free(libtrace->format_data);
     832                libtrace->format_data = NULL;
     833                return -1;
     834        }
     835        return 0;
     836}
    655837
    656838static int dpdk_init_output(libtrace_out_t *libtrace)
    657839{
    658     char err[500];
    659     err[0] = 0;
    660    
    661     libtrace->format_data = (struct dpdk_format_data_t *)
    662                             malloc(sizeof(struct dpdk_format_data_t));
    663     FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    664     FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
    665     FORMAT(libtrace)->nb_ports = 0;
    666     FORMAT(libtrace)->snaplen = 0; /* Use default */
    667     FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    668     FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
    669     FORMAT(libtrace)->promisc = -1;
    670     FORMAT(libtrace)->pktmbuf_pool = NULL;
     840        char err[500];
     841        err[0] = 0;
     842
     843        libtrace->format_data = (struct dpdk_format_data_t *)
     844                                malloc(sizeof(struct dpdk_format_data_t));
     845        FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
     846        FORMAT(libtrace)->nb_ports = 0;
     847        FORMAT(libtrace)->snaplen = 0; /* Use default */
     848        FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
     849        FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     850        FORMAT(libtrace)->nic_numa_node = -1;
     851        FORMAT(libtrace)->promisc = -1;
     852        FORMAT(libtrace)->pktmbuf_pool = NULL;
    671853#if DPDK_USE_BLACKLIST
    672     FORMAT(libtrace)->nb_blacklist = 0;
    673 #endif
    674     FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    675     FORMAT(libtrace)->mempool_name[0] = 0;
    676 #if HAS_HW_TIMESTAMPS_82580
    677     FORMAT(libtrace)->ts_first_sys = 0;
    678     FORMAT(libtrace)->ts_last_sys = 0;
    679     FORMAT(libtrace)->wrap_count = 0;
    680 #endif
    681 
    682     if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    683         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    684         free(libtrace->format_data);
    685         libtrace->format_data = NULL;
    686         return -1;
    687     }
    688     return 0;
    689 };
     854        FORMAT(libtrace)->nb_blacklist = 0;
     855#endif
     856        FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
     857        FORMAT(libtrace)->mempool_name[0] = 0;
     858        memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     859        FORMAT(libtrace)->burst_size = 0;
     860        FORMAT(libtrace)->burst_offset = 0;
     861
     862        if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     863                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     864                free(libtrace->format_data);
     865                libtrace->format_data = NULL;
     866                return -1;
     867        }
     868        return 0;
     869}
    690870
    691871/**
    692872 * Note here snaplen excludes the MAC checksum. Packets over
    693873 * the requested snaplen will be dropped. (Excluding MAC checksum)
    694  * 
     874 *
    695875 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    696876 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    697877 * is set the maximum size of the returned packet would be 1518 otherwise
    698878 * 1514 would be the largest size possibly returned.
    699  * 
     879 *
    700880 */
    701881static int dpdk_config_input (libtrace_t *libtrace,
    702                                         trace_option_t option,
    703                                         void *data) {
    704     switch (option) {
    705         case TRACE_OPTION_SNAPLEN:
    706             /* Only support changing snaplen before a call to start is
    707              * made */
    708             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    709                 FORMAT(libtrace)->snaplen=*(int*)data;
    710             else
    711                 return -1;
    712             return 0;
    713                 case TRACE_OPTION_PROMISC:
    714                         FORMAT(libtrace)->promisc=*(int*)data;
    715             return 0;
    716         case TRACE_OPTION_FILTER:
    717             /* TODO filtering */
    718             break;
    719         case TRACE_OPTION_META_FREQ:
    720             break;
    721         case TRACE_OPTION_EVENT_REALTIME:
    722             break;
    723         /* Avoid default: so that future options will cause a warning
    724          * here to remind us to implement it, or flag it as
    725          * unimplementable
    726          */
    727     }
     882                              trace_option_t option,
     883                              void *data) {
     884        switch (option) {
     885        case TRACE_OPTION_SNAPLEN:
     886                /* Only support changing snaplen before a call to start is
     887                 * made */
     888                if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     889                        FORMAT(libtrace)->snaplen=*(int*)data;
     890                else
     891                        return -1;
     892                return 0;
     893        case TRACE_OPTION_PROMISC:
     894                FORMAT(libtrace)->promisc=*(int*)data;
     895                return 0;
     896        case TRACE_OPTION_HASHER:
     897                switch (*((enum hasher_types *) data))
     898                {
     899                case HASHER_BALANCE:
     900                case HASHER_UNIDIRECTIONAL:
     901                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     902                        return 0;
     903                case HASHER_BIDIRECTIONAL:
     904                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     905                        return 0;
     906                case HASHER_CUSTOM:
     907                        // We don't support these
     908                        return -1;
     909                }
     910                break;
     911        case TRACE_OPTION_FILTER:
     912                /* TODO filtering */
     913        case TRACE_OPTION_META_FREQ:
     914        case TRACE_OPTION_EVENT_REALTIME:
     915                break;
     916        /* Avoid default: so that future options will cause a warning
     917         * here to remind us to implement it, or flag it as
     918         * unimplementable
     919         */
     920        }
    728921
    729922        /* Don't set an error - trace_config will try to deal with the
    730923         * option and will set an error if it fails */
    731     return -1;
     924        return -1;
    732925}
    733926
    734927/* Can set jumbo frames/ or limit the size of a frame by setting both
    735928 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    736  * 
     929 *
    737930 */
    738931static struct rte_eth_conf port_conf = {
    739932        .rxmode = {
     933                .mq_mode = ETH_RSS,
    740934                .split_hdr_size = 0,
    741935                .header_split   = 0, /**< Header Split disabled */
     
    743937                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    744938                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    745         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     939                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    746940#if GET_MAC_CRC_CHECKSUM
    747941/* So it appears that if hw_strip_crc is turned off the driver will still
     
    756950 * always cut off the checksum in the future
    757951 */
    758         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     952                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    759953#endif
    760954        },
     
    762956                .mq_mode = ETH_DCB_NONE,
    763957        },
     958        .rx_adv_conf = {
     959                .rss_conf = {
     960                        // .rss_key = &rss_key, // We set this per format
     961                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     962                },
     963        },
     964        .intr_conf = {
     965                .lsc = 1
     966        }
    764967};
    765968
     
    770973                .wthresh = 4,/* RX_WTHRESH writeback */
    771974        },
    772     .rx_free_thresh = 0,
    773     .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
     975        .rx_free_thresh = 0,
     976        .rx_drop_en = 0, /* Drop packets oldest packets if out of space */
    774977};
    775978
    776979static const struct rte_eth_txconf tx_conf = {
    777980        .tx_thresh = {
    778         /**
    779         * TX_PTHRESH prefetch
    780         * Set on the NIC, if the number of unprocessed descriptors to queued on
    781         * the card fall below this try grab at least hthresh more unprocessed
    782         * descriptors.
    783         */
     981                /*
     982                * TX_PTHRESH prefetch
     983                * Set on the NIC, if the number of unprocessed descriptors to queued on
     984                * the card fall below this try grab at least hthresh more unprocessed
     985                * descriptors.
     986                */
    784987                .pthresh = 36,
    785988
    786         /* TX_HTHRESH host
    787         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    788         */
     989                /* TX_HTHRESH host
     990                * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     991                */
    789992                .hthresh = 0,
    790        
    791         /* TX_WTHRESH writeback
    792         * Set on the NIC, the number of sent descriptors before writing back
    793         * status to confirm the transmission. This is done more efficiently as
    794         * a bulk DMA-transfer rather than writing one at a time.
    795         * Similar to tx_free_thresh however this is applied to the NIC, where
    796         * as tx_free_thresh is when DPDK will check these. This is extended
    797         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    798         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    799         */
     993
     994                /* TX_WTHRESH writeback
     995                * Set on the NIC, the number of sent descriptors before writing back
     996                * status to confirm the transmission. This is done more efficiently as
     997                * a bulk DMA-transfer rather than writing one at a time.
     998                * Similar to tx_free_thresh however this is applied to the NIC, where
     999                * as tx_free_thresh is when DPDK will check these. This is extended
     1000                * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     1001                * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     1002                */
    8001003                .wthresh = 4,
    8011004        },
    8021005
    803     /* Used internally by DPDK rather than passed to the NIC. The number of
    804     * packet descriptors to send before checking for any responses written
    805     * back (to confirm the transmission). Default = 32 if set to 0)
    806     */
     1006        /* Used internally by DPDK rather than passed to the NIC. The number of
     1007        * packet descriptors to send before checking for any responses written
     1008        * back (to confirm the transmission). Default = 32 if set to 0)
     1009        */
    8071010        .tx_free_thresh = 0,
    8081011
    809     /* This is the Report Status threshold, used by 10Gbit cards,
    810      * This signals the card to only write back status (such as
    811     * transmission successful) after this minimum number of transmit
    812     * descriptors are seen. The default is 32 (if set to 0) however if set
    813     * to greater than 1 TX wthresh must be set to zero, because this is kindof
    814     * a replacement. See the dpdk programmers guide for more restrictions.
    815     */
     1012        /* This is the Report Status threshold, used by 10Gbit cards,
     1013         * This signals the card to only write back status (such as
     1014        * transmission successful) after this minimum number of transmit
     1015        * descriptors are seen. The default is 32 (if set to 0) however if set
     1016        * to greater than 1 TX wthresh must be set to zero, because this is kindof
     1017        * a replacement. See the dpdk programmers guide for more restrictions.
     1018        */
    8161019        .tx_rs_thresh = 1,
    8171020};
    8181021
    819 /* Attach memory to the port and start the port or restart the port.
    820  */
    821 static int dpdk_start_port (struct dpdk_format_data_t * format_data, char *err, int errlen){
    822     int ret; /* Check return values for errors */
    823     struct rte_eth_link link_info; /* Wait for link */
    824    
    825     /* Already started */
    826     if (format_data->paused == DPDK_RUNNING)
    827         return 0;
    828 
    829     /* First time started we need to alloc our memory, doing this here
    830      * rather than in environment setup because we don't have snaplen then */
    831     if (format_data->paused == DPDK_NEVER_STARTED) {
    832         if (format_data->snaplen == 0) {
    833             format_data->snaplen = RX_MBUF_SIZE;
    834             port_conf.rxmode.jumbo_frame = 0;
    835             port_conf.rxmode.max_rx_pkt_len = 0;
    836         } else {
    837             /* Use jumbo frames */
    838             port_conf.rxmode.jumbo_frame = 1;
    839             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    840         }
    841 
    842         /* This is additional overhead so make sure we allow space for this */
     1022/**
     1023 * A callback for a link state change (LSC).
     1024 *
     1025 * Packets may be received before this notification. In fact the DPDK IGXBE
     1026 * driver likes to put a delay upto 5sec before sending this.
     1027 *
     1028 * We use this to ensure the link speed is correct for our timestamp
     1029 * calculations. Because packets might be received before the link up we still
     1030 * update this when the packet is received.
     1031 *
     1032 * @param port The DPDK port
     1033 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
     1034 * @param cb_arg The dpdk_format_data_t structure associated with the format
     1035 */
     1036static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
     1037                              void *cb_arg) {
     1038        struct dpdk_format_data_t * format_data = cb_arg;
     1039        struct rte_eth_link link_info;
     1040        assert(event == RTE_ETH_EVENT_INTR_LSC);
     1041        assert(port == format_data->port);
     1042
     1043        rte_eth_link_get_nowait(port, &link_info);
     1044
     1045        if (link_info.link_status)
     1046                format_data->link_speed = link_info.link_speed;
     1047        else
     1048                format_data->link_speed = 0;
     1049
     1050#if DEBUG
     1051        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
     1052                link_info.link_status ? "up" : "down",
     1053                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
     1054                                          "full-duplex" : "half-duplex",
     1055                (int) link_info.link_speed);
     1056#endif
     1057
     1058        /* Turns out DPDK drivers might not come back up if the link speed
     1059         * changes. So we reset the autoneg procedure. This is very unsafe
     1060         * we have have threads reading packets and we stop the port. */
     1061#if 0
     1062        if (!link_info.link_status) {
     1063                int ret;
     1064                rte_eth_dev_stop(port);
     1065                ret = rte_eth_dev_start(port);
     1066                if (ret < 0) {
     1067                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
     1068                                strerror(-ret));
     1069                }
     1070        }
     1071#endif
     1072}
     1073
     1074/** Reserve a DPDK lcore ID for a thread globally.
     1075 *
     1076 * @param real If true allocate a real lcore, otherwise allocate a core which
     1077 * does not exist on the local machine.
     1078 * @param socket the prefered NUMA socket - only used if a real core is requested
     1079 * @return a valid core, which can later be used with dpdk_register_lcore() or a
     1080 * -1 if have run out of cores.
     1081 *
     1082 * If any thread is reading or freeing packets we need to register it here
     1083 * due to TLS caches in the memory pools.
     1084 */
     1085static int dpdk_reserve_lcore(bool real, int socket) {
     1086        int new_id = -1;
     1087        int i;
     1088        struct rte_config *cfg = rte_eal_get_configuration();
     1089
     1090        pthread_mutex_lock(&dpdk_lock);
     1091        /* If 'reading packets' fill in cores from 0 up and bind affinity
     1092         * otherwise start from the MAX core (which is also the master) and work backwards
     1093         * in this case physical cores on the system will not exist so we don't bind
     1094         * these to any particular physical core */
     1095        if (real) {
     1096#if HAVE_LIBNUMA
     1097                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1098                        if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == socket) {
     1099                                new_id = i;
     1100                                if (!lcore_config[i].detected)
     1101                                        new_id = -1;
     1102                                break;
     1103                        }
     1104                }
     1105#endif
     1106                /* Retry without the the numa restriction */
     1107                if (new_id == -1) {
     1108                        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1109                                if (!rte_lcore_is_enabled(i)) {
     1110                                        new_id = i;
     1111                                        if (!lcore_config[i].detected)
     1112                                                fprintf(stderr, "Warning the"
     1113                                                        " number of 'reading' "
     1114                                                        "threads exceed cores\n");
     1115                                        break;
     1116                                }
     1117                        }
     1118                }
     1119        } else {
     1120                for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1121                        if (!rte_lcore_is_enabled(i)) {
     1122                                new_id = i;
     1123                                break;
     1124                        }
     1125                }
     1126        }
     1127
     1128        if (new_id != -1) {
     1129                /* Enable the core in global DPDK structs */
     1130                cfg->lcore_role[new_id] = ROLE_RTE;
     1131                cfg->lcore_count++;
     1132        }
     1133
     1134        pthread_mutex_unlock(&dpdk_lock);
     1135        return new_id;
     1136}
     1137
     1138/** Register a thread as a lcore
     1139 * @param libtrace any error is set against libtrace on exit
     1140 * @param real If this is a true lcore we will bind its affinty to the
     1141 * requested core.
     1142 * @param lcore The lcore as retrieved from dpdk_reserve_lcore()
     1143 * @return 0, if successful otherwise -1 if an error occured (details are stored
     1144 * in libtrace)
     1145 *
     1146 * @note This must be called from the thread being registered.
     1147 */
     1148static int dpdk_register_lcore(libtrace_t *libtrace, bool real, int lcore) {
     1149        int ret;
     1150        RTE_PER_LCORE(_lcore_id) = lcore;
     1151
     1152        /* Set affinity bind to corresponding core */
     1153        if (real) {
     1154                cpu_set_t cpuset;
     1155                CPU_ZERO(&cpuset);
     1156                CPU_SET(rte_lcore_id(), &cpuset);
     1157                ret = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1158                if (ret != 0) {
     1159                        trace_set_err(libtrace, errno, "Warning "
     1160                                      "pthread_setaffinity_np failed");
     1161                        return -1;
     1162                }
     1163        }
     1164
     1165        return 0;
     1166}
     1167
     1168/** Allocates a new dpdk packet buffer memory pool.
     1169 *
     1170 * @param n The number of threads
     1171 * @param pkt_size The packet size we need ot store
     1172 * @param socket_id The NUMA socket id
     1173 * @param A new mempool, if NULL query the DPDK library for the error code
     1174 * see rte_mempool_create() documentation.
     1175 *
     1176 * This allocates a new pool or recycles an existing memory pool.
     1177 * Call dpdk_free_memory() to free the memory.
     1178 * We cannot delete memory so instead we store the pools, allowing them to be
     1179 * re-used.
     1180 */
     1181static struct rte_mempool *dpdk_alloc_memory(unsigned n,
     1182                                             unsigned pkt_size,
     1183                                             int socket_id) {
     1184        struct rte_mempool *ret;
     1185        size_t j,k;
     1186        char name[MEMPOOL_NAME_LEN];
     1187
     1188        /* Add on packet size overheads */
     1189        pkt_size += sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1190
     1191        pthread_mutex_lock(&dpdk_lock);
     1192
     1193        if (socket_id == SOCKET_ID_ANY || socket_id > 4) {
     1194                /* Best guess go for zero */
     1195                socket_id = 0;
     1196        }
     1197
     1198        /* Find a valid pool */
     1199        for (j = 0; j < RTE_MAX_LCORE && mem_pools[socket_id][j]; ++j) {
     1200                if (mem_pools[socket_id][j]->size >= n &&
     1201                    mem_pools[socket_id][j]->elt_size >= pkt_size) {
     1202                        break;
     1203                }
     1204        }
     1205
     1206        /* Find the end (+1) of the list */
     1207        for (k = j; k < RTE_MAX_LCORE && mem_pools[socket_id][k]; ++k) {}
     1208
     1209        if (mem_pools[socket_id][j]) {
     1210                ret = mem_pools[socket_id][j];
     1211                mem_pools[socket_id][j] = mem_pools[socket_id][k-1];
     1212                mem_pools[socket_id][k-1] = NULL;
     1213                mem_pools[socket_id][j] = NULL;
     1214        } else {
     1215                static uint32_t test = 10;
     1216                test++;
     1217                snprintf(name, MEMPOOL_NAME_LEN,
     1218                         "libtrace_pool_%"PRIu32, test);
     1219
     1220                ret = rte_mempool_create(name, n, pkt_size,
     1221                                         128, sizeof(struct rte_pktmbuf_pool_private),
     1222                                         rte_pktmbuf_pool_init, NULL,
     1223                                         rte_pktmbuf_init, NULL,
     1224                                         socket_id, 0);
     1225        }
     1226
     1227        pthread_mutex_unlock(&dpdk_lock);
     1228        return ret;
     1229}
     1230
     1231/** Stores the memory against the DPDK library.
     1232 *
     1233 * @param mempool The mempool to free
     1234 * @param socket_id The NUMA socket this mempool was allocated upon.
     1235 *
     1236 * Because we cannot free a memory pool, we verify it's full (i.e. unused) and
     1237 * store the memory shared globally against the format.
     1238 */
     1239static void dpdk_free_memory(struct rte_mempool *mempool, int socket_id) {
     1240        size_t i;
     1241        pthread_mutex_lock(&dpdk_lock);
     1242
     1243        /* We should have all entries back in the mempool */
     1244        rte_mempool_audit(mempool);
     1245        if (!rte_mempool_full(mempool)) {
     1246                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
     1247                        "free all packets before finishing a trace\n",
     1248                        rte_mempool_count(mempool), mempool->size);
     1249        }
     1250
     1251        /* Find the end (+1) of the list */
     1252        for (i = 0; i < RTE_MAX_LCORE && mem_pools[socket_id][i]; ++i) {}
     1253
     1254        if (i >= RTE_MAX_LCORE) {
     1255                fprintf(stderr, "Too many memory pools, dropping this one\n");
     1256        } else {
     1257                mem_pools[socket_id][i] = mempool;
     1258        }
     1259
     1260        pthread_mutex_unlock(&dpdk_lock);
     1261}
     1262
     1263/* Attach memory to the port and start (or restart) the port/s.
     1264 */
     1265static int dpdk_start_streams(struct dpdk_format_data_t *format_data,
     1266                              char *err, int errlen, uint16_t rx_queues) {
     1267        int ret, i;
     1268        struct rte_eth_link link_info; /* Wait for link */
     1269        dpdk_per_stream_t empty_stream = DPDK_EMPTY_STREAM;
     1270
     1271        /* Already started */
     1272        if (format_data->paused == DPDK_RUNNING)
     1273                return 0;
     1274
     1275        /* First time started we need to alloc our memory, doing this here
     1276         * rather than in environment setup because we don't have snaplen then */
     1277        if (format_data->paused == DPDK_NEVER_STARTED) {
     1278                if (format_data->snaplen == 0) {
     1279                        format_data->snaplen = RX_MBUF_SIZE;
     1280                        port_conf.rxmode.jumbo_frame = 0;
     1281                        port_conf.rxmode.max_rx_pkt_len = 0;
     1282                } else {
     1283                        /* Use jumbo frames */
     1284                        port_conf.rxmode.jumbo_frame = 1;
     1285                        port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1286                }
     1287
    8431288#if GET_MAC_CRC_CHECKSUM
    844         format_data->snaplen += ETHER_CRC_LEN;
     1289                /* This is additional overhead so make sure we allow space for this */
     1290                format_data->snaplen += ETHER_CRC_LEN;
    8451291#endif
    8461292#if HAS_HW_TIMESTAMPS_82580
    847         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    848 #endif
    849 
    850         /* Create the mbuf pool, which is the place our packets are allocated
    851          * from - TODO figure out if there is is a free function (I cannot see one)
    852          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    853          * allocate however that extra 1 packet is not used.
    854         * (I assume <= vs < error some where in DPDK code)
    855          * TX requires nb_tx_buffers + 1 in the case the queue is full
    856         * so that will fill the new buffer and wait until slots in the
    857         * ring become available.
    858         */
     1293                format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1294#endif
     1295
     1296                /* Create the mbuf pool, which is the place packets are allocated
     1297                 * from - There is no free function (I cannot see one).
     1298                 * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1299                 * allocate however that extra 1 packet is not used.
     1300                * (I assume <= vs < error some where in DPDK code)
     1301                 * TX requires nb_tx_buffers + 1 in the case the queue is full
     1302                * so that will fill the new buffer and wait until slots in the
     1303                * ring become available.
     1304                */
    8591305#if DEBUG
    860     fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    861 #endif
    862         format_data->pktmbuf_pool =
    863             rte_mempool_create(format_data->mempool_name,
    864                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    865                        format_data->snaplen + sizeof(struct rte_mbuf)
    866                                         + RTE_PKTMBUF_HEADROOM,
    867                        8, sizeof(struct rte_pktmbuf_pool_private),
    868                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    869                        rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    870 
    871         if (format_data->pktmbuf_pool == NULL) {
    872             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    873                         "pool failed: %s", strerror(rte_errno));
    874             return -1;
    875         }
    876     }
    877    
    878     /* ----------- Now do the setup for the port mapping ------------ */
    879     /* Order of calls must be
    880      * rte_eth_dev_configure()
    881      * rte_eth_tx_queue_setup()
    882      * rte_eth_rx_queue_setup()
    883      * rte_eth_dev_start()
    884      * other rte_eth calls
    885      */
    886    
    887     /* This must be called first before another *eth* function
    888      * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    889     ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    890     if (ret < 0) {
    891         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    892                             " %"PRIu8" : %s", format_data->port,
    893                             strerror(-ret));
    894         return -1;
    895     }
    896     /* Initialise the TX queue a minimum value if using this port for
    897      * receiving. Otherwise a larger size if writing packets.
    898      */
    899     ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    900                         format_data->nb_tx_buf, rte_socket_id(), &tx_conf);
    901     if (ret < 0) {
    902         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    903                             " %"PRIu8" : %s", format_data->port,
    904                             strerror(-ret));
    905         return -1;
    906     }
    907     /* Initialise the RX queue with some packets from memory */
    908     ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    909                             format_data->nb_rx_buf, rte_socket_id(),
    910                             &rx_conf, format_data->pktmbuf_pool);
    911     if (ret < 0) {
    912         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    913                     " %"PRIu8" : %s", format_data->port,
    914                     strerror(-ret));
    915         return -1;
    916     }
    917    
    918     /* Start device */
    919     ret = rte_eth_dev_start(format_data->port);
    920     if (ret < 0) {
    921         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    922                     strerror(-ret));
    923         return -1;
    924     }
    925 
    926     /* Default promiscuous to on */
    927     if (format_data->promisc == -1)
    928         format_data->promisc = 1;
    929    
    930     if (format_data->promisc == 1)
    931         rte_eth_promiscuous_enable(format_data->port);
    932     else
    933         rte_eth_promiscuous_disable(format_data->port);
    934    
    935     /* Wait for the link to come up */
    936     rte_eth_link_get(format_data->port, &link_info);
     1306                fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
     1307#endif
     1308                format_data->pktmbuf_pool = dpdk_alloc_memory(format_data->nb_tx_buf*2,
     1309                                                              format_data->snaplen,
     1310                                                              format_data->nic_numa_node);
     1311
     1312                if (format_data->pktmbuf_pool == NULL) {
     1313                        snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1314                                 "pool failed: %s", strerror(rte_errno));
     1315                        return -1;
     1316                }
     1317        }
     1318
     1319        /* ----------- Now do the setup for the port mapping ------------ */
     1320        /* Order of calls must be
     1321         * rte_eth_dev_configure()
     1322         * rte_eth_tx_queue_setup()
     1323         * rte_eth_rx_queue_setup()
     1324         * rte_eth_dev_start()
     1325         * other rte_eth calls
     1326         */
     1327
     1328        /* This must be called first before another *eth* function
     1329         * 1+ rx, 1 tx queues, port_conf sets checksum stripping etc */
     1330        ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1331        if (ret < 0) {
     1332                snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1333                         " %"PRIu8" : %s", format_data->port,
     1334                         strerror(-ret));
     1335                return -1;
     1336        }
    9371337#if DEBUG
    938     fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    939             (int) link_info.link_duplex, (int) link_info.link_speed);
    940 #endif
    941 
    942     /* We have now successfully started/unpaused */
    943     format_data->paused = DPDK_RUNNING;
    944    
    945     return 0;
     1338        fprintf(stderr, "Doing dev configure\n");
     1339#endif
     1340        /* Initialise the TX queue a minimum value if using this port for
     1341         * receiving. Otherwise a larger size if writing packets.
     1342         */
     1343        ret = rte_eth_tx_queue_setup(format_data->port,
     1344                                     0 /* queue XXX */,
     1345                                     format_data->nb_tx_buf,
     1346                                     SOCKET_ID_ANY,
     1347                                     &tx_conf);
     1348        if (ret < 0) {
     1349                snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue"
     1350                         " on port %"PRIu8" : %s", format_data->port,
     1351                         strerror(-ret));
     1352                return -1;
     1353        }
     1354
     1355        /* Attach memory to our RX queues */
     1356        for (i=0; i < rx_queues; i++) {
     1357                dpdk_per_stream_t *stream;
     1358#if DEBUG
     1359                fprintf(stderr, "Configuring queue %d\n", i);
     1360#endif
     1361
     1362                /* Add storage for the stream */
     1363                if (libtrace_list_get_size(format_data->per_stream) <= (size_t) i)
     1364                        libtrace_list_push_back(format_data->per_stream, &empty_stream);
     1365                stream = libtrace_list_get_index(format_data->per_stream, i)->data;
     1366                stream->queue_id = i;
     1367
     1368                if (stream->lcore == -1)
     1369                        stream->lcore = dpdk_reserve_lcore(true, format_data->nic_numa_node);
     1370
     1371                if (stream->lcore == -1) {
     1372                        snprintf(err, errlen, "Intel DPDK - Failed to reserve a lcore"
     1373                                 ". Too many threads?");
     1374                        return -1;
     1375                }
     1376
     1377                if (stream->mempool == NULL) {
     1378                        stream->mempool = dpdk_alloc_memory(
     1379                                                  format_data->nb_rx_buf*2,
     1380                                                  format_data->snaplen,
     1381                                                  rte_lcore_to_socket_id(stream->lcore));
     1382
     1383                        if (stream->mempool == NULL) {
     1384                                snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1385                                         "pool failed: %s", strerror(rte_errno));
     1386                                return -1;
     1387                        }
     1388                }
     1389
     1390                /* Initialise the RX queue with some packets from memory */
     1391                ret = rte_eth_rx_queue_setup(format_data->port,
     1392                                             stream->queue_id,
     1393                                             format_data->nb_rx_buf,
     1394                                             format_data->nic_numa_node,
     1395                                             &rx_conf,
     1396                                             stream->mempool);
     1397                if (ret < 0) {
     1398                        snprintf(err, errlen, "Intel DPDK - Cannot configure"
     1399                                 " RX queue on port %"PRIu8" : %s",
     1400                                 format_data->port,
     1401                                 strerror(-ret));
     1402                        return -1;
     1403                }
     1404        }
     1405
     1406#if DEBUG
     1407        fprintf(stderr, "Doing start device\n");
     1408#endif
     1409        rte_eth_stats_reset(format_data->port);
     1410        /* Start device */
     1411        ret = rte_eth_dev_start(format_data->port);
     1412        if (ret < 0) {
     1413                snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1414                         strerror(-ret));
     1415                return -1;
     1416        }
     1417
     1418        /* Default promiscuous to on */
     1419        if (format_data->promisc == -1)
     1420                format_data->promisc = 1;
     1421
     1422        if (format_data->promisc == 1)
     1423                rte_eth_promiscuous_enable(format_data->port);
     1424        else
     1425                rte_eth_promiscuous_disable(format_data->port);
     1426
     1427        /* We have now successfully started/unpased */
     1428        format_data->paused = DPDK_RUNNING;
     1429
     1430
     1431        /* Register a callback for link state changes */
     1432        ret = rte_eth_dev_callback_register(format_data->port,
     1433                                            RTE_ETH_EVENT_INTR_LSC,
     1434                                            dpdk_lsc_callback,
     1435                                            format_data);
     1436#if DEBUG
     1437        if (ret)
     1438                fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1439                        ret, strerror(-ret));
     1440#endif
     1441
     1442        /* Get the current link status */
     1443        rte_eth_link_get_nowait(format_data->port, &link_info);
     1444        format_data->link_speed = link_info.link_speed;
     1445#if DEBUG
     1446        fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1447                (int) link_info.link_duplex, (int) link_info.link_speed);
     1448#endif
     1449
     1450        return 0;
    9461451}
    9471452
    9481453static int dpdk_start_input (libtrace_t *libtrace) {
    949     char err[500];
    950     err[0] = 0;
    951 
    952     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    953         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    954         free(libtrace->format_data);
    955         libtrace->format_data = NULL;
    956         return -1;
    957     }
    958     return 0;
     1454        char err[500];
     1455        err[0] = 0;
     1456
     1457        /* Make sure we don't reserve an extra thread for this */
     1458        FORMAT_DATA_FIRST(libtrace)->queue_id = rte_lcore_id();
     1459
     1460        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1461                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1462                free(libtrace->format_data);
     1463                libtrace->format_data = NULL;
     1464                return -1;
     1465        }
     1466        return 0;
     1467}
     1468
     1469static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1470        struct rte_eth_dev_info dev_info;
     1471        rte_eth_dev_info_get(port_id, &dev_info);
     1472        return dev_info.max_rx_queues;
     1473}
     1474
     1475static inline size_t dpdk_processor_count () {
     1476        long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1477        if (nb_cpu <= 0)
     1478                return 1;
     1479        else
     1480                return (size_t) nb_cpu;
     1481}
     1482
     1483static int dpdk_pstart_input (libtrace_t *libtrace) {
     1484        char err[500];
     1485        int i=0, phys_cores=0;
     1486        int tot = libtrace->perpkt_thread_count;
     1487        libtrace_list_node_t *n;
     1488        err[0] = 0;
     1489
     1490        if (rte_lcore_id() != rte_get_master_lcore())
     1491                fprintf(stderr, "Warning dpdk_pstart_input should be called"
     1492                        " from the master DPDK thread!\n");
     1493
     1494        /* If the master is not on the last thread we move it there */
     1495        if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1496                if (dpdk_move_master_lcore(libtrace, RTE_MAX_LCORE - 1) != 0)
     1497                        return -1;
     1498        }
     1499
     1500        /* Don't exceed the number of cores in the system/detected by dpdk
     1501         * We don't have to force this but performance wont be good if we don't */
     1502        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1503                if (lcore_config[i].detected) {
     1504                        if (rte_lcore_is_enabled(i)) {
     1505#if DEBUG
     1506                                fprintf(stderr, "Found core %d already in use!\n", i);
     1507#endif
     1508                        } else {
     1509                                phys_cores++;
     1510                        }
     1511                }
     1512        }
     1513        /* If we are restarting we have already allocated some threads as such
     1514         * we add these back to the count for this calculation */
     1515        for (n = FORMAT_DATA_HEAD(libtrace); n; n = n->next) {
     1516                dpdk_per_stream_t * stream = n->data;
     1517                if (stream->lcore != -1)
     1518                        phys_cores++;
     1519        }
     1520
     1521        tot = MIN(libtrace->perpkt_thread_count,
     1522                  dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1523        tot = MIN(tot, phys_cores);
     1524
     1525#if DEBUG
     1526        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot,
     1527                libtrace->perpkt_thread_count, phys_cores);
     1528#endif
     1529
     1530        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1531                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1532                free(libtrace->format_data);
     1533                libtrace->format_data = NULL;
     1534                return -1;
     1535        }
     1536
     1537        /* Make sure we only start the number that we should */
     1538        libtrace->perpkt_thread_count = tot;
     1539        return 0;
     1540}
     1541
     1542/**
     1543 * Register a thread with the DPDK system,
     1544 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1545 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1546 * gives it.
     1547 *
     1548 * We then allow a mapper thread to be started on every real core as DPDK would,
     1549 * we also bind these to the corresponding CPU cores.
     1550 *
     1551 * @param libtrace A pointer to the trace
     1552 * @param reading True if the thread will be used to read packets, i.e. will
     1553 *                call pread_packet(), false if thread used to process packet
     1554 *                in any other manner including statistics functions.
     1555 */
     1556static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1557{
     1558#if DEBUG
     1559        char name[99];
     1560        pthread_getname_np(pthread_self(),
     1561                           name, sizeof(name));
     1562#endif
     1563        if (reading) {
     1564                dpdk_per_stream_t *stream;
     1565                /* Attach our thread */
     1566                if(t->type == THREAD_PERPKT) {
     1567                        t->format_data = libtrace_list_get_index(FORMAT(libtrace)->per_stream, t->perpkt_num)->data;
     1568                        if (t->format_data == NULL) {
     1569                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     1570                                              "Too many threads registered");
     1571                                return -1;
     1572                        }
     1573                } else {
     1574                        t->format_data = FORMAT_DATA_FIRST(libtrace);
     1575                }
     1576                stream = t->format_data;
     1577#if DEBUG
     1578                fprintf(stderr, "%s new id memory:%s cpu-core:%d\n", name, stream->mempool->name, rte_lcore_id());
     1579#endif
     1580                return dpdk_register_lcore(libtrace, true, stream->lcore);
     1581        } else {
     1582                int lcore = dpdk_reserve_lcore(reading, 0);
     1583                if (lcore == -1) {
     1584                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Too many threads"
     1585                                      " for DPDK");
     1586                        return -1;
     1587                }
     1588#if DEBUG
     1589                fprintf(stderr, "%s new id cpu-core:%d\n", name, rte_lcore_id());
     1590#endif
     1591                return dpdk_register_lcore(libtrace, false, lcore);
     1592        }
     1593
     1594        return 0;
     1595}
     1596
     1597/**
     1598 * Unregister a thread with the DPDK system.
     1599 *
     1600 * Only previously registered threads should be calling this just before
     1601 * they are destroyed.
     1602 */
     1603static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
     1604{
     1605        struct rte_config *cfg = rte_eal_get_configuration();
     1606
     1607        assert(rte_lcore_id() < RTE_MAX_LCORE);
     1608        pthread_mutex_lock(&dpdk_lock);
     1609        /* Skip if master */
     1610        if (rte_lcore_id() == rte_get_master_lcore()) {
     1611                fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1612                pthread_mutex_unlock(&dpdk_lock);
     1613                return;
     1614        }
     1615
     1616        /* Disable this core in global DPDK structs */
     1617        cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1618        cfg->lcore_count--;
     1619        RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1620        assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1621        pthread_mutex_unlock(&dpdk_lock);
     1622        return;
    9591623}
    9601624
    9611625static int dpdk_start_output(libtrace_out_t *libtrace)
    9621626{
    963     char err[500];
    964     err[0] = 0;
    965    
    966     if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    967         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    968         free(libtrace->format_data);
    969         libtrace->format_data = NULL;
    970         return -1;
    971     }
    972     return 0;
    973 }
    974 
    975 static int dpdk_pause_input(libtrace_t * libtrace){
    976     /* This stops the device, but can be restarted using rte_eth_dev_start() */
    977     if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    978 #if DEBUG     
    979         fprintf(stderr, "Pausing port\n");
    980 #endif
    981         rte_eth_dev_stop(FORMAT(libtrace)->port);
    982         FORMAT(libtrace)->paused = DPDK_PAUSED;
    983         /* If we pause it the driver will be reset and likely our counter */
     1627        char err[500];
     1628        err[0] = 0;
     1629
     1630        if (dpdk_start_streams(FORMAT(libtrace), err, sizeof(err), 1) != 0) {
     1631                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1632                free(libtrace->format_data);
     1633                libtrace->format_data = NULL;
     1634                return -1;
     1635        }
     1636        return 0;
     1637}
     1638
     1639static int dpdk_pause_input(libtrace_t * libtrace) {
     1640        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD(libtrace);
     1641        /* This stops the device, but can be restarted using rte_eth_dev_start() */
     1642        if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
     1643#if DEBUG
     1644                fprintf(stderr, "Pausing DPDK port\n");
     1645#endif
     1646                rte_eth_dev_stop(FORMAT(libtrace)->port);
     1647                FORMAT(libtrace)->paused = DPDK_PAUSED;
     1648                /* Empty the queue of packets */
     1649                for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1650                        rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1651                }
     1652                FORMAT(libtrace)->burst_offset = 0;
     1653                FORMAT(libtrace)->burst_size = 0;
     1654
     1655                for (; tmp != NULL; tmp = tmp->next) {
     1656                        dpdk_per_stream_t *stream = tmp->data;
     1657                        stream->ts_last_sys = 0;
    9841658#if HAS_HW_TIMESTAMPS_82580
    985         FORMAT(libtrace)->ts_first_sys = 0;
    986         FORMAT(libtrace)->ts_last_sys = 0;
    987 #endif
    988     }
    989     return 0;
    990 }
    991 
    992 static int dpdk_write_packet(libtrace_out_t *trace,
    993                 libtrace_packet_t *packet){
    994     struct rte_mbuf* m_buff[1];
    995    
    996     int wirelen = trace_get_wire_length(packet);
    997     int caplen = trace_get_capture_length(packet);
    998    
    999     /* Check for a checksum and remove it */
    1000     if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1001                                             wirelen == caplen)
    1002         caplen -= ETHER_CRC_LEN;
    1003 
    1004     m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    1005     if (m_buff[0] == NULL) {
    1006         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1007         return -1;
    1008     } else {
    1009         int ret;
    1010         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1011         do {
    1012             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1013         } while (ret != 1);
    1014     }
    1015 
    1016     return 0;
     1659                        stream->ts_first_sys = 0;
     1660#endif
     1661                }
     1662
     1663        }
     1664        return 0;
     1665}
     1666
     1667static int dpdk_write_packet(libtrace_out_t *trace,
     1668                             libtrace_packet_t *packet){
     1669        struct rte_mbuf* m_buff[1];
     1670
     1671        int wirelen = trace_get_wire_length(packet);
     1672        int caplen = trace_get_capture_length(packet);
     1673
     1674        /* Check for a checksum and remove it */
     1675        if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
     1676            wirelen == caplen)
     1677                caplen -= ETHER_CRC_LEN;
     1678
     1679        m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
     1680        if (m_buff[0] == NULL) {
     1681                trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1682                return -1;
     1683        } else {
     1684                int ret;
     1685                memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1686                do {
     1687                        ret = rte_eth_tx_burst(0 /*queue TODO*/, FORMAT(trace)->port, m_buff, 1);
     1688                } while (ret != 1);
     1689        }
     1690
     1691        return 0;
    10171692}
    10181693
    10191694static int dpdk_fin_input(libtrace_t * libtrace) {
    1020     /* Free our memory structures */
    1021     if (libtrace->format_data != NULL) {
    1022         /* Close the device completely, device cannot be restarted */
    1023         if (FORMAT(libtrace)->port != 0xFF)
    1024             rte_eth_dev_close(FORMAT(libtrace)->port);
    1025         /* filter here if we used it */
     1695        libtrace_list_node_t * n;
     1696        /* Free our memory structures */
     1697        if (libtrace->format_data != NULL) {
     1698
     1699                if (FORMAT(libtrace)->port != 0xFF)
     1700                        rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1701                                                        RTE_ETH_EVENT_INTR_LSC,
     1702                                                        dpdk_lsc_callback,
     1703                                                        FORMAT(libtrace));
     1704                /* Close the device completely, device cannot be restarted */
     1705                rte_eth_dev_close(FORMAT(libtrace)->port);
     1706
     1707                dpdk_free_memory(FORMAT(libtrace)->pktmbuf_pool,
     1708                                 FORMAT(libtrace)->nic_numa_node);
     1709
     1710                for (n = FORMAT(libtrace)->per_stream->head; n ; n = n->next) {
     1711                        dpdk_per_stream_t * stream = n->data;
     1712                        if (stream->mempool)
     1713                                dpdk_free_memory(stream->mempool,
     1714                                                 rte_lcore_to_socket_id(stream->lcore));
     1715                }
     1716
     1717                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1718                /* filter here if we used it */
    10261719                free(libtrace->format_data);
    10271720        }
    10281721
    1029     /* Revert to the original PCI drivers */
    1030     /* No longer in DPDK
    1031     rte_eal_pci_exit(); */
    1032     return 0;
     1722        return 0;
    10331723}
    10341724
    10351725
    10361726static int dpdk_fin_output(libtrace_out_t * libtrace) {
    1037     /* Free our memory structures */
    1038     if (libtrace->format_data != NULL) {
    1039         /* Close the device completely, device cannot be restarted */
    1040         if (FORMAT(libtrace)->port != 0xFF)
    1041             rte_eth_dev_close(FORMAT(libtrace)->port);
    1042         /* filter here if we used it */
     1727        /* Free our memory structures */
     1728        if (libtrace->format_data != NULL) {
     1729                /* Close the device completely, device cannot be restarted */
     1730                if (FORMAT(libtrace)->port != 0xFF)
     1731                        rte_eth_dev_close(FORMAT(libtrace)->port);
     1732                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
     1733                /* filter here if we used it */
    10431734                free(libtrace->format_data);
    10441735        }
    10451736
    1046     /* Revert to the original PCI drivers */
    1047     /* No longer in DPDK
    1048     rte_eal_pci_exit(); */
    1049     return 0;
    1050 }
    1051 
    1052 /**
    1053  * Get the start of additional header that we added to a packet.
     1737        return 0;
     1738}
     1739
     1740/**
     1741 * Get the start of the additional header that we added to a packet.
    10541742 */
    10551743static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1056     uint8_t *hdrsize;
    1057     assert(packet);
    1058     assert(packet->buffer);
    1059     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1060     /* The byte before the original packet data denotes the size in bytes
    1061      * of our additional header that we added sits before the 'size byte' */
    1062     hdrsize--;
    1063     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1744        assert(packet);
     1745        assert(packet->buffer);
     1746        /* Our header sits straight after the mbuf header */
     1747        return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    10641748}
    10651749
    10661750static int dpdk_get_capture_length (const libtrace_packet_t *packet) {
    1067     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1068     return hdr->cap_len;
     1751        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1752        return hdr->cap_len;
    10691753}
    10701754
    10711755static size_t dpdk_set_capture_length(libtrace_packet_t *packet, size_t size) {
    1072     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1073     if (size > hdr->cap_len) {
    1074         /* Cannot make a packet bigger */
     1756        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1757        if (size > hdr->cap_len) {
     1758                /* Cannot make a packet bigger */
    10751759                return trace_get_capture_length(packet);
    10761760        }
    10771761
    1078     /* Reset the cached capture length first*/
    1079     packet->capture_length = -1;
    1080     hdr->cap_len = (uint32_t) size;
     1762        /* Reset the cached capture length first*/
     1763        packet->capture_length = -1;
     1764        hdr->cap_len = (uint32_t) size;
    10811765        return trace_get_capture_length(packet);
    10821766}
    10831767
    10841768static int dpdk_get_wire_length (const libtrace_packet_t *packet) {
    1085     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1086     int org_cap_size; /* The original capture size */
    1087     if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1088         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1089                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1090                             sizeof(struct hw_timestamp_82580);
    1091     } else {
    1092         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1093                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
    1094     }
    1095     if (hdr->flags & INCLUDES_CHECKSUM) {
    1096         return org_cap_size;
    1097     } else {
    1098         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1099         return org_cap_size + ETHER_CRC_LEN;
    1100     }
    1101 }
     1769        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1770        int org_cap_size; /* The original capture size */
     1771        if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
     1772                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1773                               sizeof(struct hw_timestamp_82580);
     1774        } else {
     1775                org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
     1776        }
     1777        if (hdr->flags & INCLUDES_CHECKSUM) {
     1778                return org_cap_size;
     1779        } else {
     1780                /* DPDK packets are always TRACE_TYPE_ETH packets */
     1781                return org_cap_size + ETHER_CRC_LEN;
     1782        }
     1783}
     1784
    11021785static int dpdk_get_framing_length (const libtrace_packet_t *packet) {
    1103     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1104     if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1105         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1106                 sizeof(struct hw_timestamp_82580);
    1107     else
    1108         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1786        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     1787        if (hdr->flags & INCLUDES_HW_TIMESTAMP)
     1788                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1789                                sizeof(struct hw_timestamp_82580);
     1790        else
     1791                return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    11091792}
    11101793
    11111794static int dpdk_prepare_packet(libtrace_t *libtrace UNUSED,
    1112                 libtrace_packet_t *packet, void *buffer,
    1113                 libtrace_rt_types_t rt_type, uint32_t flags) {
    1114     assert(packet);
    1115     if (packet->buffer != buffer &&
    1116         packet->buf_control == TRACE_CTRL_PACKET) {
    1117         free(packet->buffer);
    1118     }
    1119 
    1120     if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1121         packet->buf_control = TRACE_CTRL_PACKET;
    1122     } else
    1123         packet->buf_control = TRACE_CTRL_EXTERNAL;
    1124 
    1125     packet->buffer = buffer;
    1126     packet->header = buffer;
    1127 
    1128     /* Don't use pktmbuf_mtod will fail if the packet is a copy */
    1129     packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
    1130     packet->type = rt_type;
    1131     return 0;
    1132 }
    1133 
    1134 /*
    1135  * Does any extra preperation to a captured packet.
    1136  * This includes adding our extra header to it with the timestamp
    1137  */
    1138 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1139                                                         struct rte_mbuf* pkt){
    1140     uint8_t * hdr_size;
    1141     struct dpdk_addt_hdr *hdr;
     1795                               libtrace_packet_t *packet, void *buffer,
     1796                               libtrace_rt_types_t rt_type, uint32_t flags) {
     1797        assert(packet);
     1798        if (packet->buffer != buffer &&
     1799            packet->buf_control == TRACE_CTRL_PACKET) {
     1800                free(packet->buffer);
     1801        }
     1802
     1803        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
     1804                packet->buf_control = TRACE_CTRL_PACKET;
     1805        else
     1806                packet->buf_control = TRACE_CTRL_EXTERNAL;
     1807
     1808        packet->buffer = buffer;
     1809        packet->header = buffer;
     1810
     1811        /* Don't use pktmbuf_mtod will fail if the packet is a copy */
     1812        packet->payload = (char *)buffer + dpdk_get_framing_length(packet);
     1813        packet->type = rt_type;
     1814        return 0;
     1815}
     1816
     1817/**
     1818 * Given a packet size and a link speed, computes the
     1819 * time to transmit in nanoseconds.
     1820 *
     1821 * @param format_data The dpdk format data from which we get the link speed
     1822 *        and if unset updates it in a thread safe manner
     1823 * @param pkt_size The size of the packet in bytes
     1824 * @return The wire time in nanoseconds
     1825 */
     1826static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1827        uint32_t wire_time;
     1828        /* 20 extra bytes of interframe gap and preamble */
     1829# if GET_MAC_CRC_CHECKSUM
     1830        wire_time = ((pkt_size + 20) * 8000);
     1831# else
     1832        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1833# endif
     1834
     1835        /* Division is really slow and introduces a pipeline stall
     1836         * The compiler will optimise this into magical multiplication and shifting
     1837         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1838         */
     1839retry_calc_wiretime:
     1840        switch (format_data->link_speed) {
     1841        case ETH_LINK_SPEED_40G:
     1842                wire_time /=  ETH_LINK_SPEED_40G;
     1843                break;
     1844        case ETH_LINK_SPEED_20G:
     1845                wire_time /= ETH_LINK_SPEED_20G;
     1846                break;
     1847        case ETH_LINK_SPEED_10G:
     1848                wire_time /= ETH_LINK_SPEED_10G;
     1849                break;
     1850        case ETH_LINK_SPEED_1000:
     1851                wire_time /= ETH_LINK_SPEED_1000;
     1852                break;
     1853        case 0:
     1854                {
     1855                /* Maybe the link was down originally, but now it should be up */
     1856                struct rte_eth_link link = {0};
     1857                rte_eth_link_get_nowait(format_data->port, &link);
     1858                if (link.link_status && link.link_speed) {
     1859                        format_data->link_speed = link.link_speed;
     1860#ifdef DEBUG
     1861                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1862#endif
     1863                        goto retry_calc_wiretime;
     1864                }
     1865                /* We don't know the link speed, make sure numbers are counting up */
     1866                wire_time = 1;
     1867                break;
     1868                }
     1869        default:
     1870                wire_time /= format_data->link_speed;
     1871        }
     1872        return wire_time;
     1873}
     1874
     1875/**
     1876 * Does any extra preperation to all captured packets
     1877 * This includes adding our extra header to it with the timestamp,
     1878 * and any snapping
     1879 *
     1880 * @param format_data The DPDK format data
     1881 * @param plc The DPDK per lcore format data
     1882 * @param pkts An array of size nb_pkts of DPDK packets
     1883 */
     1884static inline void dpdk_ready_pkts(libtrace_t *libtrace,
     1885                                   struct dpdk_per_stream_t *plc,
     1886                                   struct rte_mbuf **pkts,
     1887                                   size_t nb_pkts) {
     1888        struct dpdk_format_data_t *format_data = FORMAT(libtrace);
     1889        struct dpdk_addt_hdr *hdr;
     1890        size_t i;
     1891        uint64_t cur_sys_time_ns;
    11421892#if HAS_HW_TIMESTAMPS_82580
    1143     struct hw_timestamp_82580 *hw_ts;
    1144     struct timeval cur_sys_time;
    1145     uint64_t cur_sys_time_ns;
    1146     uint64_t estimated_wraps;
    1147    
    1148     /* Using gettimeofday because it's most likely to be a vsyscall
    1149      * We don't want to slow down anything with systemcalls we dont need
    1150      * accauracy */
    1151     gettimeofday(&cur_sys_time, NULL);
     1893        struct hw_timestamp_82580 *hw_ts;
     1894        uint64_t estimated_wraps;
    11521895#else
    1153 # if USE_CLOCK_GETTIME
    1154     struct timespec cur_sys_time;
    1155    
    1156     /* This looks terrible and I feel bad doing it. But it's OK
    1157      * on new kernels, because this is a vsyscall */
    1158     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1159 # else
    1160     struct timeval cur_sys_time;
    1161     /* Should be a vsyscall */
    1162     gettimeofday(&cur_sys_time, NULL);
    1163 # endif
    1164 #endif
    1165 
    1166     /* Record the size of our header */
    1167     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1168     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1169     /* Now put our header in front of that size */
    1170     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1171     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1172    
     1896
     1897#endif
     1898
     1899#if USE_CLOCK_GETTIME
     1900        struct timespec cur_sys_time = {0};
     1901        /* This looks terrible and I feel bad doing it. But it's OK
     1902         * on new kernels, because this is a fast vsyscall */
     1903        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1904        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1905#else
     1906        struct timeval cur_sys_time = {0};
     1907        /* Also a fast vsyscall */
     1908        gettimeofday(&cur_sys_time, NULL);
     1909        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1910#endif
     1911
     1912        /* The system clock is not perfect so when running
     1913         * at linerate we could timestamp a packet in the past.
     1914         * To avoid this we munge the timestamp to appear 1ns
     1915         * after the previous packet. We should eventually catch up
     1916         * to system time since a 64byte packet on a 10G link takes 67ns.
     1917         *
     1918         * Note with parallel readers timestamping packets
     1919         * with duplicate stamps or out of order is unavoidable without
     1920         * hardware timestamping from the NIC.
     1921         */
     1922#if !HAS_HW_TIMESTAMPS_82580
     1923        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1924                cur_sys_time_ns = plc->ts_last_sys + 1;
     1925        }
     1926#endif
     1927
     1928        ct_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr));
     1929        for (i = 0 ; i < nb_pkts ; ++i) {
     1930
     1931                /* We put our header straight after the dpdk header */
     1932                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1933                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1934
    11731935#if GET_MAC_CRC_CHECKSUM
    1174     /* Add back in the CRC sum */
    1175     rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
    1176     rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    1177     hdr->flags |= INCLUDES_CHECKSUM;
    1178 #endif
     1936                /* Add back in the CRC sum */
     1937                rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
     1938                rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
     1939                hdr->flags |= INCLUDES_CHECKSUM;
     1940#endif
     1941
     1942                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    11791943
    11801944#if HAS_HW_TIMESTAMPS_82580
    1181     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1182      *
    1183      *        +----------+---+   +--------------+
    1184      *  82580 |    24    | 8 |   |      32      |
    1185      *        +----------+---+   +--------------+
    1186      *          reserved  \______ 40 bits _____/
    1187      *
    1188      * The 40 bit 82580 SYSTIM overflows every
    1189      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1190      *
    1191      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1192      * Endian (for the full 64 bits) i.e. picture is mirrored
    1193      */
    1194    
    1195     /* The timestamp is sitting before our packet and is included in pkt_len */
    1196     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1197     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1198    
    1199     /* Despite what the documentation says this is in Little
    1200      * Endian byteorder. Mask the reserved section out.
    1201      */
    1202     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1203                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1204                
    1205     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1206     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1207         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1208         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1209     }
    1210    
    1211     /* This will have serious problems if packets aren't read quickly
    1212      * that is within a couple of seconds because our clock cycles every
    1213      * 18 seconds */
    1214     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1215                             / (1ull<<TS_NBITS_82580);
    1216    
    1217     /* Estimated_wraps gives the number of times the counter should have
    1218      * wrapped (however depending on value last time it could have wrapped
    1219      * twice more (if hw clock is close to its max value) or once less (allowing
    1220      * for a bit of variance between hw and sys clock). But if the clock
    1221      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1222     if (unlikely(estimated_wraps >= 2)) {
    1223         /* 2 or more wrap arounds add all but the very last wrap */
    1224         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1225     }
    1226    
    1227     /* Set the timestamp to the lowest possible value we're considering */
    1228     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1229                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1230    
    1231     /* In most runs only the first if() will need evaluating - i.e our
    1232      * estimate is correct. */
    1233     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1234                                 hdr->timestamp, MAXSKEW_82580))) {
    1235         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1236         FORMAT(libtrace)->wrap_count++;
    1237         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1238         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1239                                 hdr->timestamp, MAXSKEW_82580)) {
    1240             /* Failed to match estimated_wraps */
    1241             FORMAT(libtrace)->wrap_count++;
    1242             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1243             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1244                                 hdr->timestamp, MAXSKEW_82580)) {
    1245                 if (estimated_wraps == 0) {
    1246                     /* 0 case Failed to match estimated_wraps+2 */
    1247                     printf("WARNING - Hardware Timestamp failed to"
    1248                                             " match using systemtime!\n");
    1249                     hdr->timestamp = cur_sys_time_ns;
    1250                 } else {
    1251                     /* Failed to match estimated_wraps+1 */
    1252                     FORMAT(libtrace)->wrap_count++;
    1253                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1254                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1255                                 hdr->timestamp, MAXSKEW_82580)) {
    1256                         /* Failed to match estimated_wraps+2 */
    1257                         printf("WARNING - Hardware Timestamp failed to"
    1258                                             " match using systemtime!!\n");
    1259                     }
    1260                 }
    1261             }
    1262         }
    1263     }
    1264 
    1265     /* Log our previous for the next loop */
    1266     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1267 
     1945                /* The timestamp is sitting before our packet and is included in pkt_len */
     1946                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1947                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1948                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1949
     1950                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1951                 *
     1952                 *        +----------+---+   +--------------+
     1953                 *  82580 |    24    | 8 |   |      32      |
     1954                 *        +----------+---+   +--------------+
     1955                 *          reserved  \______ 40 bits _____/
     1956                 *
     1957                 * The 40 bit 82580 SYSTIM overflows every
     1958                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1959                 *
     1960                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1961                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1962                 */
     1963
     1964                /* Despite what the documentation says this is in Little
     1965                 * Endian byteorder. Mask the reserved section out.
     1966                 */
     1967                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1968                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1969
     1970                if (unlikely(plc->ts_first_sys == 0)) {
     1971                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1972                        plc->ts_last_sys = plc->ts_first_sys;
     1973                }
     1974
     1975                /* This will have serious problems if packets aren't read quickly
     1976                 * that is within a couple of seconds because our clock cycles every
     1977                 * 18 seconds */
     1978                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1979                                  / (1ull<<TS_NBITS_82580);
     1980
     1981                /* Estimated_wraps gives the number of times the counter should have
     1982                 * wrapped (however depending on value last time it could have wrapped
     1983                 * twice more (if hw clock is close to its max value) or once less (allowing
     1984                 * for a bit of variance between hw and sys clock). But if the clock
     1985                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1986                if (unlikely(estimated_wraps >= 2)) {
     1987                        /* 2 or more wrap arounds add all but the very last wrap */
     1988                        plc->wrap_count += estimated_wraps - 1;
     1989                }
     1990
     1991                /* Set the timestamp to the lowest possible value we're considering */
     1992                hdr->timestamp += plc->ts_first_sys +
     1993                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     1994
     1995                /* In most runs only the first if() will need evaluating - i.e our
     1996                 * estimate is correct. */
     1997                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     1998                                              hdr->timestamp, MAXSKEW_82580))) {
     1999                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     2000                        plc->wrap_count++;
     2001                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     2002                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2003                                             hdr->timestamp, MAXSKEW_82580)) {
     2004                                /* Failed to match estimated_wraps */
     2005                                plc->wrap_count++;
     2006                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2007                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2008                                                     hdr->timestamp, MAXSKEW_82580)) {
     2009                                        if (estimated_wraps == 0) {
     2010                                                /* 0 case Failed to match estimated_wraps+2 */
     2011                                                printf("WARNING - Hardware Timestamp failed to"
     2012                                                       " match using systemtime!\n");
     2013                                                hdr->timestamp = cur_sys_time_ns;
     2014                                        } else {
     2015                                                /* Failed to match estimated_wraps+1 */
     2016                                                plc->wrap_count++;
     2017                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     2018                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     2019                                                                     hdr->timestamp, MAXSKEW_82580)) {
     2020                                                        /* Failed to match estimated_wraps+2 */
     2021                                                        printf("WARNING - Hardware Timestamp failed to"
     2022                                                               " match using systemtime!!\n");
     2023                                                }
     2024                                        }
     2025                                }
     2026                        }
     2027                }
    12682028#else
    1269 # if USE_CLOCK_GETTIME
    1270     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1271 # else
    1272     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1273 # endif
    1274 #endif
    1275 
    1276     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1277      * idea and do the same */
    1278     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1279     packet->buffer = pkt;
    1280     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1281 
    1282     /* Set our capture length for the first time */
    1283     hdr->cap_len = dpdk_get_wire_length(packet);
    1284     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1285         hdr->cap_len -= ETHER_CRC_LEN;
    1286     }
    1287    
    1288 
    1289     return dpdk_get_framing_length(packet) +
    1290                         dpdk_get_capture_length(packet);
     2029
     2030                hdr->timestamp = cur_sys_time_ns;
     2031                /* Offset the next packet by the wire time of previous */
     2032                calculate_wire_time(format_data, hdr->cap_len);
     2033
     2034#endif
     2035        }
     2036
     2037        plc->ts_last_sys = cur_sys_time_ns;
     2038        return;
     2039}
     2040
     2041
     2042static void dpdk_fin_packet(libtrace_packet_t *packet)
     2043{
     2044        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2045                rte_pktmbuf_free(packet->buffer);
     2046                packet->buffer = NULL;
     2047        }
     2048}
     2049
     2050/** Reads at least one packet or returns an error
     2051 */
     2052static inline int dpdk_read_packet_stream (libtrace_t *libtrace,
     2053                                           dpdk_per_stream_t *stream,
     2054                                           libtrace_message_queue_t *mesg,
     2055                                           struct rte_mbuf* pkts_burst[],
     2056                                           size_t nb_packets) {
     2057        size_t nb_rx; /* Number of rx packets we've recevied */
     2058        while (1) {
     2059                /* Poll for a batch of packets */
     2060                nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     2061                                         stream->queue_id, pkts_burst, nb_packets);
     2062                if (nb_rx > 0) {
     2063                        /* Got some packets - otherwise we keep spining */
     2064                        dpdk_ready_pkts(libtrace, stream, pkts_burst, nb_rx);
     2065                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     2066                        return nb_rx;
     2067                }
     2068                /* Check the message queue this could be less than 0 */
     2069                if (mesg && libtrace_message_queue_count(mesg) > 0)
     2070                        return READ_MESSAGE;
     2071                if (libtrace_halt)
     2072                        return READ_EOF;
     2073                /* Wait a while, polling on memory degrades performance
     2074                 * This relieves the pressure on memory allowing the NIC to DMA */
     2075                rte_delay_us(10);
     2076        }
     2077
     2078        /* We'll never get here - but if we did it would be bad */
     2079        return READ_ERROR;
     2080}
     2081
     2082static int dpdk_pread_packets (libtrace_t *libtrace,
     2083                                    libtrace_thread_t *t,
     2084                                    libtrace_packet_t **packets,
     2085                                    size_t nb_packets) {
     2086        int nb_rx; /* Number of rx packets we've recevied */
     2087        struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     2088        int i;
     2089        dpdk_per_stream_t *stream = t->format_data;
     2090
     2091        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
     2092                                         pkts_burst, nb_packets);
     2093
     2094        if (nb_rx > 0) {
     2095                for (i = 0; i < nb_rx; ++i) {
     2096                        if (packets[i]->buffer != NULL) {
     2097                                /* The packet should always be finished */
     2098                                assert(packets[i]->buf_control == TRACE_CTRL_PACKET);
     2099                                free(packets[i]->buffer);
     2100                        }
     2101                        packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     2102                        packets[i]->type = TRACE_RT_DATA_DPDK;
     2103                        packets[i]->buffer = pkts_burst[i];
     2104                        packets[i]->trace = libtrace;
     2105                        packets[i]->error = 1;
     2106                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
     2107                }
     2108        }
     2109
     2110        return nb_rx;
    12912111}
    12922112
    12932113static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1294     int nb_rx; /* Number of rx packets we've recevied */
    1295     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
    1296 
    1297     /* Free the last packet buffer */
    1298     if (packet->buffer != NULL) {
    1299         /* Buffer is owned by DPDK */
    1300         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1301             rte_pktmbuf_free(packet->buffer);
    1302             packet->buffer = NULL;
    1303         } else
    1304         /* Buffer is owned by packet i.e. has been malloc'd */
    1305         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1306             free(packet->buffer);
    1307             packet->buffer = NULL;
    1308         }
    1309     }
    1310    
    1311     packet->buf_control = TRACE_CTRL_EXTERNAL;
    1312     packet->type = TRACE_RT_DATA_DPDK;
    1313    
    1314     /* Wait for a packet */
    1315     while (1) {
    1316         /* Poll for a single packet */
    1317         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1318                             FORMAT(libtrace)->queue_id, pkts_burst, 1);
    1319         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1320             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1321         }
    1322         if (libtrace_halt) {
    1323             return 0;
    1324         }
    1325     }
    1326    
    1327     /* We'll never get here - but if we did it would be bad */
    1328     return -1;
     2114        int nb_rx; /* Number of rx packets we've received */
     2115        dpdk_per_stream_t *stream = FORMAT_DATA_FIRST(libtrace);
     2116
     2117        /* Free the last packet buffer */
     2118        if (packet->buffer != NULL) {
     2119                /* The packet should always be finished */
     2120                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2121                free(packet->buffer);
     2122                packet->buffer = NULL;
     2123        }
     2124
     2125        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2126        packet->type = TRACE_RT_DATA_DPDK;
     2127
     2128        /* Check if we already have some packets buffered */
     2129        if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     2130                packet->buffer = FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     2131                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2132                return 1; // TODO should be bytes read, which essentially useless anyway
     2133        }
     2134
     2135        nb_rx = dpdk_read_packet_stream (libtrace, stream, NULL,
     2136                                         FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     2137
     2138        if (nb_rx > 0) {
     2139                FORMAT(libtrace)->burst_size = nb_rx;
     2140                FORMAT(libtrace)->burst_offset = 1;
     2141                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     2142                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2143                return 1;
     2144        }
     2145        return nb_rx;
    13292146}
    13302147
    13312148static struct timeval dpdk_get_timeval (const libtrace_packet_t *packet) {
    1332     struct timeval tv;
    1333     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1334    
    1335     tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1336     tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
    1337     return tv;
     2149        struct timeval tv;
     2150        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2151
     2152        tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2153        tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     2154        return tv;
    13382155}
    13392156
    13402157static struct timespec dpdk_get_timespec (const libtrace_packet_t *packet) {
    1341     struct timespec ts;
    1342     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1343    
    1344     ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    1345     ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
    1346     return ts;
     2158        struct timespec ts;
     2159        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2160
     2161        ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
     2162        ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     2163        return ts;
    13472164}
    13482165
    13492166static libtrace_linktype_t dpdk_get_link_type (const libtrace_packet_t *packet UNUSED) {
    1350     return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
     2167        return TRACE_TYPE_ETH; /* Always ethernet until proven otherwise */
    13512168}
    13522169
    13532170static libtrace_direction_t dpdk_get_direction (const libtrace_packet_t *packet) {
    1354     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1355     return (libtrace_direction_t) hdr->direction;
     2171        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2172        return (libtrace_direction_t) hdr->direction;
    13562173}
    13572174
    13582175static libtrace_direction_t dpdk_set_direction(libtrace_packet_t *packet, libtrace_direction_t direction) {
    1359     struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1360     hdr->direction = (uint8_t) direction;
    1361     return (libtrace_direction_t) hdr->direction;
    1362 }
    1363 
    1364 /*
    1365  * NOTE: Drops could occur for other reasons than running out of buffer
    1366  * space. Such as failed MAC checksums and oversized packets.
    1367  */
    1368 static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    1369     struct rte_eth_stats stats = {0};
    1370    
    1371     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1372         return UINT64_MAX;
    1373     /* Grab the current stats */
    1374     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1375    
    1376     /* Get the drop counter */
    1377     return (uint64_t) stats.ierrors;
    1378 }
    1379 
    1380 static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
    1381     struct rte_eth_stats stats = {0};
    1382    
    1383     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1384         return UINT64_MAX;
    1385     /* Grab the current stats */
    1386     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1387    
    1388     /* Get the drop counter */
    1389     return (uint64_t) stats.ipackets;
    1390 }
    1391 
    1392 /*
    1393  * This is the number of packets filtered by the NIC
    1394  * and maybe ahead of number read using libtrace.
    1395  *
    1396  * XXX we are yet to implement any filtering, but if it was this should
    1397  * get the result. So this will just return 0 for now.
    1398  */
    1399 static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    1400     struct rte_eth_stats stats = {0};
    1401    
    1402     if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1403         return UINT64_MAX;
    1404     /* Grab the current stats */
    1405     rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1406    
    1407     /* Get the drop counter */
    1408     return (uint64_t) stats.fdirmiss;
     2176        struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
     2177        hdr->direction = (uint8_t) direction;
     2178        return (libtrace_direction_t) hdr->direction;
     2179}
     2180
     2181static void dpdk_get_stats(libtrace_t *trace, libtrace_stat_t *stats) {
     2182        struct rte_eth_stats dev_stats = {0};
     2183
     2184        if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
     2185                return;
     2186
     2187        /* Grab the current stats */
     2188        rte_eth_stats_get(FORMAT(trace)->port, &dev_stats);
     2189
     2190        stats->captured_valid = true;
     2191        stats->captured = dev_stats.ipackets;
     2192
     2193        /* Not that we support adding filters but if we did this
     2194         * would work */
     2195        stats->filtered += dev_stats.fdirmiss;
     2196
     2197        stats->dropped_valid = true;
     2198        stats->dropped = dev_stats.imissed;
     2199
     2200        /* DPDK errors includes drops */
     2201        stats->errors_valid = true;
     2202        stats->errors = dev_stats.ierrors - dev_stats.imissed;
     2203
     2204        stats->received_valid = true;
     2205        stats->received = dev_stats.ipackets + dev_stats.imissed;
     2206
    14092207}
    14102208
     
    14142212 */
    14152213static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    1416                                         libtrace_packet_t *packet) {
    1417     libtrace_eventobj_t event = {0,0,0.0,0};
    1418     int nb_rx; /* Number of receive packets we've read */
    1419     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    1420    
    1421     do {
    1422    
    1423         /* See if we already have a packet waiting */
    1424         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    1425                         FORMAT(trace)->queue_id, pkts_burst, 1);
    1426        
    1427         if (nb_rx > 0) {
    1428             /* Free the last packet buffer */
    1429             if (packet->buffer != NULL) {
    1430                 /* Buffer is owned by DPDK */
    1431                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1432                     rte_pktmbuf_free(packet->buffer);
    1433                     packet->buffer = NULL;
    1434                 } else
    1435                 /* Buffer is owned by packet i.e. has been malloc'd */
    1436                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    1437                     free(packet->buffer);
    1438                     packet->buffer = NULL;
    1439                 }
    1440             }
    1441            
    1442             packet->buf_control = TRACE_CTRL_EXTERNAL;
    1443             packet->type = TRACE_RT_DATA_DPDK;
    1444             event.type = TRACE_EVENT_PACKET;
    1445             event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
    1446            
    1447             /* XXX - Check this passes the filter trace_read_packet normally
    1448              * does this for us but this wont */
    1449             if (trace->filter) {
    1450                 if (!trace_apply_filter(trace->filter, packet)) {
    1451                     /* Failed the filter so we loop for another packet */
    1452                     trace->filtered_packets ++;
    1453                     continue;
    1454                 }
    1455             }
    1456             trace->accepted_packets ++;
    1457         } else {
    1458             /* We only want to sleep for a very short time - we are non-blocking */
    1459             event.type = TRACE_EVENT_SLEEP;
    1460             event.seconds = 0.0001;
    1461             event.size = 0;
    1462         }
    1463        
    1464         /* If we get here we have our event */
    1465         break;
    1466     } while (1);
    1467 
    1468     return event;
    1469 }
    1470 
     2214                                            libtrace_packet_t *packet) {
     2215        libtrace_eventobj_t event = {0,0,0.0,0};
     2216        int nb_rx; /* Number of receive packets we've read */
     2217        struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
     2218
     2219        do {
     2220
     2221                /* See if we already have a packet waiting */
     2222                nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2223                                         FORMAT_DATA_FIRST(trace)->queue_id,
     2224                                         pkts_burst, 1);
     2225
     2226                if (nb_rx > 0) {
     2227                        /* Free the last packet buffer */
     2228                        if (packet->buffer != NULL) {
     2229                                /* The packet should always be finished */
     2230                                assert(packet->buf_control == TRACE_CTRL_PACKET);
     2231                                free(packet->buffer);
     2232                                packet->buffer = NULL;
     2233                        }
     2234
     2235                        packet->buf_control = TRACE_CTRL_EXTERNAL;
     2236                        packet->type = TRACE_RT_DATA_DPDK;
     2237                        event.type = TRACE_EVENT_PACKET;
     2238                        dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
     2239                        packet->buffer = FORMAT(trace)->burst_pkts[0];
     2240                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
     2241                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2242
     2243                        /* XXX - Check this passes the filter trace_read_packet normally
     2244                         * does this for us but this wont */
     2245                        if (trace->filter) {
     2246                                if (!trace_apply_filter(trace->filter, packet)) {
     2247                                        /* Failed the filter so we loop for another packet */
     2248                                        trace->filtered_packets ++;
     2249                                        continue;
     2250                                }
     2251                        }
     2252                        trace->accepted_packets ++;
     2253                } else {
     2254                        /* We only want to sleep for a very short time - we are non-blocking */
     2255                        event.type = TRACE_EVENT_SLEEP;
     2256                        event.seconds = 0.0001;
     2257                        event.size = 0;
     2258                }
     2259
     2260                /* If we get here we have our event */
     2261                break;
     2262        } while (1);
     2263
     2264        return event;
     2265}
    14712266
    14722267static void dpdk_help(void) {
    1473     printf("dpdk format module: $Revision: 1752 $\n");
    1474     printf("Supported input URIs:\n");
    1475     printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
    1476     printf("\tThe -<coreid> is optional \n");
    1477     printf("\t e.g. dpdk:0000:01:00.1\n");
    1478     printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
    1479     printf("\t By default the last CPU core is used if not otherwise specified.\n");
    1480     printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
    1481     printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
    1482     printf("\n");
    1483     printf("Supported output URIs:\n");
    1484     printf("\tSame format as the input URI.\n");
    1485     printf("\t e.g. dpdk:0000:01:00.1\n");
    1486     printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
    1487     printf("\n");
    1488 }
    1489 
    1490  static struct libtrace_format_t dpdk = {
     2268        printf("dpdk format module: $Revision: 1752 $\n");
     2269        printf("Supported input URIs:\n");
     2270        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
     2271        printf("\tThe -<coreid> is optional \n");
     2272        printf("\t e.g. dpdk:0000:01:00.1\n");
     2273        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n\n");
     2274        printf("\t By default the last CPU core is used if not otherwise specified.\n");
     2275        printf("\t Only a single libtrace instance of dpdk can use the same CPU core.\n");
     2276        printf("\t Support for multiple simultaneous instances of dpdk format is currently limited.\n");
     2277        printf("\n");
     2278        printf("Supported output URIs:\n");
     2279        printf("\tSame format as the input URI.\n");
     2280        printf("\t e.g. dpdk:0000:01:00.1\n");
     2281        printf("\t e.g. dpdk:0000:01:00.1-2 (Use the second CPU core)\n");
     2282        printf("\n");
     2283}
     2284
     2285static struct libtrace_format_t dpdk = {
    14912286        "dpdk",
    1492         "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     2287        "$Id$",
    14932288        TRACE_FORMAT_DPDK,
    1494         NULL,                   /* probe filename */
    1495         NULL,                               /* probe magic */
    1496         dpdk_init_input,            /* init_input */
    1497         dpdk_config_input,          /* config_input */
    1498         dpdk_start_input,           /* start_input */
    1499         dpdk_pause_input,           /* pause_input */
    1500         dpdk_init_output,           /* init_output */
    1501         NULL,                               /* config_output */
    1502         dpdk_start_output,          /* start_ouput */
    1503         dpdk_fin_input,             /* fin_input */
    1504         dpdk_fin_output,        /* fin_output */
    1505         dpdk_read_packet,           /* read_packet */
    1506         dpdk_prepare_packet,    /* prepare_packet */
    1507         NULL,                               /* fin_packet */
    1508         dpdk_write_packet,          /* write_packet */
    1509         dpdk_get_link_type,         /* get_link_type */
    1510         dpdk_get_direction,         /* get_direction */
    1511         dpdk_set_direction,         /* set_direction */
    1512         NULL,                               /* get_erf_timestamp */
    1513         dpdk_get_timeval,           /* get_timeval */
    1514         dpdk_get_timespec,          /* get_timespec */
    1515         NULL,                               /* get_seconds */
    1516         NULL,                               /* seek_erf */
    1517         NULL,                               /* seek_timeval */
    1518         NULL,                               /* seek_seconds */
    1519         dpdk_get_capture_length,/* get_capture_length */
    1520         dpdk_get_wire_length,   /* get_wire_length */
    1521         dpdk_get_framing_length,/* get_framing_length */
    1522         dpdk_set_capture_length,/* set_capture_length */
    1523         NULL,                               /* get_received_packets */
    1524         dpdk_get_filtered_packets,/* get_filtered_packets */
    1525         dpdk_get_dropped_packets,/* get_dropped_packets */
    1526     dpdk_get_captured_packets,/* get_captured_packets */
    1527         NULL,                       /* get_fd */
    1528         dpdk_trace_event,               /* trace_event */
    1529     dpdk_help,              /* help */
    1530         NULL
     2289        NULL,                               /* probe filename */
     2290        NULL,                               /* probe magic */
     2291        dpdk_init_input,                    /* init_input */
     2292        dpdk_config_input,                  /* config_input */
     2293        dpdk_start_input,                   /* start_input */
     2294        dpdk_pause_input,                   /* pause_input */
     2295        dpdk_init_output,                   /* init_output */
     2296        NULL,                               /* config_output */
     2297        dpdk_start_output,                  /* start_ouput */
     2298        dpdk_fin_input,                     /* fin_input */
     2299        dpdk_fin_output,                    /* fin_output */
     2300        dpdk_read_packet,                   /* read_packet */
     2301        dpdk_prepare_packet,                /* prepare_packet */
     2302        dpdk_fin_packet,                    /* fin_packet */
     2303        dpdk_write_packet,                  /* write_packet */
     2304        dpdk_get_link_type,                 /* get_link_type */
     2305        dpdk_get_direction,                 /* get_direction */
     2306        dpdk_set_direction,                 /* set_direction */
     2307        NULL,                               /* get_erf_timestamp */
     2308        dpdk_get_timeval,                   /* get_timeval */
     2309        dpdk_get_timespec,                  /* get_timespec */
     2310        NULL,                               /* get_seconds */
     2311        NULL,                               /* seek_erf */
     2312        NULL,                               /* seek_timeval */
     2313        NULL,                               /* seek_seconds */
     2314        dpdk_get_capture_length,            /* get_capture_length */
     2315        dpdk_get_wire_length,               /* get_wire_length */
     2316        dpdk_get_framing_length,            /* get_framing_length */
     2317        dpdk_set_capture_length,            /* set_capture_length */
     2318        NULL,                               /* get_received_packets */
     2319        NULL,                               /* get_filtered_packets */
     2320        NULL,                               /* get_dropped_packets */
     2321        dpdk_get_stats,                     /* get_statistics */
     2322        NULL,                               /* get_fd */
     2323        dpdk_trace_event,                   /* trace_event */
     2324        dpdk_help,                          /* help */
     2325        NULL,                               /* next pointer */
     2326        {true, 8},                          /* Live, NICs typically have 8 threads */
     2327        dpdk_pstart_input,                  /* pstart_input */
     2328        dpdk_pread_packets,                 /* pread_packets */
     2329        dpdk_pause_input,                   /* ppause */
     2330        dpdk_fin_input,                     /* p_fin */
     2331        dpdk_pregister_thread,              /* pregister_thread */
     2332        dpdk_punregister_thread,            /* punregister_thread */
     2333        NULL                                /* get thread stats */
    15312334};
    15322335
  • lib/format_duck.c

    rc5ac872 r5ab626a  
    362362        NULL,                           /* get_filtered_packets */
    363363        NULL,                           /* get_dropped_packets */
    364         NULL,                           /* get_captured_packets */
     364        NULL,                           /* get_statistics */
    365365        NULL,                           /* get_fd */
    366366        NULL,                           /* trace_event */
    367367        duck_help,                      /* help */
    368         NULL                            /* next pointer */
     368        NULL,                            /* next pointer */
     369        NON_PARALLEL(false)
    369370};
    370371
  • lib/format_erf.c

    rc69aecb r5ab626a  
    833833        NULL,                           /* get_filtered_packets */
    834834        erf_get_dropped_packets,        /* get_dropped_packets */
    835         NULL,                           /* get_captured_packets */
     835        NULL,                           /* get_statistics */
    836836        NULL,                           /* get_fd */
    837837        erf_event,                      /* trace_event */
    838838        erf_help,                       /* help */
    839         NULL                            /* next pointer */
     839        NULL,                           /* next pointer */
     840        NON_PARALLEL(false)
    840841};
    841842
     
    876877        NULL,                           /* get_filtered_packets */
    877878        erf_get_dropped_packets,        /* get_dropped_packets */
    878         NULL,                           /* get_captured_packets */
     879        NULL,                           /* get_statistics */
    879880        NULL,                           /* get_fd */
    880881        erf_event,                      /* trace_event */
    881882        erf_help,                       /* help */
    882         NULL                            /* next pointer */
     883        NULL,                           /* next pointer */
     884        NON_PARALLEL(false)
    883885};
    884886
  • lib/format_legacy.c

    r1ca603b r5ab626a  
    548548        NULL,                           /* get_filtered_packets */
    549549        NULL,                           /* get_dropped_packets */
    550         NULL,                           /* get_captured_packets */
     550        NULL,                           /* get_statistics */
    551551        NULL,                           /* get_fd */
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    591592        NULL,                           /* get_filtered_packets */
    592593        NULL,                           /* get_dropped_packets */
    593         NULL,                           /* get_captured_packets */
     594        NULL,                           /* get_statistics */
    594595        NULL,                           /* get_fd */
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    634636        NULL,                           /* get_filtered_packets */
    635637        NULL,                           /* get_dropped_packets */
    636         NULL,                           /* get_captured_packets */
     638        NULL,                           /* get_statistics */
    637639        NULL,                           /* get_fd */
    638640        trace_event_trace,              /* trace_event */
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    677680        NULL,                           /* get_filtered_packets */
    678681        NULL,                           /* get_dropped_packets */
    679         NULL,                           /* get_captured_packets */
     682        NULL,                           /* get_statistics */
    680683        NULL,                           /* get_fd */
    681684        trace_event_trace,              /* trace_event */
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_pcap.c

    r4649fea r5ab626a  
    830830        NULL,                           /* get_filtered_packets */
    831831        NULL,                           /* get_dropped_packets */
    832         NULL,                           /* get_captured_packets */
     832        NULL,                           /* get_statistics */
    833833        NULL,                           /* get_fd */
    834834        trace_event_trace,              /* trace_event */
    835835        pcap_help,                      /* help */
    836         NULL                            /* next pointer */
     836        NULL,                   /* next pointer */
     837        NON_PARALLEL(false)
    837838};
    838839
     
    873874        NULL,                           /* get_filtered_packets */
    874875        pcap_get_dropped_packets,       /* get_dropped_packets */
    875         NULL,                           /* get_captured_packets */
     876        NULL,                           /* get_statistics */
    876877        pcap_get_fd,                    /* get_fd */
    877878        trace_event_device,             /* trace_event */
    878879        pcapint_help,                   /* help */
    879         NULL                            /* next pointer */
     880        NULL,                   /* next pointer */
     881        NON_PARALLEL(true)
    880882};
    881883
  • lib/format_pcapfile.c

    rc69aecb r6b98325  
    264264                case TRACE_OPTION_PROMISC:
    265265                case TRACE_OPTION_FILTER:
     266                case TRACE_OPTION_HASHER:
    266267                        /* All these are either unsupported or handled
    267268                         * by trace_config */
     
    782783        NULL,                           /* get_filtered_packets */
    783784        NULL,                           /* get_dropped_packets */
    784         NULL,                           /* get_captured_packets */
     785        NULL,                           /* get_statistics */
    785786        NULL,                           /* get_fd */
    786787        pcapfile_event,         /* trace_event */
    787788        pcapfile_help,                  /* help */
    788         NULL                            /* next pointer */
     789        NULL,                   /* next pointer */
     790        NON_PARALLEL(false)
    789791};
    790792
  • lib/format_rt.c

    rc5ac872 r5ab626a  
    458458                                /* This may fail on a non-Linux machine */
    459459                                if (trace_is_err(RT_INFO->dummy_ring)) {
    460                                         trace_perror(RT_INFO->dummy_ring, "Creating dead int trace");
     460                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
    461461                                        return -1;
    462462                                }
     
    859859        NULL,                           /* get_filtered_packets */
    860860        NULL,                           /* get_dropped_packets */
    861         NULL,                           /* get_captured_packets */
     861        NULL,                           /* get_statistics */
    862862        rt_get_fd,                      /* get_fd */
    863863        trace_event_rt,             /* trace_event */
    864864        rt_help,                        /* help */
    865         NULL                            /* next pointer */
     865        N