Changes in / [92d5f15:4631115]


Ignore:
Files:
40 added
1 deleted
25 edited

Legend:

Unmodified
Added
Removed
  • README

    r3e5518a r3e5518a  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.22
    27
  • configure.in

    r3e5518a r3e5518a  
    392392fi
    393393
     394# If we use DPDK we might be able to use libnuma
     395AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
     396
    394397# Checks for various "optional" libraries
    395398AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
     
    411414AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
    412415LIBS=
     416
     417if test "$have_numa" = 1; then
     418        LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
     419        AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is found supported])
     420        with_numa=yes
     421else
     422        AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is found supported])
     423        with_numa=no
     424fi
    413425
    414426if test "$dlfound" = 0; then
     
    688700if test x"$libtrace_dpdk" = xtrue; then
    689701    AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     702    reportopt "Compiled with DPDK trace NUMA support" $with_numa
    690703elif test x"$want_dpdk" != "xno"; then
    691704#   We don't officially support DPDK so only report failure if the user
  • lib/Makefile.am

    r3fc3267 r6cf3ca0  
    22include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
    8 NATIVEFORMATS=format_linux.c
     8NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
    99BPFFORMATS=format_bpf.c
    1010
     
    2929NATIVEFORMATS+= format_dpdk.c
    3030# So we also make libtrace.mk in dpdk otherwise automake tries to expand
    31 # it to early which I cannot seem to stop unless we use a path that
     31# it too early which I cannot seem to stop unless we use a path that
    3232# doesn't exist currently
    3333export RTE_SDK=@RTE_SDK@
     
    4444endif
    4545
    46 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4747                format_erf.c format_pcap.c format_legacy.c \
    4848                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5757                $(DAGSOURCE) format_erf.h \
    5858                $(BPFJITSOURCE) \
    59                 libtrace_arphrd.h
     59                libtrace_arphrd.h \
     60                data-struct/ring_buffer.c data-struct/vector.c \
     61                data-struct/message_queue.c data-struct/deque.c \
     62                data-struct/sliding_window.c data-struct/object_cache.c \
     63                data-struct/linked_list.c hash_toeplitz.c combiner_ordered.c \
     64                combiner_sorted.c combiner_unordered.c
    6065
    6166if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 rb13b939  
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r08f5060 r08f5060  
    614614        trace_event_device,     /* trace_event */
    615615        bpf_help,               /* help */
    616         NULL
     616        NULL,                   /* next pointer */
     617        NON_PARALLEL(true)
    617618};
    618619#else   /* HAVE_DECL_BIOCSETIF */
     
    663664        NULL,                   /* trace_event */
    664665        bpf_help,               /* help */
    665         NULL
     666        NULL,                   /* next pointer */
     667        NON_PARALLEL(true)
    666668};
    667669#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f rc70f59f  
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    rc5ac872 rc5ac872  
    7979 */
    8080
    81 
    8281#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8382#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     83#define STREAM_DATA(x) ((struct dag_per_stream_t *)x->data)
    8484
    8585#define FORMAT_DATA DATA(libtrace)
     
    8787
    8888#define DUCK FORMAT_DATA->duck
     89
     90#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
     91#define FORMAT_DATA_FIRST ((struct dag_per_stream_t *)FORMAT_DATA_HEAD->data)
     92
    8993static struct libtrace_format_t dag;
    9094
     
    118122};
    119123
     124/* Data that is stored against each input stream */
     125struct dag_per_stream_t {
     126        /* DAG device */
     127        struct dag_dev_t *device;
     128        /* DAG stream number */
     129        uint16_t dagstream;
     130        /* Pointer to the last unread byte in the DAG memory */
     131        uint8_t *top;
     132        /* Pointer to the first unread byte in the DAG memory */
     133        uint8_t *bottom;
     134        /* Amount of data processed from the bottom pointer */
     135        uint32_t processed;
     136        /* Number of packets seen by the stream */
     137        uint64_t pkt_count;
     138        /* Drop count for this particular stream */
     139        uint64_t drops;
     140        /* Boolean values to indicate if a particular interface has been seen
     141         * or not. This is limited to four interfaces, which is enough to
     142         * support all current DAG cards */
     143        uint8_t seeninterface[4];
     144};
     145
    120146/* "Global" data that is stored for each DAG input trace */
    121147struct dag_format_data_t {
    122 
    123148        /* Data required for regular DUCK reporting */
     149        /* TODO: This doesn't work with the 10X2S card! I don't know how
     150         * DUCK stuff works and don't know how to fix it */
    124151        struct {
    125152                /* Timestamp of the last DUCK report */
    126153                uint32_t last_duck;
    127154                /* The number of seconds between each DUCK report */
    128                 uint32_t duck_freq;
     155                uint32_t duck_freq;
    129156                /* Timestamp of the last packet read from the DAG card */
    130                 uint32_t last_pkt;
     157                uint32_t last_pkt;
    131158                /* Dummy trace to ensure DUCK packets are dealt with using the
    132159                 * DUCK format functions */
    133                 libtrace_t *dummy_duck;
    134         } duck;
     160                libtrace_t *dummy_duck;
     161        } duck;
    135162
    136163        /* String containing the DAG device name */
    137164        char *device_name;
    138         /* The DAG device that we are reading from */
    139         struct dag_dev_t *device;
    140         /* The DAG stream that we are reading from */
    141         unsigned int dagstream;
    142         /* Boolean flag indicating whether the stream is currently attached */
     165        /* Boolean flag indicating whether the trace is currently attached */
    143166        int stream_attached;
    144         /* Pointer to the first unread byte in the DAG memory hole */
    145         uint8_t *bottom;
    146         /* Pointer to the last unread byte in the DAG memory hole */
    147         uint8_t *top;
    148         /* The amount of data processed thus far from the bottom pointer */
    149         uint32_t processed;
    150         /* The number of packets that have been dropped */
    151         uint64_t drops;
    152 
    153         uint8_t seeninterface[4];
     167
     168        /* Data stored against each DAG input stream */
     169        libtrace_list_t *per_stream;
    154170};
    155171
     
    207223
    208224/* Initialises the DAG output data structure */
    209 static void dag_init_format_out_data(libtrace_out_t *libtrace) {
    210         libtrace->format_data = (struct dag_format_data_out_t *) malloc(sizeof(struct dag_format_data_out_t));
     225static void dag_init_format_out_data(libtrace_out_t *libtrace)
     226{
     227        libtrace->format_data = (struct dag_format_data_out_t *)
     228                malloc(sizeof(struct dag_format_data_out_t));
    211229        // no DUCK on output
    212230        FORMAT_DATA_OUT->stream_attached = 0;
     
    219237
    220238/* Initialises the DAG input data structure */
    221 static void dag_init_format_data(libtrace_t *libtrace) {
     239static void dag_init_format_data(libtrace_t *libtrace)
     240{
     241        struct dag_per_stream_t stream_data;
     242
    222243        libtrace->format_data = (struct dag_format_data_t *)
    223244                malloc(sizeof(struct dag_format_data_t));
    224245        DUCK.last_duck = 0;
    225         DUCK.duck_freq = 0;
    226         DUCK.last_pkt = 0;
    227         DUCK.dummy_duck = NULL;
    228         FORMAT_DATA->stream_attached = 0;
    229         FORMAT_DATA->drops = 0;
     246        DUCK.duck_freq = 0;
     247        DUCK.last_pkt = 0;
     248        DUCK.dummy_duck = NULL;
     249
     250        FORMAT_DATA->per_stream =
     251                libtrace_list_init(sizeof(stream_data));
     252        assert(FORMAT_DATA->per_stream != NULL);
     253
     254        /* We'll start with just one instance of stream_data, and we'll
     255         * add more later if we need them */
     256        memset(&stream_data, 0, sizeof(stream_data));
     257        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    230258        FORMAT_DATA->device_name = NULL;
    231         FORMAT_DATA->device = NULL;
    232         FORMAT_DATA->dagstream = 0;
    233         FORMAT_DATA->processed = 0;
    234         FORMAT_DATA->bottom = NULL;
    235         FORMAT_DATA->top = NULL;
    236         memset(FORMAT_DATA->seeninterface, 0, sizeof(FORMAT_DATA->seeninterface));
    237259}
    238260
     
    241263 *
    242264 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    243 static struct dag_dev_t *dag_find_open_device(char *dev_name) {
     265static struct dag_dev_t *dag_find_open_device(char *dev_name)
     266{
    244267        struct dag_dev_t *dag_dev;
    245268
     
    252275                        dag_dev->ref_count ++;
    253276                        return dag_dev;
    254 
    255277                }
    256278                dag_dev = dag_dev->next;
    257279        }
    258280        return NULL;
    259 
    260 
    261281}
    262282
     
    267287 *
    268288 * NOTE: This function assumes the open_dag_mutex is held by the caller */
    269 static void dag_close_device(struct dag_dev_t *dev) {
     289static void dag_close_device(struct dag_dev_t *dev)
     290{
    270291        /* Need to remove from the device list */
    271 
    272292        assert(dev->ref_count == 0);
    273293
     
    292312 *
    293313 * NOTE: this function should only be called when opening a DAG device for
    294  * writing - there is little practical difference between this and the 
     314 * writing - there is little practical difference between this and the
    295315 * function below that covers the reading case, but we need the output trace
    296  * object to report errors properly so the two functions take slightly 
     316 * object to report errors properly so the two functions take slightly
    297317 * different arguments. This is really lame and there should be a much better
    298318 * way of doing this.
    299319 *
    300  * NOTE: This function assumes the open_dag_mutex is held by the caller 
     320 * NOTE: This function assumes the open_dag_mutex is held by the caller
    301321 */
    302 static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace, char *dev_name) {
     322static struct dag_dev_t *dag_open_output_device(libtrace_out_t *libtrace,
     323                                                char *dev_name)
     324{
    303325        struct stat buf;
    304326        int fd;
     
    309331                trace_set_err_out(libtrace,errno,"stat(%s)",dev_name);
    310332                return NULL;
    311 }
     333        }
    312334
    313335        /* Make sure it is the appropriate type of device */
     
    346368 *
    347369 * NOTE: this function should only be called when opening a DAG device for
    348  * reading - there is little practical difference between this and the 
     370 * reading - there is little practical difference between this and the
    349371 * function above that covers the writing case, but we need the input trace
    350  * object to report errors properly so the two functions take slightly 
     372 * object to report errors properly so the two functions take slightly
    351373 * different arguments. This is really lame and there should be a much better
    352374 * way of doing this.
     
    359381
    360382        /* Make sure the device exists */
    361         if (stat(dev_name, &buf) == -1) {
    362                 trace_set_err(libtrace,errno,"stat(%s)",dev_name);
    363                 return NULL;
    364         }
     383        if (stat(dev_name, &buf) == -1) {
     384                trace_set_err(libtrace,errno,"stat(%s)",dev_name);
     385                return NULL;
     386        }
    365387
    366388        /* Make sure it is the appropriate type of device */
     
    368390                /* Try opening the DAG device */
    369391                if((fd = dag_open(dev_name)) < 0) {
    370                         trace_set_err(libtrace,errno,"Cannot open DAG %s",
    371                                         dev_name);
    372                         return NULL;
    373                 }
     392                        trace_set_err(libtrace,errno,"Cannot open DAG %s",
     393                                      dev_name);
     394                        return NULL;
     395                }
    374396        } else {
    375397                trace_set_err(libtrace,errno,"Not a valid dag device: %s",
    376                                 dev_name);
    377                 return NULL;
    378         }
     398                              dev_name);
     399                return NULL;
     400        }
    379401
    380402        /* Add the device to our device list - it is just a doubly linked
     
    397419
    398420/* Creates and initialises a DAG output trace */
    399 static int dag_init_output(libtrace_out_t *libtrace) {
     421static int dag_init_output(libtrace_out_t *libtrace)
     422{
    400423        char *scan = NULL;
    401424        struct dag_dev_t *dag_device = NULL;
    402425        int stream = 1;
    403        
     426
    404427        /* XXX I don't know if this is important or not, but this function
    405428         * isn't present in all of the driver releases that this code is
     
    411434
    412435        dag_init_format_out_data(libtrace);
    413         /* Grab the mutex while we're likely to be messing with the device 
     436        /* Grab the mutex while we're likely to be messing with the device
    414437         * list */
    415438        pthread_mutex_lock(&open_dag_mutex);
    416        
     439
    417440        /* Specific streams are signified using a comma in the libtrace URI,
    418441         * e.g. dag:/dev/dag0,1 refers to stream 1 on the dag0 device.
     
    460483
    461484        dag_init_format_data(libtrace);
    462         /* Grab the mutex while we're likely to be messing with the device 
     485        /* Grab the mutex while we're likely to be messing with the device
    463486         * list */
    464487        pthread_mutex_lock(&open_dag_mutex);
    465        
    466        
    467         /* Specific streams are signified using a comma in the libtrace URI,
     488
     489
     490        /* DAG cards support multiple streams. In a single threaded capture,
     491         * these are specified using a comma in the libtrace URI,
    468492         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    469493         *
    470          * If no stream is specified, we will read from stream 0 */
     494         * If no stream is specified, we will read from stream 0 with
     495         * one thread
     496         */
    471497        if ((scan = strchr(libtrace->uridata,',')) == NULL) {
    472498                FORMAT_DATA->device_name = strdup(libtrace->uridata);
     
    477503        }
    478504
    479         FORMAT_DATA->dagstream = stream;
     505        FORMAT_DATA_FIRST->dagstream = stream;
    480506
    481507        /* See if our DAG device is already open */
     
    496522        }
    497523
    498         FORMAT_DATA->device = dag_device;
    499 
    500         /* See Config_Status_API_Programming_Guide.pdf from the Endace Dag Documentation */
    501         /* Check kBooleanAttributeActive is true -- no point capturing on an interface that's disabled
    502  
    503         *  The symptom of the port being disabled is that libtrace will appear to hang.
    504         */
     524        FORMAT_DATA_FIRST->device = dag_device;
     525
     526        /* See Config_Status_API_Programming_Guide.pdf from the Endace
     527           Dag Documentation */
     528        /* Check kBooleanAttributeActive is true -- no point capturing
     529         * on an interface that's disabled
     530         *
     531         * The symptom of the port being disabled is that libtrace
     532         * will appear to hang. */
    505533        /* Check kBooleanAttributeFault is false */
    506534        /* Check kBooleanAttributeLocalFault is false */
     
    508536        /* Check kBooleanAttributePeerLink ? */
    509537
    510         /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based on libtrace promisc attribute?*/
     538        /* Set kBooleanAttributePromisc/kBooleanPromiscuousMode based
     539           on libtrace promisc attribute?*/
    511540        /* Set kUint32AttributeSnapLength to the snaplength */
    512541
    513542        pthread_mutex_unlock(&open_dag_mutex);
    514         return 0;
     543        return 0;
    515544}
    516545
    517546/* Configures a DAG input trace */
    518547static int dag_config_input(libtrace_t *libtrace, trace_option_t option,
    519                                 void *data) {
    520         char conf_str[4096];
     548                            void *data)
     549{
     550        char conf_str[4096];
    521551        switch(option) {
    522                 case TRACE_OPTION_META_FREQ:
    523                         /* This option is used to specify the frequency of DUCK
    524                          * updates */
    525                         DUCK.duck_freq = *(int *)data;
    526                         return 0;
    527                 case TRACE_OPTION_SNAPLEN:
    528                         /* Tell the card our new snap length */
    529                         snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
    530                         if (dag_configure(FORMAT_DATA->device->fd,
    531                                                 conf_str) != 0) {
    532                                 trace_set_err(libtrace, errno, "Failed to configure snaplen on DAG card: %s", libtrace->uridata);
    533                                 return -1;
    534                         }
    535                         return 0;
    536                 case TRACE_OPTION_PROMISC:
    537                         /* DAG already operates in a promisc fashion */
    538                         return -1;
    539                 case TRACE_OPTION_FILTER:
    540                         /* We don't yet support pushing filters into DAG
    541                          * cards */
    542                         return -1;
    543                 case TRACE_OPTION_EVENT_REALTIME:
    544                         /* Live capture is always going to be realtime */
     552        case TRACE_OPTION_META_FREQ:
     553                /* This option is used to specify the frequency of DUCK
     554                 * updates */
     555                DUCK.duck_freq = *(int *)data;
     556                return 0;
     557        case TRACE_OPTION_SNAPLEN:
     558                /* Tell the card our new snap length */
     559                snprintf(conf_str, 4096, "varlen slen=%i", *(int *)data);
     560                if (dag_configure(FORMAT_DATA_FIRST->device->fd,
     561                                  conf_str) != 0) {
     562                        trace_set_err(libtrace, errno, "Failed to configure "
     563                                      "snaplen on DAG card: %s",
     564                                      libtrace->uridata);
    545565                        return -1;
    546         }
     566                }
     567                return 0;
     568        case TRACE_OPTION_PROMISC:
     569                /* DAG already operates in a promisc fashion */
     570                return -1;
     571        case TRACE_OPTION_FILTER:
     572                /* We don't yet support pushing filters into DAG
     573                 * cards */
     574                return -1;
     575        case TRACE_OPTION_EVENT_REALTIME:
     576                /* Live capture is always going to be realtime */
     577                return -1;
     578        }
    547579        return -1;
    548580}
    549581
    550582/* Starts a DAG output trace */
    551 static int dag_start_output(libtrace_out_t *libtrace) {
     583static int dag_start_output(libtrace_out_t *libtrace)
     584{
    552585        struct timeval zero, nopoll;
    553586
     
    557590
    558591        /* Attach and start the DAG stream */
    559 
    560592        if (dag_attach_stream(FORMAT_DATA_OUT->device->fd,
    561593                        FORMAT_DATA_OUT->dagstream, 0, 4 * 1024 * 1024) < 0) {
     
    572604
    573605        /* We don't want the dag card to do any sleeping */
    574 
    575606        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
    576607                        FORMAT_DATA_OUT->dagstream, 0, &zero,
     
    581612
    582613/* Starts a DAG input trace */
    583 static int dag_start_input(libtrace_t *libtrace) {
    584         struct timeval zero, nopoll;
    585         uint8_t *top, *bottom, *starttop;
     614static int dag_start_input(libtrace_t *libtrace)
     615{
     616        struct timeval zero, nopoll;
     617        uint8_t *top, *bottom, *starttop;
    586618        top = bottom = NULL;
    587619
    588620        zero.tv_sec = 0;
    589         zero.tv_usec = 10000;
    590         nopoll = zero;
     621        zero.tv_usec = 10000;
     622        nopoll = zero;
    591623
    592624        /* Attach and start the DAG stream */
    593         if (dag_attach_stream(FORMAT_DATA->device->fd,
    594                                 FORMAT_DATA->dagstream, 0, 0) < 0) {
    595                 trace_set_err(libtrace, errno, "Cannot attach DAG stream");
    596                 return -1;
    597         }
    598 
    599         if (dag_start_stream(FORMAT_DATA->device->fd,
    600                                 FORMAT_DATA->dagstream) < 0) {
    601                 trace_set_err(libtrace, errno, "Cannot start DAG stream");
    602                 return -1;
    603         }
     625        if (dag_attach_stream(FORMAT_DATA_FIRST->device->fd,
     626                              FORMAT_DATA_FIRST->dagstream, 0, 0) < 0) {
     627                trace_set_err(libtrace, errno, "Cannot attach DAG stream");
     628                return -1;
     629        }
     630
     631        if (dag_start_stream(FORMAT_DATA_FIRST->device->fd,
     632                             FORMAT_DATA_FIRST->dagstream) < 0) {
     633                trace_set_err(libtrace, errno, "Cannot start DAG stream");
     634                return -1;
     635        }
    604636        FORMAT_DATA->stream_attached = 1;
    605        
     637
    606638        /* We don't want the dag card to do any sleeping */
    607         dag_set_stream_poll(FORMAT_DATA->device->fd,
    608                                 FORMAT_DATA->dagstream, 0, &zero,
    609                                 &nopoll);
    610 
    611         starttop = dag_advance_stream(FORMAT_DATA->device->fd,
    612                                         FORMAT_DATA->dagstream,
    613                                         &bottom);
     639        dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
     640                            FORMAT_DATA_FIRST->dagstream, 0, &zero,
     641                            &nopoll);
     642
     643        starttop = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
     644                                      FORMAT_DATA_FIRST->dagstream,
     645                                      &bottom);
    614646
    615647        /* Should probably flush the memory hole now */
     
    617649        while (starttop - bottom > 0) {
    618650                bottom += (starttop - bottom);
    619                 top = dag_advance_stream(FORMAT_DATA->device->fd,
    620                                         FORMAT_DATA->dagstream,
    621                                         &bottom);
    622         }
    623         FORMAT_DATA->top = top;
    624         FORMAT_DATA->bottom = bottom;
    625         FORMAT_DATA->processed = 0;
    626         FORMAT_DATA->drops = 0;
     651                top = dag_advance_stream(FORMAT_DATA_FIRST->device->fd,
     652                                         FORMAT_DATA_FIRST->dagstream,
     653                                         &bottom);
     654        }
     655        FORMAT_DATA_FIRST->top = top;
     656        FORMAT_DATA_FIRST->bottom = bottom;
     657        FORMAT_DATA_FIRST->processed = 0;
     658        FORMAT_DATA_FIRST->drops = 0;
    627659
    628660        return 0;
    629661}
    630662
     663static int dag_pstart_input(libtrace_t *libtrace)
     664{
     665        char *scan, *tok;
     666        uint16_t stream_count = 0, max_streams;
     667        int iserror = 0;
     668        struct dag_per_stream_t stream_data;
     669
     670        /* Check we aren't trying to create more threads than the DAG card can
     671         * handle */
     672        max_streams = dag_rx_get_stream_count(FORMAT_DATA_FIRST->device->fd);
     673        if (libtrace->perpkt_thread_count > max_streams) {
     674                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     675                              "trying to create too many threads (max is %u)",
     676                              max_streams);
     677                iserror = 1;
     678                goto cleanup;
     679        }
     680
     681        /* Get the stream names from the uri */
     682        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     683                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     684                              "format uri doesn't specify the DAG streams");
     685                iserror = 1;
     686                goto cleanup;
     687        }
     688
     689        scan++;
     690
     691        tok = strtok(scan, ",");
     692        while (tok != NULL) {
     693                /* Ensure we haven't specified too many streams */
     694                if (stream_count >= libtrace->perpkt_thread_count) {
     695                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     696                                      "format uri specifies too many streams. "
     697                                      "Max is %u", max_streams);
     698                        iserror = 1;
     699                        goto cleanup;
     700                }
     701
     702                /* Save the stream details */
     703                if (stream_count == 0) {
     704                        /* Special case where we update the existing stream
     705                         * data structure */
     706                        FORMAT_DATA_FIRST->dagstream = (uint16_t)atoi(tok);
     707                } else {
     708                        memset(&stream_data, 0, sizeof(stream_data));
     709                        stream_data.device = FORMAT_DATA_FIRST->device;
     710                        stream_data.dagstream = (uint16_t)atoi(tok);
     711                        libtrace_list_push_back(FORMAT_DATA->per_stream,
     712                                                &stream_data);
     713                }
     714
     715                stream_count++;
     716                tok = strtok(NULL, ",");
     717        }
     718
     719        FORMAT_DATA->stream_attached = 1;
     720
     721 cleanup:
     722        if (iserror) {
     723                return -1;
     724        } else {
     725                return 0;
     726        }
     727}
     728
    631729/* Pauses a DAG output trace */
    632 static int dag_pause_output(libtrace_out_t *libtrace) {
    633 
     730static int dag_pause_output(libtrace_out_t *libtrace)
     731{
    634732        /* Stop and detach the stream */
    635733        if (dag_stop_stream(FORMAT_DATA_OUT->device->fd,
    636                         FORMAT_DATA_OUT->dagstream) < 0) {
     734                            FORMAT_DATA_OUT->dagstream) < 0) {
    637735                trace_set_err_out(libtrace, errno, "Could not stop DAG stream");
    638736                return -1;
    639737        }
    640738        if (dag_detach_stream(FORMAT_DATA_OUT->device->fd,
    641                         FORMAT_DATA_OUT->dagstream) < 0) {
    642                 trace_set_err_out(libtrace, errno, "Could not detach DAG stream");
     739                              FORMAT_DATA_OUT->dagstream) < 0) {
     740                trace_set_err_out(libtrace, errno,
     741                                  "Could not detach DAG stream");
    643742                return -1;
    644743        }
     
    648747
    649748/* Pauses a DAG input trace */
    650 static int dag_pause_input(libtrace_t *libtrace) {
    651 
    652         /* Stop and detach the stream */
    653         if (dag_stop_stream(FORMAT_DATA->device->fd,
    654                                 FORMAT_DATA->dagstream) < 0) {
    655                 trace_set_err(libtrace, errno, "Could not stop DAG stream");
    656                 return -1;
    657         }
    658         if (dag_detach_stream(FORMAT_DATA->device->fd,
    659                                 FORMAT_DATA->dagstream) < 0) {
    660                 trace_set_err(libtrace, errno, "Could not detach DAG stream");
    661                 return -1;
    662         }
     749static int dag_pause_input(libtrace_t *libtrace)
     750{
     751        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     752
     753        /* Stop and detach each stream */
     754        while (tmp != NULL) {
     755                if (dag_stop_stream(STREAM_DATA(tmp)->device->fd,
     756                                    STREAM_DATA(tmp)->dagstream) < 0) {
     757                        trace_set_err(libtrace, errno,
     758                                      "Could not stop DAG stream");
     759                        printf("Count not stop DAG stream\n");
     760                        return -1;
     761                }
     762                if (dag_detach_stream(STREAM_DATA(tmp)->device->fd,
     763                                      STREAM_DATA(tmp)->dagstream) < 0) {
     764                        trace_set_err(libtrace, errno,
     765                                      "Could not detach DAG stream");
     766                        printf("Count not detach DAG stream\n");
     767                        return -1;
     768                }
     769
     770                tmp = tmp->next;
     771        }
     772
    663773        FORMAT_DATA->stream_attached = 0;
    664774        return 0;
    665775}
    666776
     777
     778
    667779/* Closes a DAG input trace */
    668 static int dag_fin_input(libtrace_t *libtrace) {
     780static int dag_fin_input(libtrace_t *libtrace)
     781{
     782        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     783
    669784        /* Need the lock, since we're going to be handling the device list */
    670785        pthread_mutex_lock(&open_dag_mutex);
    671        
     786
    672787        /* Detach the stream if we are not paused */
    673788        if (FORMAT_DATA->stream_attached)
    674789                dag_pause_input(libtrace);
    675         FORMAT_DATA->device->ref_count --;
    676 
    677         /* Close the DAG device if there are no more references to it */
    678         if (FORMAT_DATA->device->ref_count == 0)
    679                 dag_close_device(FORMAT_DATA->device);
     790
     791        /* Close any dag devices that have no more references */
     792        while (tmp != NULL) {
     793                STREAM_DATA(tmp)->device->ref_count--;
     794                if (STREAM_DATA(tmp)->device->ref_count == 0)
     795                        dag_close_device(STREAM_DATA(tmp)->device);
     796
     797                tmp = tmp->next;
     798        }
     799
    680800        if (DUCK.dummy_duck)
    681801                trace_destroy_dead(DUCK.dummy_duck);
     802
     803        /* Clear the list */
     804        libtrace_list_deinit(FORMAT_DATA->per_stream);
     805
    682806        if (FORMAT_DATA->device_name)
    683807                free(FORMAT_DATA->device_name);
    684808        free(libtrace->format_data);
    685809        pthread_mutex_unlock(&open_dag_mutex);
    686         return 0; /* success */
     810        return 0; /* success */
    687811}
    688812
    689813/* Closes a DAG output trace */
    690 static int dag_fin_output(libtrace_out_t *libtrace) {
    691        
     814static int dag_fin_output(libtrace_out_t *libtrace)
     815{
     816
    692817        /* Commit any outstanding traffic in the txbuffer */
    693818        if (FORMAT_DATA_OUT->waiting) {
    694                 dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    695                                 FORMAT_DATA_OUT->waiting );
    696         }
    697 
    698         /* Wait until the buffer is nearly clear before exiting the program,
     819                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     820                                           FORMAT_DATA_OUT->dagstream,
     821                                           FORMAT_DATA_OUT->waiting );
     822        }
     823
     824        /* Wait until the buffer is nearly clear before exiting the program,
    699825         * as we will lose packets otherwise */
    700         dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    701                         FORMAT_DATA_OUT->dagstream,
    702                         dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
    703                                         FORMAT_DATA_OUT->dagstream) - 8
    704                         );
     826        dag_tx_get_stream_space
     827                (FORMAT_DATA_OUT->device->fd,
     828                 FORMAT_DATA_OUT->dagstream,
     829                 dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     830                                            FORMAT_DATA_OUT->dagstream) - 8);
    705831
    706832        /* Need the lock, since we're going to be handling the device list */
     
    752878
    753879        /* Allocate memory for the DUCK data */
    754         if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
    755                         !packet->buffer) {
    756                 packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
    757                 packet->buf_control = TRACE_CTRL_PACKET;
    758                 if (!packet->buffer) {
    759                         trace_set_err(libtrace, errno,
    760                                         "Cannot allocate packet buffer");
    761                         return -1;
    762                 }
    763         }
     880        if (packet->buf_control == TRACE_CTRL_EXTERNAL ||
     881            !packet->buffer) {
     882                packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE);
     883                packet->buf_control = TRACE_CTRL_PACKET;
     884                if (!packet->buffer) {
     885                        trace_set_err(libtrace, errno,
     886                                      "Cannot allocate packet buffer");
     887                        return -1;
     888                }
     889        }
    764890
    765891        /* DUCK doesn't have a format header */
     
    789915
    790916/* Determines the amount of data available to read from the DAG card */
    791 static int dag_available(libtrace_t *libtrace) {
    792         uint32_t diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     917static int dag_available(libtrace_t *libtrace,
     918                         struct dag_per_stream_t *stream_data)
     919{
     920        uint32_t diff = stream_data->top - stream_data->bottom;
    793921
    794922        /* If we've processed more than 4MB of data since we last called
    795923         * dag_advance_stream, then we should call it again to allow the
    796924         * space occupied by that 4MB to be released */
    797         if (diff >= dag_record_size && FORMAT_DATA->processed < 4 * 1024 * 1024)
     925        if (diff >= dag_record_size && stream_data->processed < 4 * 1024 * 1024)
    798926                return diff;
    799        
     927
    800928        /* Update the top and bottom pointers */
    801         FORMAT_DATA->top = dag_advance_stream(FORMAT_DATA->device->fd,
    802                         FORMAT_DATA->dagstream,
    803                         &(FORMAT_DATA->bottom));
    804        
    805         if (FORMAT_DATA->top == NULL) {
     929        stream_data->top = dag_advance_stream(stream_data->device->fd,
     930                                              stream_data->dagstream,
     931                                              &(stream_data->bottom));
     932
     933        if (stream_data->top == NULL) {
    806934                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
    807935                return -1;
    808936        }
    809         FORMAT_DATA->processed = 0;
    810         diff = FORMAT_DATA->top - FORMAT_DATA->bottom;
     937        stream_data->processed = 0;
     938        diff = stream_data->top - stream_data->bottom;
    811939        return diff;
    812940}
    813941
    814942/* Returns a pointer to the start of the next complete ERF record */
    815 static dag_record_t *dag_get_record(libtrace_t *libtrace) {
    816         dag_record_t *erfptr = NULL;
    817         uint16_t size;
    818         erfptr = (dag_record_t *)FORMAT_DATA->bottom;
     943static dag_record_t *dag_get_record(struct dag_per_stream_t *stream_data)
     944{
     945        dag_record_t *erfptr = NULL;
     946        uint16_t size;
     947
     948        erfptr = (dag_record_t *)stream_data->bottom;
    819949        if (!erfptr)
    820                 return NULL;
    821         size = ntohs(erfptr->rlen);
    822         assert( size >= dag_record_size );
     950                return NULL;
     951
     952        size = ntohs(erfptr->rlen);
     953        assert( size >= dag_record_size );
     954
    823955        /* Make certain we have the full packet available */
    824         if (size > (FORMAT_DATA->top - FORMAT_DATA->bottom))
     956        if (size > (stream_data->top - stream_data->bottom))
    825957                return NULL;
    826         FORMAT_DATA->bottom += size;
    827         FORMAT_DATA->processed += size;
     958
     959        stream_data->bottom += size;
     960        stream_data->processed += size;
    828961        return erfptr;
    829962}
     
    831964/* Converts a buffer containing a recently read DAG packet record into a
    832965 * libtrace packet */
    833 static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
    834                 void *buffer, libtrace_rt_types_t rt_type, uint32_t flags) {
    835 
     966static int dag_prepare_packet_real(libtrace_t *libtrace,
     967                                   struct dag_per_stream_t *stream_data,
     968                                   libtrace_packet_t *packet,
     969                                   void *buffer, libtrace_rt_types_t rt_type,
     970                                   uint32_t flags)
     971{
    836972        dag_record_t *erfptr;
    837        
     973
    838974        /* If the packet previously owned a buffer that is not the buffer
    839         * that contains the new packet data, we're going to need to free the
    840         * old one to avoid memory leaks */
     975        * that contains the new packet data, we're going to need to free the
     976        * old one to avoid memory leaks */
    841977        if (packet->buffer != buffer &&
    842                         packet->buf_control == TRACE_CTRL_PACKET) {
     978            packet->buf_control == TRACE_CTRL_PACKET) {
    843979                free(packet->buffer);
    844980        }
     
    853989        erfptr = (dag_record_t *)buffer;
    854990        packet->buffer = erfptr;
    855         packet->header = erfptr;
    856         packet->type = rt_type;
     991        packet->header = erfptr;
     992        packet->type = rt_type;
    857993
    858994        if (erfptr->flags.rxerror == 1) {
    859                 /* rxerror means the payload is corrupt - drop the payload
    860                 * by tweaking rlen */
    861                 packet->payload = NULL;
    862                 erfptr->rlen = htons(erf_get_framing_length(packet));
    863         } else {
    864                 packet->payload = (char*)packet->buffer
    865                         + erf_get_framing_length(packet);
    866         }
     995                /* rxerror means the payload is corrupt - drop the payload
     996                * by tweaking rlen */
     997                packet->payload = NULL;
     998                erfptr->rlen = htons(erf_get_framing_length(packet));
     999        } else {
     1000                packet->payload = (char*)packet->buffer
     1001                        + erf_get_framing_length(packet);
     1002        }
    8671003
    8681004        if (libtrace->format_data == NULL) {
     
    8711007
    8721008        /* Update the dropped packets counter */
    873 
    874         /* No loss counter for DSM coloured records - have to use
    875          * some other API */
     1009        /* No loss counter for DSM coloured records - have to use some
     1010         * other API */
    8761011        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    8771012                /* TODO */
    8781013        } else {
    8791014                /* Use the ERF loss counter */
    880                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    881                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     1015                if (stream_data->seeninterface[erfptr->flags.iface]
     1016                    == 0) {
     1017                        stream_data->seeninterface[erfptr->flags.iface]
     1018                                = 1;
    8821019                } else {
    883                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
    884                 }
    885         }
     1020                        stream_data->drops += ntohs(erfptr->lctr);
     1021                }
     1022        }
     1023
     1024        packet->error = 1;
    8861025
    8871026        return 0;
     1027}
     1028
     1029static int dag_prepare_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     1030                              void *buffer, libtrace_rt_types_t rt_type,
     1031                              uint32_t flags)
     1032{
     1033        return dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1034                                       buffer, rt_type, flags);
    8881035}
    8891036
     
    8981045/* Pushes an ERF record onto the transmit stream */
    8991046static int dag_dump_packet(libtrace_out_t *libtrace,
    900                 dag_record_t *erfptr, unsigned int pad, void *buffer) {
     1047                           dag_record_t *erfptr, unsigned int pad,
     1048                           void *buffer)
     1049{
    9011050        int size;
    9021051
    9031052        /*
    904          * If we've got 0 bytes waiting in the txqueue, assume that we haven't
    905          * requested any space yet, and request some, storing the pointer at
    906          * FORMAT_DATA_OUT->txbuffer.
     1053         * If we've got 0 bytes waiting in the txqueue, assume that we
     1054         * haven't requested any space yet, and request some, storing
     1055         * the pointer at FORMAT_DATA_OUT->txbuffer.
    9071056         *
    9081057         * The amount to request is slightly magical at the moment - it's
     
    9111060         */
    9121061        if (FORMAT_DATA_OUT->waiting == 0) {
    913                 FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
    914                                 FORMAT_DATA_OUT->dagstream, 16908288);
     1062                FORMAT_DATA_OUT->txbuffer =
     1063                        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     1064                                                FORMAT_DATA_OUT->dagstream,
     1065                                                16908288);
    9151066        }
    9161067
     
    9191070         * are in contiguous memory
    9201071         */
    921         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     1072        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, erfptr,
     1073               (dag_record_size + pad));
    9221074        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
    923 
    924 
    9251075
    9261076        /*
     
    9291079         */
    9301080        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
    931         memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     1081        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting, buffer,
     1082               size);
    9321083        FORMAT_DATA_OUT->waiting += size;
    9331084
     
    9381089         * case there is still data in the buffer at program exit.
    9391090         */
    940 
    9411091        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
    942                 FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    943                         FORMAT_DATA_OUT->waiting );
     1092                FORMAT_DATA_OUT->txbuffer =
     1093                        dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd,
     1094                                                   FORMAT_DATA_OUT->dagstream,
     1095                                                   FORMAT_DATA_OUT->waiting);
    9441096                FORMAT_DATA_OUT->waiting = 0;
    9451097        }
    9461098
    9471099        return size + pad + dag_record_size;
    948 
    9491100}
    9501101
     
    9521103 * if one is found, false otherwise */
    9531104static bool find_compatible_linktype(libtrace_out_t *libtrace,
    954                                 libtrace_packet_t *packet, char *type)
    955 {
    956          // Keep trying to simplify the packet until we can find
    957          //something we can do with it
     1105                                     libtrace_packet_t *packet, char *type)
     1106{
     1107        /* Keep trying to simplify the packet until we can find
     1108         * something we can do with it */
    9581109
    9591110        do {
    960                 *type=libtrace_to_erf_type(trace_get_link_type(packet));
    961 
    962                 // Success
     1111                *type = libtrace_to_erf_type(trace_get_link_type(packet));
     1112
     1113                /* Success */
    9631114                if (*type != (char)-1)
    9641115                        return true;
     
    9661117                if (!demote_packet(packet)) {
    9671118                        trace_set_err_out(libtrace,
    968                                         TRACE_ERR_NO_CONVERSION,
    969                                         "No erf type for packet (%i)",
    970                                         trace_get_link_type(packet));
     1119                                          TRACE_ERR_NO_CONVERSION,
     1120                                          "No erf type for packet (%i)",
     1121                                          trace_get_link_type(packet));
    9711122                        return false;
    9721123                }
     
    9781129
    9791130/* Writes a packet to the provided DAG output trace */
    980 static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    981         /*
    982          * This is heavily borrowed from erf_write_packet(). Yes, CnP coding
    983          * sucks, sorry about that.
     1131static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet)
     1132{
     1133        /* This is heavily borrowed from erf_write_packet(). Yes, CnP
     1134         * coding sucks, sorry about that.
    9841135         */
    9851136        unsigned int pad = 0;
     
    9901141
    9911142        if(!packet->header) {
    992                 /* No header, probably an RT packet. Lifted from 
     1143                /* No header, probably an RT packet. Lifted from
    9931144                 * erf_write_packet(). */
    9941145                return -1;
     
    10091160
    10101161        if (packet->type == TRACE_RT_DATA_ERF) {
    1011                 numbytes = dag_dump_packet(libtrace,
    1012                                 header,
    1013                                 pad,
    1014                                 payload
    1015                                 );
    1016 
     1162                numbytes = dag_dump_packet(libtrace, header, pad, payload);
    10171163        } else {
    10181164                /* Build up a new packet header from the existing header */
    10191165
    1020                 /* Simplify the packet first - if we can't do this, break 
     1166                /* Simplify the packet first - if we can't do this, break
    10211167                 * early */
    10221168                if (!find_compatible_linktype(libtrace,packet,&erf_type))
     
    10371183
    10381184                /* Packet length (rlen includes format overhead) */
    1039                 assert(trace_get_capture_length(packet)>0
    1040                                 && trace_get_capture_length(packet)<=65536);
    1041                 assert(erf_get_framing_length(packet)>0
    1042                                 && trace_get_framing_length(packet)<=65536);
    1043                 assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
    1044                       &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     1185                assert(trace_get_capture_length(packet) > 0
     1186                       && trace_get_capture_length(packet) <= 65536);
     1187                assert(erf_get_framing_length(packet) > 0
     1188                       && trace_get_framing_length(packet) <= 65536);
     1189                assert(trace_get_capture_length(packet) +
     1190                       erf_get_framing_length(packet) > 0
     1191                       && trace_get_capture_length(packet) +
     1192                       erf_get_framing_length(packet) <= 65536);
    10451193
    10461194                erfhdr.rlen = htons(trace_get_capture_length(packet)
    1047                         + erf_get_framing_length(packet));
     1195                                    + erf_get_framing_length(packet));
    10481196
    10491197
     
    10541202
    10551203                /* Write it out */
    1056                 numbytes = dag_dump_packet(libtrace,
    1057                                 &erfhdr,
    1058                                 pad,
    1059                                 payload);
    1060 
     1204                numbytes = dag_dump_packet(libtrace, &erfhdr, pad, payload);
    10611205        }
    10621206
     
    10681212 * If DUCK reporting is enabled, the packet returned may be a DUCK update
    10691213 */
    1070 static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1071         int size = 0;
    1072         struct timeval tv;
    1073         dag_record_t *erfptr = NULL;
     1214static int dag_read_packet_real(libtrace_t *libtrace,
     1215                                struct dag_per_stream_t *stream_data,
     1216                                libtrace_thread_t *t, /* Optional */
     1217                                libtrace_packet_t *packet)
     1218{
     1219        dag_record_t *erfptr = NULL;
    10741220        int numbytes = 0;
    10751221        uint32_t flags = 0;
    1076         struct timeval maxwait;
    1077         struct timeval pollwait;
     1222        struct timeval maxwait, pollwait;
    10781223
    10791224        pollwait.tv_sec = 0;
     
    10881233                return size;
    10891234
    1090 
    10911235        /* Don't let anyone try to free our DAG memory hole! */
    10921236        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
     
    10941238        /* If the packet buffer is currently owned by libtrace, free it so
    10951239         * that we can set the packet to point into the DAG memory hole */
    1096         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1097                 free(packet->buffer);
    1098                 packet->buffer = 0;
    1099         }
    1100        
    1101         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1102                         FORMAT_DATA->dagstream, sizeof(dag_record_t), &maxwait,
    1103                         &pollwait) == -1)
    1104         {
     1240        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1241                free(packet->buffer);
     1242                packet->buffer = 0;
     1243        }
     1244
     1245        if (dag_set_stream_poll(stream_data->device->fd, stream_data->dagstream,
     1246                                sizeof(dag_record_t), &maxwait,
     1247                                &pollwait) == -1) {
    11051248                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11061249                return -1;
    11071250        }
    11081251
    1109 
    11101252        /* Grab a full ERF record */
    11111253        do {
    1112                 numbytes = dag_available(libtrace);
     1254                numbytes = dag_available(libtrace, stream_data);
    11131255                if (numbytes < 0)
    11141256                        return numbytes;
    11151257                if (numbytes < dag_record_size) {
     1258                        /* Check the message queue if we have one to check */
     1259                        if (t != NULL &&
     1260                            libtrace_message_queue_count(&t->messages) > 0)
     1261                                return -2;
     1262
    11161263                        if (libtrace_halt)
    11171264                                return 0;
     
    11191266                        continue;
    11201267                }
    1121                 erfptr = dag_get_record(libtrace);
     1268                erfptr = dag_get_record(stream_data);
    11221269        } while (erfptr == NULL);
    11231270
    11241271        /* Prepare the libtrace packet */
    1125         if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
    1126                                 flags))
     1272        if (dag_prepare_packet_real(libtrace, stream_data, packet, erfptr,
     1273                                    TRACE_RT_DATA_ERF, flags))
    11271274                return -1;
    11281275
    1129         /* Update the DUCK timer */
    1130         tv = trace_get_timeval(packet);
    1131         DUCK.last_pkt = tv.tv_sec;
    1132 
    11331276        return packet->payload ? htons(erfptr->rlen) :
    1134                                 erf_get_framing_length(packet);
     1277                erf_get_framing_length(packet);
     1278}
     1279
     1280static int dag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1281{
     1282        return dag_read_packet_real(libtrace, FORMAT_DATA_FIRST, NULL, packet);
     1283}
     1284
     1285static int dag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
     1286                             libtrace_packet_t **packets, size_t nb_packets)
     1287{
     1288        int ret;
     1289        size_t read_packets = 0;
     1290        int numbytes = 0;
     1291
     1292        struct dag_per_stream_t *stream_data =
     1293                (struct dag_per_stream_t *)t->format_data;
     1294
     1295        /* Read as many packets as we can, but read atleast one packet */
     1296        do {
     1297                ret = dag_read_packet_real(libtrace, stream_data, t,
     1298                                           packets[read_packets]);
     1299                if (ret < 0)
     1300                        return ret;
     1301
     1302                read_packets++;
     1303
     1304                /* Make sure we don't read too many packets..! */
     1305                if (read_packets >= nb_packets)
     1306                        break;
     1307
     1308                numbytes = dag_available(libtrace, stream_data);
     1309        } while (numbytes >= dag_record_size);
     1310
     1311        return read_packets;
    11351312}
    11361313
     
    11401317 */
    11411318static libtrace_eventobj_t trace_event_dag(libtrace_t *libtrace,
    1142                                         libtrace_packet_t *packet) {
    1143         libtrace_eventobj_t event = {0,0,0.0,0};
     1319                                           libtrace_packet_t *packet)
     1320{
     1321        libtrace_eventobj_t event = {0,0,0.0,0};
    11441322        dag_record_t *erfptr = NULL;
    11451323        int numbytes;
     
    11601338        }
    11611339       
    1162         if (dag_set_stream_poll(FORMAT_DATA->device->fd,
    1163                         FORMAT_DATA->dagstream, 0, &minwait,
    1164                         &minwait) == -1)
    1165         {
     1340        if (dag_set_stream_poll(FORMAT_DATA_FIRST->device->fd,
     1341                                FORMAT_DATA_FIRST->dagstream, 0, &minwait,
     1342                                &minwait) == -1) {
    11661343                trace_set_err(libtrace, errno, "dag_set_stream_poll");
    11671344                event.type = TRACE_EVENT_TERMINATE;
     
    11721349                erfptr = NULL;
    11731350                numbytes = 0;
    1174        
     1351
    11751352                /* Need to call dag_available so that the top pointer will get
    11761353                 * updated, otherwise we'll never see any data! */
    1177                 numbytes = dag_available(libtrace);
    1178 
    1179                 /* May as well not bother calling dag_get_record if 
     1354                numbytes = dag_available(libtrace, FORMAT_DATA_FIRST);
     1355
     1356                /* May as well not bother calling dag_get_record if
    11801357                 * dag_available suggests that there's no data */
    11811358                if (numbytes != 0)
    1182                         erfptr = dag_get_record(libtrace);
     1359                        erfptr = dag_get_record(FORMAT_DATA_FIRST);
    11831360                if (erfptr == NULL) {
    11841361                        /* No packet available - sleep for a very short time */
    11851362                        if (libtrace_halt) {
    11861363                                event.type = TRACE_EVENT_TERMINATE;
    1187                         } else {                       
     1364                        } else {
    11881365                                event.type = TRACE_EVENT_SLEEP;
    11891366                                event.seconds = 0.0001;
     
    11911368                        break;
    11921369                }
    1193                 if (dag_prepare_packet(libtrace, packet, erfptr,
    1194                                         TRACE_RT_DATA_ERF, flags)) {
     1370                if (dag_prepare_packet_real(libtrace, FORMAT_DATA_FIRST, packet,
     1371                                            erfptr, TRACE_RT_DATA_ERF, flags)) {
    11951372                        event.type = TRACE_EVENT_TERMINATE;
    11961373                        break;
     
    11981375
    11991376
    1200                 event.size = trace_get_capture_length(packet) + 
    1201                                 trace_get_framing_length(packet);
    1202                
     1377                event.size = trace_get_capture_length(packet) +
     1378                        trace_get_framing_length(packet);
     1379
    12031380                /* XXX trace_read_packet() normally applies the following
    12041381                 * config options for us, but this function is called via
     
    12061383
    12071384                if (libtrace->filter) {
    1208                         int filtret = trace_apply_filter(libtrace->filter, 
    1209                                         packet);
     1385                        int filtret = trace_apply_filter(libtrace->filter,
     1386                                                         packet);
    12101387                        if (filtret == -1) {
    12111388                                trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
    1212                                                 "Bad BPF Filter");
     1389                                              "Bad BPF Filter");
    12131390                                event.type = TRACE_EVENT_TERMINATE;
    12141391                                break;
     
    12211398                                 * a sleep event in this case, like we used to
    12221399                                 * do! */
    1223                                 libtrace->filtered_packets ++;
     1400                                libtrace->filtered_packets ++;
    12241401                                trace_clear_cache(packet);
    12251402                                continue;
    12261403                        }
    1227                                
     1404
    12281405                        event.type = TRACE_EVENT_PACKET;
    12291406                } else {
     
    12381415                        trace_set_capture_length(packet, libtrace->snaplen);
    12391416                }
    1240                 libtrace->accepted_packets ++;
     1417                libtrace->accepted_packets ++;
    12411418                break;
    1242         } while (1);
     1419        } while(1);
    12431420
    12441421        return event;
     
    12461423
    12471424/* Gets the number of dropped packets */
    1248 static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
    1249         if (trace->format_data == NULL)
    1250                 return (uint64_t)-1;
    1251         return DATA(trace)->drops;
     1425static uint64_t dag_get_dropped_packets(libtrace_t *libtrace)
     1426{
     1427        uint64_t sum = 0;
     1428        libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     1429
     1430        /* Sum the drop counter for all the packets */
     1431        while (tmp != NULL) {
     1432                sum += STREAM_DATA(tmp)->drops;
     1433                tmp = tmp->next;
     1434        }
     1435
     1436        return sum;
    12521437}
    12531438
    12541439/* Prints some semi-useful help text about the DAG format module */
    12551440static void dag_help(void) {
    1256         printf("dag format module: $Revision: 1755 $\n");
    1257         printf("Supported input URIs:\n");
    1258         printf("\tdag:/dev/dagn\n");
    1259         printf("\n");
    1260         printf("\te.g.: dag:/dev/dag0\n");
    1261         printf("\n");
    1262         printf("Supported output URIs:\n");
    1263         printf("\tnone\n");
    1264         printf("\n");
     1441        printf("dag format module: $Revision: 1755 $\n");
     1442        printf("Supported input URIs:\n");
     1443        printf("\tdag:/dev/dagn\n");
     1444        printf("\n");
     1445        printf("\te.g.: dag:/dev/dag0\n");
     1446        printf("\n");
     1447        printf("Supported output URIs:\n");
     1448        printf("\tnone\n");
     1449        printf("\n");
     1450}
     1451
     1452static int dag_pconfig_input(UNUSED libtrace_t *libtrace,
     1453                             trace_parallel_option_t option, UNUSED void *value)
     1454{
     1455        /* We don't support any of these! Normally you configure the DAG card
     1456         * externally. */
     1457        switch(option) {
     1458        case TRACE_OPTION_SET_HASHER:
     1459        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1460        case TRACE_OPTION_TRACETIME:
     1461        case TRACE_OPTION_TICK_INTERVAL:
     1462        case TRACE_OPTION_GET_CONFIG:
     1463        case TRACE_OPTION_SET_CONFIG:
     1464                return -1;
     1465        }
     1466        /* We don't provide a default option to ensure that future options will
     1467         * generate a compiler warning. */
     1468
     1469        return -1;
     1470}
     1471
     1472/* TODO: Should possibly make a more generic dag_start_input, as there's a
     1473 * fair bit of code duplication between that and this */
     1474static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1475                                bool reader)
     1476{
     1477        struct timeval zero, nopoll;
     1478        uint8_t *top, *bottom, *starttop;
     1479        struct dag_per_stream_t *stream_data;
     1480        top = bottom = NULL;
     1481
     1482        /* Minimum delay is 10mS */
     1483        zero.tv_sec = 0;
     1484        zero.tv_usec = 10000;
     1485        nopoll = zero;
     1486
     1487        if (reader) {
     1488                if (t->type == THREAD_PERPKT) {
     1489                        stream_data =
     1490                                (struct dag_per_stream_t *)
     1491                                libtrace_list_get_index(FORMAT_DATA->per_stream,
     1492                                                        t->perpkt_num)->data;
     1493
     1494                        /* Pass the per thread data to the thread */
     1495                        t->format_data = stream_data;
     1496
     1497                        /* Attach and start the DAG stream */
     1498                        printf("t%u: starting and attaching stream #%u\n",
     1499                               t->perpkt_num, stream_data->dagstream);
     1500                        if (dag_attach_stream(stream_data->device->fd,
     1501                                              stream_data->dagstream, 0,
     1502                                              0) < 0) {
     1503                                printf("can't attach DAG stream #%u\n",
     1504                                       stream_data->dagstream);
     1505                                trace_set_err(libtrace, errno,
     1506                                              "can't attach DAG stream #%u",
     1507                                              stream_data->dagstream);
     1508                                return -1;
     1509                        }
     1510                        if (dag_start_stream(stream_data->device->fd,
     1511                                             stream_data->dagstream) < 0) {
     1512                                trace_set_err(libtrace, errno,
     1513                                              "can't start DAG stream #%u",
     1514                                              stream_data->dagstream);
     1515                                printf("can't start DAG stream #%u\n",
     1516                                       stream_data->dagstream);
     1517                                return -1;
     1518                        }
     1519
     1520                        /* Ensure that dag_advance_stream will return without
     1521                         * blocking */
     1522                        if(dag_set_stream_poll(stream_data->device->fd,
     1523                                               stream_data->dagstream, 0, &zero,
     1524                                               &nopoll) < 0) {
     1525                                trace_set_err(libtrace, errno,
     1526                                              "dag_set_stream_poll failed!");
     1527                                return -1;
     1528                        }
     1529
     1530                        /* Clear all the data from the memory hole */
     1531                        starttop = dag_advance_stream(stream_data->
     1532                                                      device->fd,
     1533                                                      stream_data->dagstream,
     1534                                                      &bottom);
     1535
     1536                        top = starttop;
     1537                        while (starttop - bottom > 0) {
     1538                                bottom += (starttop - bottom);
     1539                                top = dag_advance_stream(stream_data->
     1540                                                         device->fd,
     1541                                                         stream_data->dagstream,
     1542                                                         &bottom);
     1543                        }
     1544                        stream_data->top = top;
     1545                        stream_data->bottom = bottom;
     1546                        stream_data->pkt_count = 0;
     1547                        stream_data->drops = 0;
     1548                } else {
     1549                        /* TODO: Figure out why t->type != THREAD_PERPKT in
     1550                         * order to figure out what this line does */
     1551                        t->format_data = FORMAT_DATA_FIRST;
     1552                }
     1553        }
     1554
     1555        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1556
     1557        return 0;
    12651558}
    12661559
    12671560static struct libtrace_format_t dag = {
    1268         "dag",
    1269         "$Id$",
    1270         TRACE_FORMAT_ERF,
     1561        "dag",
     1562        "$Id$",
     1563        TRACE_FORMAT_ERF,
    12711564        dag_probe_filename,             /* probe filename */
    12721565        NULL,                           /* probe magic */
    1273         dag_init_input,                 /* init_input */
    1274         dag_config_input,               /* config_input */
    1275         dag_start_input,                /* start_input */
    1276         dag_pause_input,                /* pause_input */
     1566        dag_init_input,                 /* init_input */
     1567        dag_config_input,               /* config_input */
     1568        dag_start_input,                /* start_input */
     1569        dag_pause_input,                /* pause_input */
    12771570        dag_init_output,                /* init_output */
    1278         NULL,                           /* config_output */
     1571        NULL,                           /* config_output */
    12791572        dag_start_output,               /* start_output */
    1280         dag_fin_input,                  /* fin_input */
     1573        dag_fin_input,                  /* fin_input */
    12811574        dag_fin_output,                 /* fin_output */
    1282         dag_read_packet,                /* read_packet */
    1283         dag_prepare_packet,             /* prepare_packet */
     1575        dag_read_packet,                /* read_packet */
     1576        dag_prepare_packet,             /* prepare_packet */
    12841577        NULL,                           /* fin_packet */
    12851578        dag_write_packet,               /* write_packet */
    1286         erf_get_link_type,              /* get_link_type */
    1287         erf_get_direction,              /* get_direction */
    1288         erf_set_direction,              /* set_direction */
    1289         erf_get_erf_timestamp,          /* get_erf_timestamp */
    1290         NULL,                           /* get_timeval */
    1291         NULL,                           /* get_seconds */
     1579        erf_get_link_type,              /* get_link_type */
     1580        erf_get_direction,              /* get_direction */
     1581        erf_set_direction,              /* set_direction */
     1582        erf_get_erf_timestamp,          /* get_erf_timestamp */
     1583        NULL,                           /* get_timeval */
     1584        NULL,                           /* get_seconds */
    12921585        NULL,                           /* get_timespec */
    1293         NULL,                           /* seek_erf */
    1294         NULL,                           /* seek_timeval */
    1295         NULL,                           /* seek_seconds */
    1296         erf_get_capture_length,         /* get_capture_length */
    1297         erf_get_wire_length,            /* get_wire_length */
    1298         erf_get_framing_length,         /* get_framing_length */
    1299         erf_set_capture_length,         /* set_capture_length */
     1586        NULL,                           /* seek_erf */
     1587        NULL,                           /* seek_timeval */
     1588        NULL,                           /* seek_seconds */
     1589        erf_get_capture_length,         /* get_capture_length */
     1590        erf_get_wire_length,            /* get_wire_length */
     1591        erf_get_framing_length,         /* get_framing_length */
     1592        erf_set_capture_length,         /* set_capture_length */
    13001593        NULL,                           /* get_received_packets */
    13011594        NULL,                           /* get_filtered_packets */
    13021595        dag_get_dropped_packets,        /* get_dropped_packets */
    13031596        NULL,                           /* get_captured_packets */
    1304         NULL,                           /* get_fd */
    1305         trace_event_dag,                /* trace_event */
    1306         dag_help,                       /* help */
    1307         NULL                            /* next pointer */
     1597        NULL,                           /* get_fd */
     1598        trace_event_dag,                /* trace_event */
     1599        dag_help,                       /* help */
     1600        NULL,                            /* next pointer */
     1601        {true, 0}, /* live packet capture, thread limit TBD */
     1602        dag_pstart_input,
     1603        dag_pread_packets,
     1604        dag_pause_input,
     1605        NULL,
     1606        dag_pconfig_input,
     1607        dag_pregister_thread,
     1608        NULL
    13081609};
    13091610
    1310 void dag_constructor(void) {
     1611void dag_constructor(void)
     1612{
    13111613        register_format(&dag);
    13121614}
  • lib/format_dpdk.c

    rb585975 r12ae766  
    33 * This file is part of libtrace
    44 *
    5  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 
     5 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
    66 * New Zealand.
    77 *
    8  * Author: Richard Sanger 
    9  *         
     8 * Author: Richard Sanger
     9 *
    1010 * All rights reserved.
    1111 *
    12  * This code has been developed by the University of Waikato WAND 
     12 * This code has been developed by the University of Waikato WAND
    1313 * research group. For further information please see http://www.wand.net.nz/
    1414 *
     
    3636 * Intel Data Plane Development Kit is a LIVE capture format.
    3737 *
    38  * This format also supports writing which will write packets out to the 
    39  * network as a form of packet replay. This should not be confused with the 
    40  * RT protocol which is intended to transfer captured packet records between 
     38 * This format also supports writing which will write packets out to the
     39 * network as a form of packet replay. This should not be confused with the
     40 * RT protocol which is intended to transfer captured packet records between
    4141 * RT-speaking programs.
    4242 */
     43
     44#define _GNU_SOURCE
    4345
    4446#include "config.h"
     
    4749#include "format_helper.h"
    4850#include "libtrace_arphrd.h"
     51#include "hash_toeplitz.h"
    4952
    5053#ifdef HAVE_INTTYPES_H
     
    5962#include <endian.h>
    6063#include <string.h>
     64
     65#if HAVE_LIBNUMA
     66#include <numa.h>
     67#endif
    6168
    6269/* We can deal with any minor differences by checking the RTE VERSION
     
    151158#include <rte_mempool.h>
    152159#include <rte_mbuf.h>
    153 
    154 /* The default size of memory buffers to use - This is the max size of standard
     160#include <rte_launch.h>
     161#include <rte_lcore.h>
     162#include <rte_per_lcore.h>
     163#include <rte_cycles.h>
     164#include <pthread.h>
     165
     166/* The default size of memory buffers to use - This is the max size of standard
    155167 * ethernet packet less the size of the MAC CHECKSUM */
    156168#define RX_MBUF_SIZE 1514
    157169
    158 /* The minimum number of memory buffers per queue tx or rx. Search for 
     170/* The minimum number of memory buffers per queue tx or rx. Search for
    159171 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    160172 */
     
    174186#define NB_TX_MBUF 1024
    175187
    176 /* The size of the PCI blacklist needs to be big enough to contain 
     188/* The size of the PCI blacklist needs to be big enough to contain
    177189 * every PCI device address (listed by lspci every bus:device.function tuple).
    178190 */
     
    181193/* The maximum number of characters the mempool name can be */
    182194#define MEMPOOL_NAME_LEN 20
     195
     196/* For single threaded libtrace we read packets as a batch/burst
     197 * this is the maximum size of said burst */
     198#define BURST_SIZE 50
    183199
    184200#define MBUF(x) ((struct rte_mbuf *) x)
     
    186202#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    187203#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     204#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     205
    188206#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    189                         (uint64_t) tv.tv_usec*1000ull)
     207                        (uint64_t) tv.tv_usec*1000ull)
    190208#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    191                         (uint64_t) ts.tv_nsec)
     209                        (uint64_t) ts.tv_nsec)
    192210
    193211#if RTE_PKTMBUF_HEADROOM != 128
    194212#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    195         "any libtrace instance processing these packet must be have the" \
    196         "same RTE_PKTMBUF_HEADROOM set"
     213        "any libtrace instance processing these packet must be have the" \
     214        "same RTE_PKTMBUF_HEADROOM set"
    197215#endif
    198216
    199217/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    200  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    201  * 
     218 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     219 *
    202220 * Make sure you understand what these are doing before enabling them.
    203221 * They might make traces incompatable with other builds etc.
    204  * 
     222 *
    205223 * These are also included to show how to do somethings which aren't
    206224 * obvious in the DPDK documentation.
    207225 */
    208226
    209 /* Print verbose messages to stdout */
     227/* Print verbose messages to stderr */
    210228#define DEBUG 0
    211229
    212 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    213  * only turn on if you know clock_gettime is a vsyscall on your system 
     230/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     231 * only turn on if you know clock_gettime is a vsyscall on your system
    214232 * overwise could be a large overhead. Again gettimeofday() should be
    215233 * vsyscall also if it's not you should seriously consider updating your
     
    218236#ifdef HAVE_LIBRT
    219237/* You can turn this on (set to 1) to prefer clock_gettime */
    220 #define USE_CLOCK_GETTIME 0
     238#define USE_CLOCK_GETTIME 1
    221239#else
    222240/* DONT CHANGE THIS !!! */
    223 #define USE_CLOCK_GETTIME 0
     241#define USE_CLOCK_GETTIME 1
    224242#endif
    225243
     
    229247 * hence writing out a port such as int: ring: and dpdk: assumes there
    230248 * is no checksum and will attempt to write the checksum as part of the
    231  * packet 
     249 * packet
    232250 */
    233251#define GET_MAC_CRC_CHECKSUM 0
    234252
    235253/* This requires a modification of the pmd drivers (inside Intel DPDK)
     254 * TODO this requires updating (packet sizes are wrong TS most likely also)
    236255 */
    237256#define HAS_HW_TIMESTAMPS_82580 0
     
    257276};
    258277
     278struct dpdk_per_lcore_t
     279{
     280        uint16_t queue_id;
     281        uint8_t port;
     282        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     283#if HAS_HW_TIMESTAMPS_82580
     284        /* Timestamping only relevent to RX */
     285        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     286        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     287#endif
     288};
     289
    259290/* Used by both input and output however some fields are not used
    260291 * for output */
     
    263294    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    264295    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    265     uint8_t paused; /* See paused_state */ 
     296    uint8_t paused; /* See paused_state */
    266297    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
     298    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    267299    int snaplen; /* The snap length for the capture - RX only */
    268300    /* We always have to setup both rx and tx queues even if we don't want them */
    269301    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    270302    int nb_tx_buf; /* The number of packet buffers in the tx ring */
     303    int nic_numa_node; /* The NUMA node that the NIC is attached to */
    271304    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    272305#if DPDK_USE_BLACKLIST
     
    275308#endif
    276309    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    277 #if HAS_HW_TIMESTAMPS_82580
    278     /* Timestamping only relevent to RX */
    279     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    280     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    281     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    282 #endif
     310    uint8_t rss_key[40]; // This is the RSS KEY
     311    /* To improve performance we always batch reading packets, in a burst */
     312    struct rte_mbuf* burst_pkts[BURST_SIZE];
     313    int burst_size; /* The total number read in the burst */
     314    int burst_offset; /* The offset we are into the burst */
     315        // DPDK normally seems to have a limit of 8 queues for a given card
     316        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    283317};
    284318
     
    288322};
    289323
    290 /** 
     324/**
    291325 * A structure placed in front of the packet where we can store
    292326 * additional information about the given packet.
     
    294328 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    295329 * +--------------------------+
    296  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    297  * +--------------------------+
    298330 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    299331 * +--------------------------+
    300  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    301  * +--------------------------+ 
     332 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     333 * +--------------------------+
    302334 * *   hw_timestamp_82580     * 16 bytes Optional
    303335 * +--------------------------+
     
    317349 * We want to blacklist all devices except those on the whitelist
    318350 * (I say list, but yes it is only the one).
    319  * 
     351 *
    320352 * The default behaviour of rte_pci_probe() will map every possible device
    321353 * to its DPDK driver. The DPDK driver will take the ethernet device
    322354 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    323  * 
    324  * So blacklist all devices except the one that we wish to use so that 
     355 *
     356 * So blacklist all devices except the one that we wish to use so that
    325357 * the others can still be used as standard ethernet ports.
    326358 *
     
    336368
    337369        TAILQ_FOREACH(dev, &device_list, next) {
    338         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    339             && whitelist->bus == dev->addr.bus
    340             && whitelist->devid == dev->addr.devid
    341             && whitelist->function == dev->addr.function)
    342             continue;
     370        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     371            && whitelist->bus == dev->addr.bus
     372            && whitelist->devid == dev->addr.devid
     373            && whitelist->function == dev->addr.function)
     374            continue;
    343375                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    344                                 / sizeof (format_data->blacklist[0])) {
    345                         printf("Warning: too many devices to blacklist consider"
    346                                         " increasing BLACK_LIST_SIZE");
     376                                / sizeof (format_data->blacklist[0])) {
     377                        fprintf(stderr, "Warning: too many devices to blacklist consider"
     378                                        " increasing BLACK_LIST_SIZE");
    347379                        break;
    348380                }
     
    360392        char pci_str[20] = {0};
    361393        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    362                 whitelist->domain,
    363                 whitelist->bus,
    364                 whitelist->devid,
    365                 whitelist->function);
     394                whitelist->domain,
     395                whitelist->bus,
     396                whitelist->devid,
     397                whitelist->function);
    366398        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    367399                return -1;
     
    375407 * Fills in addr, note core is optional and is unchanged if
    376408 * a value for it is not provided.
    377  * 
     409 *
    378410 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    379  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     411 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    380412 */
    381413static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
     
    391423}
    392424
     425/**
     426 * Convert a pci address to the numa node it is
     427 * connected to.
     428 *
     429 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     430 * so we can call it before DPDK
     431 *
     432 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     433 */
     434static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     435        char path[50] = {0};
     436        FILE *file;
     437
     438        /* Read from the system */
     439        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     440                 dev_addr->domain,
     441                 dev_addr->bus,
     442                 dev_addr->devid,
     443                 dev_addr->function);
     444
     445        if((file = fopen(path, "r")) != NULL) {
     446                int numa_node = -1;
     447                fscanf(file, "%d", &numa_node);
     448                fclose(file);
     449                return numa_node;
     450        }
     451        return -1;
     452}
     453
    393454#if DEBUG
    394455/* For debugging */
     
    399460
    400461    if (nb_cpu <= 0) {
    401         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    402         nb_cpu = 1; /* fallback to just 1 core */
     462        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     463        nb_cpu = 1; /* fallback to just 1 core */
    403464    }
    404465    if (nb_cpu > RTE_MAX_LCORE)
    405         nb_cpu = RTE_MAX_LCORE;
     466        nb_cpu = RTE_MAX_LCORE;
    406467
    407468    global_config = rte_eal_get_configuration();
    408469
    409470    if (global_config != NULL) {
    410         int i;
    411         fprintf(stderr, "Intel DPDK setup\n"
    412                "---Version      : %s\n"
    413                "---Master LCore : %"PRIu32"\n"
    414                "---LCore Count  : %"PRIu32"\n",
    415                rte_version(),
    416                global_config->master_lcore, global_config->lcore_count);
    417 
    418         for (i = 0 ; i < nb_cpu; i++) {
    419             fprintf(stderr, "   ---Core %d : %s\n", i,
    420                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    421         }
    422 
    423         const char * proc_type;
    424         switch (global_config->process_type) {
    425             case RTE_PROC_AUTO:
    426                 proc_type = "auto";
    427                 break;
    428             case RTE_PROC_PRIMARY:
    429                 proc_type = "primary";
    430                 break;
    431             case RTE_PROC_SECONDARY:
    432                 proc_type = "secondary";
    433                 break;
    434             case RTE_PROC_INVALID:
    435                 proc_type = "invalid";
    436                 break;
    437             default:
    438                 proc_type = "something worse than invalid!!";
    439         }
    440         fprintf(stderr, "---Process Type : %s\n", proc_type);
    441     }
    442 
    443 }
    444 #endif
     471        int i;
     472        fprintf(stderr, "Intel DPDK setup\n"
     473               "---Version      : %s\n"
     474               "---Master LCore : %"PRIu32"\n"
     475               "---LCore Count  : %"PRIu32"\n",
     476               rte_version(),
     477               global_config->master_lcore, global_config->lcore_count);
     478
     479        for (i = 0 ; i < nb_cpu; i++) {
     480            fprintf(stderr, "   ---Core %d : %s\n", i,
     481                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     482        }
     483
     484        const char * proc_type;
     485        switch (global_config->process_type) {
     486            case RTE_PROC_AUTO:
     487                proc_type = "auto";
     488                break;
     489            case RTE_PROC_PRIMARY:
     490                proc_type = "primary";
     491                break;
     492            case RTE_PROC_SECONDARY:
     493                proc_type = "secondary";
     494                break;
     495            case RTE_PROC_INVALID:
     496                proc_type = "invalid";
     497                break;
     498            default:
     499                proc_type = "something worse than invalid!!";
     500        }
     501        fprintf(stderr, "---Process Type : %s\n", proc_type);
     502    }
     503
     504}
     505#endif
     506
     507/**
     508 * Expects to be called from the master lcore and moves it to the given dpdk id
     509 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     510 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     511 *               and not already in use.
     512 * @return 0 is successful otherwise -1 on error.
     513 */
     514static inline int dpdk_move_master_lcore(size_t core) {
     515    struct rte_config *cfg = rte_eal_get_configuration();
     516    cpu_set_t cpuset;
     517    int i;
     518
     519    assert (core < RTE_MAX_LCORE);
     520    assert (rte_get_master_lcore() == rte_lcore_id());
     521
     522    if (core == rte_lcore_id())
     523        return 0;
     524
     525    // Make sure we are not overwriting someone else
     526    assert(!rte_lcore_is_enabled(core));
     527
     528    // Move the core
     529    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     530    cfg->lcore_role[core] = ROLE_RTE;
     531    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     532    rte_eal_get_configuration()->master_lcore = core;
     533    RTE_PER_LCORE(_lcore_id) = core;
     534
     535    // Now change the affinity
     536    CPU_ZERO(&cpuset);
     537
     538    if (lcore_config[core].detected) {
     539        CPU_SET(core, &cpuset);
     540    } else {
     541        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     542            if (lcore_config[i].detected)
     543                CPU_SET(i, &cpuset);
     544        }
     545    }
     546
     547    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     548    if (i != 0) {
     549        // TODO proper libtrace style error here!!
     550        fprintf(stderr, "pthread_setaffinity_np failed\n");
     551        return -1;
     552    }
     553    return 0;
     554}
     555
    445556
    446557/**
     
    473584
    474585static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    475                                         char * err, int errlen) {
     586                                        char * err, int errlen) {
    476587    int ret; /* Returned error codes */
    477588    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     
    480591    long nb_cpu; /* The number of CPUs in the system */
    481592    long my_cpu; /* The CPU number we want to bind to */
     593    int i;
     594    struct rte_config *cfg = rte_eal_get_configuration();
    482595        struct saved_getopts save_opts;
    483    
     596
    484597#if DEBUG
    485598    rte_set_log_level(RTE_LOG_DEBUG);
     
    489602    /*
    490603     * Using unique file prefixes mean separate memory is used, unlinking
    491      * the two processes. However be careful we still cannot access a
     604     * the two processes. However be careful we still cannot access a
     605     * port that already in use.
     606     *
     607     * Using unique file prefixes mean separate memory is used, unlinking
     608     * the two processes. However be careful we still cannot access a
    492609     * port that already in use.
    493610     */
     
    510627    /* This initialises the Environment Abstraction Layer (EAL)
    511628     * If we had slave workers these are put into WAITING state
    512      * 
     629     *
    513630     * Basically binds this thread to a fixed core, which we choose as
    514631     * the last core on the machine (assuming fewer interrupts mapped here).
     
    521638     */
    522639
    523     /* Get the number of cpu cores in the system and use the last core */
     640    /* Get the number of cpu cores in the system and use the last core
     641     * on the correct numa node */
    524642    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    525643    if (nb_cpu <= 0) {
    526         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    527         nb_cpu = 1; /* fallback to the first core */
     644        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     645        nb_cpu = 1; /* fallback to the first core */
    528646    }
    529647    if (nb_cpu > RTE_MAX_LCORE)
    530         nb_cpu = RTE_MAX_LCORE;
    531 
    532     my_cpu = nb_cpu;
    533     /* This allows the user to specify the core - we would try to do this 
     648        nb_cpu = RTE_MAX_LCORE;
     649
     650    my_cpu = -1;
     651    /* This allows the user to specify the core - we would try to do this
    534652     * automatically but it's hard to tell that this is secondary
    535      * before running rte_eal_init(...). Currently we are limited to 1 
     653     * before running rte_eal_init(...). Currently we are limited to 1
    536654     * instance per core due to the way memory is allocated. */
    537655    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    538         snprintf(err, errlen, "Failed to parse URI");
    539         return -1;
    540     }
     656        snprintf(err, errlen, "Failed to parse URI");
     657        return -1;
     658    }
     659
     660#if HAVE_LIBNUMA
     661        format_data->nic_numa_node = pci_to_numa(&use_addr);
     662        if (my_cpu < 0) {
     663                /* If we can assign to a core on the same numa node */
     664                fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node);
     665                if(format_data->nic_numa_node >= 0) {
     666                        int max_node_cpu = -1;
     667                        struct bitmask *mask = numa_allocate_cpumask();
     668                        assert(mask);
     669                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     670                        for (i = 0 ; i < nb_cpu; ++i) {
     671                                if (numa_bitmask_isbitset(mask,i))
     672                                        max_node_cpu = i+1;
     673                        }
     674                        my_cpu = max_node_cpu;
     675                }
     676        }
     677#endif
     678        if (my_cpu < 0) {
     679                my_cpu = nb_cpu;
     680        }
     681
    541682
    542683    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    543                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     684                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    544685
    545686    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    546         snprintf(err, errlen,
    547           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    548           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    549         return -1;
    550     }
    551 
    552     /* Make our mask */
    553     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     687        snprintf(err, errlen,
     688          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     689          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     690        return -1;
     691    }
     692
     693    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     694       info older versions */
     695    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     696    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    554697
    555698#if !DPDK_USE_BLACKLIST
    556699    /* Black list all ports besides the one that we want to use */
    557     if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
    558         snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    559                 " are you sure the address is correct?: %s", strerror(-ret));
    560         return -1;
    561     }
     700        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
     701                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     702                        " are you sure the address is correct?: %s", strerror(-ret));
     703                return -1;
     704        }
    562705#endif
    563706
    564707        /* Give the memory map a unique name */
    565708        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    566     /* rte_eal_init it makes a call to getopt so we need to reset the 
     709    /* rte_eal_init it makes a call to getopt so we need to reset the
    567710     * global optind variable of getopt otherwise this fails */
    568711        save_getopts(&save_opts);
    569712    optind = 1;
    570713    if ((ret = rte_eal_init(argc, argv)) < 0) {
    571         snprintf(err, errlen,
    572           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    573         return -1;
     714        snprintf(err, errlen,
     715          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     716        return -1;
    574717    }
    575718        restore_getopts(&save_opts);
     719    // These are still running but will never do anything with DPDK v1.7 we
     720    // should remove this XXX in the future
     721    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     722            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     723            cfg->lcore_role[i] = ROLE_OFF;
     724            cfg->lcore_count--;
     725        }
     726    }
     727    // Only the master should be running
     728    assert(cfg->lcore_count == 1);
     729
     730    dpdk_move_master_lcore(my_cpu-1);
    576731
    577732#if DEBUG
     
    584739     */
    585740    if ((ret = rte_pmd_init_all()) < 0) {
    586         snprintf(err, errlen,
    587           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    588         return -1;
     741        snprintf(err, errlen,
     742          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     743        return -1;
    589744    }
    590745#endif
     
    594749        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    595750                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    596                         " are you sure the address is correct?: %s", strerror(-ret));
     751                        " are you sure the address is correct?: %s", strerror(-ret));
    597752                return -1;
    598753        }
     
    602757    /* This loads DPDK drivers against all ports that are not blacklisted */
    603758        if ((ret = rte_eal_pci_probe()) < 0) {
    604         snprintf(err, errlen,
    605             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    606         return -1;
     759        snprintf(err, errlen,
     760            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     761        return -1;
    607762    }
    608763#endif
     
    611766
    612767    if (format_data->nb_ports != 1) {
    613         snprintf(err, errlen,
    614             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    615             format_data->nb_ports);
    616         return -1;
    617     }
     768        snprintf(err, errlen,
     769            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     770            format_data->nb_ports);
     771        return -1;
     772    }
     773
     774    struct rte_eth_dev_info dev_info;
     775    rte_eth_dev_info_get(0, &dev_info);
     776    fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     777                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    618778
    619779    return 0;
     
    623783    char err[500];
    624784    err[0] = 0;
    625    
     785
    626786    libtrace->format_data = (struct dpdk_format_data_t *)
    627                             malloc(sizeof(struct dpdk_format_data_t));
     787                            malloc(sizeof(struct dpdk_format_data_t));
    628788    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    629789    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    632792    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    633793    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     794    FORMAT(libtrace)->nic_numa_node = -1;
    634795    FORMAT(libtrace)->promisc = -1;
    635796    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    639800    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    640801    FORMAT(libtrace)->mempool_name[0] = 0;
    641 #if HAS_HW_TIMESTAMPS_82580
    642     FORMAT(libtrace)->ts_first_sys = 0;
    643     FORMAT(libtrace)->ts_last_sys = 0;
    644     FORMAT(libtrace)->wrap_count = 0;
    645 #endif
     802    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     803    FORMAT(libtrace)->burst_size = 0;
     804    FORMAT(libtrace)->burst_offset = 0;
    646805
    647806    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    648         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    649         free(libtrace->format_data);
    650         libtrace->format_data = NULL;
    651         return -1;
     807        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     808        free(libtrace->format_data);
     809        libtrace->format_data = NULL;
     810        return -1;
    652811    }
    653812    return 0;
     
    658817    char err[500];
    659818    err[0] = 0;
    660    
     819
    661820    libtrace->format_data = (struct dpdk_format_data_t *)
    662                             malloc(sizeof(struct dpdk_format_data_t));
     821                            malloc(sizeof(struct dpdk_format_data_t));
    663822    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    664823    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    667826    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    668827    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     828    FORMAT(libtrace)->nic_numa_node = -1;
    669829    FORMAT(libtrace)->promisc = -1;
    670830    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    674834    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    675835    FORMAT(libtrace)->mempool_name[0] = 0;
    676 #if HAS_HW_TIMESTAMPS_82580
    677     FORMAT(libtrace)->ts_first_sys = 0;
    678     FORMAT(libtrace)->ts_last_sys = 0;
    679     FORMAT(libtrace)->wrap_count = 0;
    680 #endif
     836    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     837    FORMAT(libtrace)->burst_size = 0;
     838    FORMAT(libtrace)->burst_offset = 0;
    681839
    682840    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    683         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    684         free(libtrace->format_data);
    685         libtrace->format_data = NULL;
    686         return -1;
     841        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     842        free(libtrace->format_data);
     843        libtrace->format_data = NULL;
     844        return -1;
    687845    }
    688846    return 0;
    689847};
    690848
     849static int dpdk_pconfig_input (libtrace_t *libtrace,
     850                                trace_parallel_option_t option,
     851                                void *data) {
     852        switch (option) {
     853                case TRACE_OPTION_SET_HASHER:
     854                        switch (*((enum hasher_types *) data))
     855                        {
     856                                case HASHER_BALANCE:
     857                                case HASHER_UNIDIRECTIONAL:
     858                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     859                                        return 0;
     860                                case HASHER_BIDIRECTIONAL:
     861                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     862                                        return 0;
     863                                case HASHER_HARDWARE:
     864                                case HASHER_CUSTOM:
     865                                        // We don't support these
     866                                        return -1;
     867                        }
     868        break;
     869        }
     870        return -1;
     871}
    691872/**
    692873 * Note here snaplen excludes the MAC checksum. Packets over
    693874 * the requested snaplen will be dropped. (Excluding MAC checksum)
    694  * 
     875 *
    695876 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    696877 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    697878 * is set the maximum size of the returned packet would be 1518 otherwise
    698879 * 1514 would be the largest size possibly returned.
    699  * 
     880 *
    700881 */
    701882static int dpdk_config_input (libtrace_t *libtrace,
    702                                         trace_option_t option,
    703                                         void *data) {
     883                                        trace_option_t option,
     884                                        void *data) {
    704885    switch (option) {
    705         case TRACE_OPTION_SNAPLEN:
    706             /* Only support changing snaplen before a call to start is
    707              * made */
    708             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    709                 FORMAT(libtrace)->snaplen=*(int*)data;
    710             else
    711                 return -1;
    712             return 0;
     886        case TRACE_OPTION_SNAPLEN:
     887            /* Only support changing snaplen before a call to start is
     888             * made */
     889            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     890                FORMAT(libtrace)->snaplen=*(int*)data;
     891            else
     892                return -1;
     893            return 0;
    713894                case TRACE_OPTION_PROMISC:
    714895                        FORMAT(libtrace)->promisc=*(int*)data;
    715             return 0;
    716         case TRACE_OPTION_FILTER:
    717             /* TODO filtering */
    718             break;
    719         case TRACE_OPTION_META_FREQ:
    720             break;
    721         case TRACE_OPTION_EVENT_REALTIME:
    722             break;
    723         /* Avoid default: so that future options will cause a warning
    724         * here to remind us to implement it, or flag it as
    725         * unimplementable
    726         */
     896            return 0;
     897        case TRACE_OPTION_FILTER:
     898            /* TODO filtering */
     899            break;
     900        case TRACE_OPTION_META_FREQ:
     901            break;
     902        case TRACE_OPTION_EVENT_REALTIME:
     903            break;
     904        /* Avoid default: so that future options will cause a warning
     905        * here to remind us to implement it, or flag it as
     906        * unimplementable
     907        */
    727908    }
    728909
     
    734915/* Can set jumbo frames/ or limit the size of a frame by setting both
    735916 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    736  * 
     917 *
    737918 */
    738919static struct rte_eth_conf port_conf = {
    739920        .rxmode = {
     921                .mq_mode = ETH_RSS,
    740922                .split_hdr_size = 0,
    741923                .header_split   = 0, /**< Header Split disabled */
     
    743925                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    744926                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    745         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     927                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    746928#if GET_MAC_CRC_CHECKSUM
    747929/* So it appears that if hw_strip_crc is turned off the driver will still
     
    756938 * always cut off the checksum in the future
    757939 */
    758         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     940                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    759941#endif
    760942        },
     
    762944                .mq_mode = ETH_DCB_NONE,
    763945        },
     946        .rx_adv_conf = {
     947                .rss_conf = {
     948                        // .rss_key = &rss_key, // We set this per format
     949                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     950                },
     951        },
     952        .intr_conf = {
     953                .lsc = 1
     954        }
    764955};
    765956
     
    776967static const struct rte_eth_txconf tx_conf = {
    777968        .tx_thresh = {
    778         /**
    779         * TX_PTHRESH prefetch
    780         * Set on the NIC, if the number of unprocessed descriptors to queued on
    781         * the card fall below this try grab at least hthresh more unprocessed
    782         * descriptors.
    783         */
     969        /**
     970        * TX_PTHRESH prefetch
     971        * Set on the NIC, if the number of unprocessed descriptors to queued on
     972        * the card fall below this try grab at least hthresh more unprocessed
     973        * descriptors.
     974        */
    784975                .pthresh = 36,
    785976
    786         /* TX_HTHRESH host
    787         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    788         */
     977        /* TX_HTHRESH host
     978        * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     979        */
    789980                .hthresh = 0,
    790        
    791         /* TX_WTHRESH writeback
    792         * Set on the NIC, the number of sent descriptors before writing back
    793         * status to confirm the transmission. This is done more efficiently as
    794         * a bulk DMA-transfer rather than writing one at a time.
    795         * Similar to tx_free_thresh however this is applied to the NIC, where
    796         * as tx_free_thresh is when DPDK will check these. This is extended
    797         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    798         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    799         */
     981
     982        /* TX_WTHRESH writeback
     983        * Set on the NIC, the number of sent descriptors before writing back
     984        * status to confirm the transmission. This is done more efficiently as
     985        * a bulk DMA-transfer rather than writing one at a time.
     986        * Similar to tx_free_thresh however this is applied to the NIC, where
     987        * as tx_free_thresh is when DPDK will check these. This is extended
     988        * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     989        * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     990        */
    800991                .wthresh = 4,
    801992        },
     
    808999
    8091000    /* This is the Report Status threshold, used by 10Gbit cards,
    810      * This signals the card to only write back status (such as 
     1001     * This signals the card to only write back status (such as
    8111002     * transmission successful) after this minimum number of transmit
    8121003     * descriptors are seen. The default is 32 (if set to 0) however if set
     
    8171008};
    8181009
     1010/**
     1011 * A callback for a link state change (LSC).
     1012 *
     1013 * Packets may be received before this notification. In fact the DPDK IGXBE
     1014 * driver likes to put a delay upto 5sec before sending this.
     1015 *
     1016 * We use this to ensure the link speed is correct for our timestamp
     1017 * calculations. Because packets might be received before the link up we still
     1018 * update this when the packet is received.
     1019 *
     1020 * @param port The DPDK port
     1021 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
     1022 * @param cb_arg The dpdk_format_data_t structure associated with the format
     1023 */
     1024static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
     1025                              void *cb_arg) {
     1026        struct dpdk_format_data_t * format_data = cb_arg;
     1027        struct rte_eth_link link_info;
     1028        assert(event == RTE_ETH_EVENT_INTR_LSC);
     1029        assert(port == format_data->port);
     1030
     1031        rte_eth_link_get_nowait(port, &link_info);
     1032
     1033        if (link_info.link_status)
     1034                format_data->link_speed = link_info.link_speed;
     1035        else
     1036                format_data->link_speed = 0;
     1037
     1038#if DEBUG
     1039        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
     1040                link_info.link_status ? "up" : "down",
     1041                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
     1042                                          "full-duplex" : "half-duplex",
     1043                (int) link_info.link_speed);
     1044#endif
     1045
     1046        /* Turns out DPDK drivers might not come back up if the link speed
     1047         * changes. So we reset the autoneg procedure. This is very unsafe
     1048         * we have have threads reading packets and we stop the port. */
     1049#if 0
     1050        if (!link_info.link_status) {
     1051                int ret;
     1052                rte_eth_dev_stop(port);
     1053                ret = rte_eth_dev_start(port);
     1054                if (ret < 0) {
     1055                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
     1056                                strerror(-ret));
     1057                }
     1058        }
     1059#endif
     1060}
     1061
    8191062/* Attach memory to the port and start the port or restart the port.
    8201063 */
     
    8221065    int ret; /* Check return values for errors */
    8231066    struct rte_eth_link link_info; /* Wait for link */
    824    
     1067    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
     1068
    8251069    /* Already started */
    8261070    if (format_data->paused == DPDK_RUNNING)
    827         return 0;
    828 
    829     /* First time started we need to alloc our memory, doing this here 
     1071        return 0;
     1072
     1073    /* First time started we need to alloc our memory, doing this here
    8301074     * rather than in environment setup because we don't have snaplen then */
    8311075    if (format_data->paused == DPDK_NEVER_STARTED) {
    832         if (format_data->snaplen == 0) {
    833             format_data->snaplen = RX_MBUF_SIZE;
    834             port_conf.rxmode.jumbo_frame = 0;
    835             port_conf.rxmode.max_rx_pkt_len = 0;
    836         } else {
    837             /* Use jumbo frames */
    838             port_conf.rxmode.jumbo_frame = 1;
    839             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    840         }
    841 
    842         /* This is additional overhead so make sure we allow space for this */
     1076        if (format_data->snaplen == 0) {
     1077            format_data->snaplen = RX_MBUF_SIZE;
     1078            port_conf.rxmode.jumbo_frame = 0;
     1079            port_conf.rxmode.max_rx_pkt_len = 0;
     1080        } else {
     1081            /* Use jumbo frames */
     1082            port_conf.rxmode.jumbo_frame = 1;
     1083            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1084        }
     1085
     1086        /* This is additional overhead so make sure we allow space for this */
    8431087#if GET_MAC_CRC_CHECKSUM
    844         format_data->snaplen += ETHER_CRC_LEN;
     1088        format_data->snaplen += ETHER_CRC_LEN;
    8451089#endif
    8461090#if HAS_HW_TIMESTAMPS_82580
    847         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    848 #endif
    849 
    850         /* Create the mbuf pool, which is the place our packets are allocated
    851          * from - TODO figure out if there is is a free function (I cannot see one)
    852          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    853          * allocate however that extra 1 packet is not used.
    854         * (I assume <= vs < error some where in DPDK code)
    855          * TX requires nb_tx_buffers + 1 in the case the queue is full
    856         * so that will fill the new buffer and wait until slots in the
    857         * ring become available.
    858         */
     1091        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1092#endif
     1093
     1094        /* Create the mbuf pool, which is the place our packets are allocated
     1095         * from - TODO figure out if there is is a free function (I cannot see one)
     1096         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1097         * allocate however that extra 1 packet is not used.
     1098        * (I assume <= vs < error some where in DPDK code)
     1099         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1100        * so that will fill the new buffer and wait until slots in the
     1101        * ring become available.
     1102        */
    8591103#if DEBUG
    8601104    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    8611105#endif
    862         format_data->pktmbuf_pool =
    863             rte_mempool_create(format_data->mempool_name,
    864                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    865                        format_data->snaplen + sizeof(struct rte_mbuf)
    866                                         + RTE_PKTMBUF_HEADROOM,
    867                        8, sizeof(struct rte_pktmbuf_pool_private),
    868                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    869                        rte_socket_id(), MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    870 
    871         if (format_data->pktmbuf_pool == NULL) {
    872             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    873                         "pool failed: %s", strerror(rte_errno));
    874             return -1;
    875         }
    876     }
    877    
     1106    format_data->pktmbuf_pool =
     1107            rte_mempool_create(format_data->mempool_name,
     1108                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
     1109                       format_data->snaplen + sizeof(struct rte_mbuf)
     1110                                        + RTE_PKTMBUF_HEADROOM,
     1111                       128, sizeof(struct rte_pktmbuf_pool_private),
     1112                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1113                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
     1114
     1115    if (format_data->pktmbuf_pool == NULL) {
     1116            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
     1117                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
     1118            return -1;
     1119        }
     1120    }
     1121
    8781122    /* ----------- Now do the setup for the port mapping ------------ */
    879     /* Order of calls must be 
     1123    /* Order of calls must be
    8801124     * rte_eth_dev_configure()
    8811125     * rte_eth_tx_queue_setup()
     
    8841128     * other rte_eth calls
    8851129     */
    886    
     1130
     1131
     1132    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     1133
    8871134    /* This must be called first before another *eth* function
    8881135     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    8891136    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    8901137    if (ret < 0) {
    891         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    892                             " %"PRIu8" : %s", format_data->port,
    893                             strerror(-ret));
    894         return -1;
     1138        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1139                            " %"PRIu8" : %s", format_data->port,
     1140                            strerror(-ret));
     1141        return -1;
    8951142    }
    8961143    /* Initialise the TX queue a minimum value if using this port for
     
    8981145     */
    8991146    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    900                         format_data->nb_tx_buf, rte_socket_id(), &tx_conf);
     1147                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    9011148    if (ret < 0) {
    902         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    903                             " %"PRIu8" : %s", format_data->port,
    904                             strerror(-ret));
    905         return -1;
     1149        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1150                            " %"PRIu8" : %s", format_data->port,
     1151                            strerror(-ret));
     1152        return -1;
    9061153    }
    9071154    /* Initialise the RX queue with some packets from memory */
    9081155    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    909                             format_data->nb_rx_buf, rte_socket_id(),
    910                             &rx_conf, format_data->pktmbuf_pool);
     1156                                 format_data->nb_rx_buf, cpu_numa_node,
     1157                                 &rx_conf, format_data->pktmbuf_pool);
    9111158    if (ret < 0) {
    912         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    913                     " %"PRIu8" : %s", format_data->port,
    914                     strerror(-ret));
    915         return -1;
    916     }
    917    
     1159        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1160                    " %"PRIu8" : %s", format_data->port,
     1161                    strerror(-ret));
     1162        return -1;
     1163    }
     1164
    9181165    /* Start device */
    9191166    ret = rte_eth_dev_start(format_data->port);
    9201167    if (ret < 0) {
    921         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    922                     strerror(-ret));
    923         return -1;
     1168        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1169                    strerror(-ret));
     1170        return -1;
    9241171    }
    9251172
    9261173    /* Default promiscuous to on */
    9271174    if (format_data->promisc == -1)
    928         format_data->promisc = 1;
    929    
     1175        format_data->promisc = 1;
     1176
    9301177    if (format_data->promisc == 1)
    931         rte_eth_promiscuous_enable(format_data->port);
     1178        rte_eth_promiscuous_enable(format_data->port);
    9321179    else
    933         rte_eth_promiscuous_disable(format_data->port);
    934    
    935     /* Wait for the link to come up */
    936     rte_eth_link_get(format_data->port, &link_info);
     1180        rte_eth_promiscuous_disable(format_data->port);
     1181
     1182        /* Register a callback for link state changes */
     1183        ret = rte_eth_dev_callback_register(format_data->port,
     1184                                            RTE_ETH_EVENT_INTR_LSC,
     1185                                            dpdk_lsc_callback,
     1186                                            format_data);
     1187        /* If this fails it is not a show stopper */
     1188#if DEBUG
     1189        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1190                ret, strerror(-ret));
     1191#endif
     1192
     1193    /* Get the current link status */
     1194    rte_eth_link_get_nowait(format_data->port, &link_info);
     1195    format_data->link_speed = link_info.link_speed;
    9371196#if DEBUG
    9381197    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    939             (int) link_info.link_duplex, (int) link_info.link_speed);
    940 #endif
    941 
     1198            (int) link_info.link_duplex, (int) link_info.link_speed);
     1199#endif
    9421200    /* We have now successfully started/unpaused */
    9431201    format_data->paused = DPDK_RUNNING;
    944    
     1202
     1203    return 0;
     1204}
     1205
     1206/* Attach memory to the port and start (or restart) the port/s.
     1207 */
     1208static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
     1209    int ret, i; /* Check return values for errors */
     1210    struct rte_eth_link link_info; /* Wait for link */
     1211
     1212    /* Already started */
     1213    if (format_data->paused == DPDK_RUNNING)
     1214        return 0;
     1215
     1216    /* First time started we need to alloc our memory, doing this here
     1217     * rather than in environment setup because we don't have snaplen then */
     1218    if (format_data->paused == DPDK_NEVER_STARTED) {
     1219        if (format_data->snaplen == 0) {
     1220            format_data->snaplen = RX_MBUF_SIZE;
     1221            port_conf.rxmode.jumbo_frame = 0;
     1222            port_conf.rxmode.max_rx_pkt_len = 0;
     1223        } else {
     1224            /* Use jumbo frames */
     1225            port_conf.rxmode.jumbo_frame = 1;
     1226            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1227        }
     1228
     1229        /* This is additional overhead so make sure we allow space for this */
     1230#if GET_MAC_CRC_CHECKSUM
     1231        format_data->snaplen += ETHER_CRC_LEN;
     1232#endif
     1233#if HAS_HW_TIMESTAMPS_82580
     1234        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1235#endif
     1236
     1237        /* Create the mbuf pool, which is the place our packets are allocated
     1238         * from - TODO figure out if there is a free function (I cannot see one)
     1239         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1240         * allocate however that extra 1 packet is not used.
     1241         * (I assume <= vs < error some where in DPDK code)
     1242         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1243         * so that will fill the new buffer and wait until slots in the
     1244         * ring become available.
     1245         */
     1246#if DEBUG
     1247    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
     1248#endif
     1249    format_data->pktmbuf_pool =
     1250            rte_mempool_create(format_data->mempool_name,
     1251                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
     1252                       format_data->snaplen + sizeof(struct rte_mbuf)
     1253                                        + RTE_PKTMBUF_HEADROOM,
     1254                       128, sizeof(struct rte_pktmbuf_pool_private),
     1255                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1256                       format_data->nic_numa_node, 0);
     1257
     1258        if (format_data->pktmbuf_pool == NULL) {
     1259            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1260                        "pool failed: %s", strerror(rte_errno));
     1261            return -1;
     1262        }
     1263    }
     1264
     1265    /* ----------- Now do the setup for the port mapping ------------ */
     1266    /* Order of calls must be
     1267     * rte_eth_dev_configure()
     1268     * rte_eth_tx_queue_setup()
     1269     * rte_eth_rx_queue_setup()
     1270     * rte_eth_dev_start()
     1271     * other rte_eth calls
     1272     */
     1273
     1274    /* This must be called first before another *eth* function
     1275     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     1276    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1277    if (ret < 0) {
     1278        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1279                            " %"PRIu8" : %s", format_data->port,
     1280                            strerror(-ret));
     1281        return -1;
     1282    }
     1283#if DEBUG
     1284    fprintf(stderr, "Doing dev configure\n");
     1285#endif
     1286    /* Initialise the TX queue a minimum value if using this port for
     1287     * receiving. Otherwise a larger size if writing packets.
     1288     */
     1289    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
     1290                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1291    if (ret < 0) {
     1292        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1293                            " %"PRIu8" : %s", format_data->port,
     1294                            strerror(-ret));
     1295        return -1;
     1296    }
     1297
     1298    for (i=0; i < rx_queues; i++) {
     1299#if DEBUG
     1300    fprintf(stderr, "Doing queue configure\n");
     1301#endif
     1302
     1303                /* Initialise the RX queue with some packets from memory */
     1304                ret = rte_eth_rx_queue_setup(format_data->port, i,
     1305                                             format_data->nb_rx_buf, format_data->nic_numa_node,
     1306                                             &rx_conf, format_data->pktmbuf_pool);
     1307        /* Init per_thread data structures */
     1308        format_data->per_lcore[i].port = format_data->port;
     1309        format_data->per_lcore[i].queue_id = i;
     1310
     1311                if (ret < 0) {
     1312                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1313                                                " %"PRIu8" : %s", format_data->port,
     1314                                                strerror(-ret));
     1315                        return -1;
     1316                }
     1317        }
     1318
     1319#if DEBUG
     1320    fprintf(stderr, "Doing start device\n");
     1321#endif
     1322    /* Start device */
     1323    ret = rte_eth_dev_start(format_data->port);
     1324#if DEBUG
     1325    fprintf(stderr, "Done start device\n");
     1326#endif
     1327    if (ret < 0) {
     1328        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1329                    strerror(-ret));
     1330        return -1;
     1331    }
     1332
     1333
     1334    /* Default promiscuous to on */
     1335    if (format_data->promisc == -1)
     1336        format_data->promisc = 1;
     1337
     1338    if (format_data->promisc == 1)
     1339        rte_eth_promiscuous_enable(format_data->port);
     1340    else
     1341        rte_eth_promiscuous_disable(format_data->port);
     1342
     1343
     1344    /* We have now successfully started/unpased */
     1345    format_data->paused = DPDK_RUNNING;
     1346
     1347    // Can use remote launch for all
     1348    /*RTE_LCORE_FOREACH_SLAVE(i) {
     1349                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
     1350        }*/
     1351
     1352    /* Register a callback for link state changes */
     1353    ret = rte_eth_dev_callback_register(format_data->port,
     1354                                        RTE_ETH_EVENT_INTR_LSC,
     1355                                        dpdk_lsc_callback,
     1356                                        format_data);
     1357    /* If this fails it is not a show stopper */
     1358#if DEBUG
     1359    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1360            ret, strerror(-ret));
     1361#endif
     1362
     1363    /* Get the current link status */
     1364    rte_eth_link_get_nowait(format_data->port, &link_info);
     1365    format_data->link_speed = link_info.link_speed;
     1366#if DEBUG
     1367    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1368            (int) link_info.link_duplex, (int) link_info.link_speed);
     1369        struct rte_eth_rss_reta reta_conf = {0};
     1370        reta_conf.mask_lo = ~reta_conf.mask_lo;
     1371        reta_conf.mask_hi = ~reta_conf.mask_hi;
     1372        int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf);
     1373        fprintf(stderr, "err=%d", qew);
     1374        for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) {
     1375                fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]);
     1376        }
     1377
     1378#endif
     1379
    9451380    return 0;
    9461381}
     
    9511386
    9521387    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    953         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    954         free(libtrace->format_data);
    955         libtrace->format_data = NULL;
    956         return -1;
     1388        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1389        free(libtrace->format_data);
     1390        libtrace->format_data = NULL;
     1391        return -1;
    9571392    }
    9581393    return 0;
     1394}
     1395
     1396static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1397    struct rte_eth_dev_info dev_info;
     1398    rte_eth_dev_info_get(port_id, &dev_info);
     1399    return dev_info.max_rx_queues;
     1400}
     1401
     1402static inline size_t dpdk_processor_count () {
     1403    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1404    if (nb_cpu <= 0)
     1405        return 1;
     1406    else
     1407        return (size_t) nb_cpu;
     1408}
     1409
     1410static int dpdk_pstart_input (libtrace_t *libtrace) {
     1411    char err[500];
     1412    int i=0, phys_cores=0;
     1413    int tot = libtrace->perpkt_thread_count;
     1414    err[0] = 0;
     1415
     1416    if (rte_lcore_id() != rte_get_master_lcore())
     1417        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1418
     1419    // If the master is not on the last thread we move it there
     1420    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1421        // Consider error handling here
     1422        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
     1423    }
     1424
     1425    // Don't exceed the number of cores in the system/detected by dpdk
     1426    // We don't have to force this but performance wont be good if we don't
     1427    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1428        if (lcore_config[i].detected) {
     1429            if (rte_lcore_is_enabled(i))
     1430                fprintf(stderr, "Found core %d already in use!\n", i);
     1431            else
     1432                phys_cores++;
     1433        }
     1434    }
     1435
     1436        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1437        tot = MIN(tot, phys_cores);
     1438
     1439        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
     1440
     1441    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1442        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1443        free(libtrace->format_data);
     1444        libtrace->format_data = NULL;
     1445        return -1;
     1446    }
     1447
     1448    // Make sure we only start the number that we should
     1449    libtrace->perpkt_thread_count = tot;
     1450    return 0;
     1451}
     1452
     1453
     1454/**
     1455 * Register a thread with the DPDK system,
     1456 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1457 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1458 * gives it.
     1459 *
     1460 * We then allow a mapper thread to be started on every real core as DPDK would,
     1461 * we also bind these to the corresponding CPU cores.
     1462 *
     1463 * @param libtrace A pointer to the trace
     1464 * @param reading True if the thread will be used to read packets, i.e. will
     1465 *                call pread_packet(), false if thread used to process packet
     1466 *                in any other manner including statistics functions.
     1467 */
     1468static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1469{
     1470    struct rte_config *cfg = rte_eal_get_configuration();
     1471    int i;
     1472    int new_id = -1;
     1473
     1474    // If 'reading packets' fill in cores from 0 up and bind affinity
     1475    // otherwise start from the MAX core (which is also the master) and work backwards
     1476    // in this case physical cores on the system will not exist so we don't bind
     1477    // these to any particular physical core
     1478    pthread_mutex_lock(&libtrace->libtrace_lock);
     1479    if (reading) {
     1480#if HAVE_LIBNUMA
     1481        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1482                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
     1483                                new_id = i;
     1484                        if (!lcore_config[i].detected)
     1485                                new_id = -1;
     1486                        break;
     1487                }
     1488        }
     1489#endif
     1490        /* Retry without the the numa restriction */
     1491        if (new_id == -1) {
     1492                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1493                                if (!rte_lcore_is_enabled(i)) {
     1494                                        new_id = i;
     1495                                if (!lcore_config[i].detected)
     1496                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1497                                break;
     1498                        }
     1499                }
     1500        }
     1501    } else {
     1502        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1503            if (!rte_lcore_is_enabled(i)) {
     1504                new_id = i;
     1505                break;
     1506            }
     1507        }
     1508    }
     1509
     1510    if (new_id == -1) {
     1511        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1512        // TODO proper libtrace style error here!!
     1513        fprintf(stderr, "Too many threads for DPDK!!\n");
     1514        pthread_mutex_unlock(&libtrace->libtrace_lock);
     1515        return -1;
     1516    }
     1517
     1518    // Enable the core in global DPDK structs
     1519    cfg->lcore_role[new_id] = ROLE_RTE;
     1520    cfg->lcore_count++;
     1521    // Set TLS to reflect our new number
     1522    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1523    fprintf(stderr, "original id%d", rte_lcore_id());
     1524    RTE_PER_LCORE(_lcore_id) = new_id;
     1525        char name[99];
     1526        pthread_getname_np(pthread_self(),
     1527                              name, sizeof(name));
     1528
     1529    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
     1530
     1531    if (reading) {
     1532        // Set affinity bind to corresponding core
     1533        cpu_set_t cpuset;
     1534        CPU_ZERO(&cpuset);
     1535        CPU_SET(rte_lcore_id(), &cpuset);
     1536        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1537        if (i != 0) {
     1538            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1539            pthread_mutex_unlock(&libtrace->libtrace_lock);
     1540            return -1;
     1541        }
     1542    }
     1543
     1544    // Map our TLS to the thread data
     1545    if (reading) {
     1546        if(t->type == THREAD_PERPKT) {
     1547            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1548        } else {
     1549            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1550        }
     1551    }
     1552    pthread_mutex_unlock(&libtrace->libtrace_lock);
     1553    return 0;
     1554}
     1555
     1556
     1557/**
     1558 * Unregister a thread with the DPDK system.
     1559 *
     1560 * Only previously registered threads should be calling this just before
     1561 * they are destroyed.
     1562 */
     1563static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
     1564{
     1565    struct rte_config *cfg = rte_eal_get_configuration();
     1566
     1567    assert(rte_lcore_id() < RTE_MAX_LCORE);
     1568    pthread_mutex_lock(&libtrace->libtrace_lock);
     1569    // Skip if master!!
     1570    if (rte_lcore_id() == rte_get_master_lcore()) {
     1571        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1572        pthread_mutex_unlock(&libtrace->libtrace_lock);
     1573        return;
     1574    }
     1575
     1576    // Disable this core in global DPDK structs
     1577    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1578    cfg->lcore_count--;
     1579    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1580    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1581    pthread_mutex_unlock(&libtrace->libtrace_lock);
     1582    return;
    9591583}
    9601584
     
    9631587    char err[500];
    9641588    err[0] = 0;
    965    
     1589
    9661590    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    967         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    968         free(libtrace->format_data);
    969         libtrace->format_data = NULL;
    970         return -1;
     1591        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1592        free(libtrace->format_data);
     1593        libtrace->format_data = NULL;
     1594        return -1;
    9711595    }
    9721596    return 0;
    9731597}
    9741598
    975 static int dpdk_pause_input(libtrace_t * libtrace){
     1599static int dpdk_pause_input(libtrace_t * libtrace) {
    9761600    /* This stops the device, but can be restarted using rte_eth_dev_start() */
    9771601    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    978 #if DEBUG     
    979         fprintf(stderr, "Pausing port\n");
    980 #endif
    981         rte_eth_dev_stop(FORMAT(libtrace)->port);
    982         FORMAT(libtrace)->paused = DPDK_PAUSED;
    983         /* If we pause it the driver will be reset and likely our counter */
     1602#if DEBUG
     1603        fprintf(stderr, "Pausing DPDK port\n");
     1604#endif
     1605        rte_eth_dev_stop(FORMAT(libtrace)->port);
     1606        FORMAT(libtrace)->paused = DPDK_PAUSED;
     1607        /* Empty the queue of packets */
     1608        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1609                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1610        }
     1611        FORMAT(libtrace)->burst_offset = 0;
     1612        FORMAT(libtrace)->burst_size = 0;
     1613        /* If we pause it the driver will be reset and likely our counter */
     1614
     1615        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
    9841616#if HAS_HW_TIMESTAMPS_82580
    985         FORMAT(libtrace)->ts_first_sys = 0;
    986         FORMAT(libtrace)->ts_last_sys = 0;
     1617        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
    9871618#endif
    9881619    }
     
    9901621}
    9911622
    992 static int dpdk_write_packet(libtrace_out_t *trace, 
     1623static int dpdk_write_packet(libtrace_out_t *trace,
    9931624                libtrace_packet_t *packet){
    9941625    struct rte_mbuf* m_buff[1];
    995    
     1626
    9961627    int wirelen = trace_get_wire_length(packet);
    9971628    int caplen = trace_get_capture_length(packet);
    998    
     1629
    9991630    /* Check for a checksum and remove it */
    10001631    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    1001                                             wirelen == caplen)
    1002         caplen -= ETHER_CRC_LEN;
     1632                                            wirelen == caplen)
     1633        caplen -= ETHER_CRC_LEN;
    10031634
    10041635    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    10051636    if (m_buff[0] == NULL) {
    1006         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    1007         return -1;
     1637        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1638        return -1;
    10081639    } else {
    1009         int ret;
    1010         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    1011         do {
    1012             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    1013         } while (ret != 1);
     1640        int ret;
     1641        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1642        do {
     1643            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
     1644        } while (ret != 1);
    10141645    }
    10151646
     
    10201651    /* Free our memory structures */
    10211652    if (libtrace->format_data != NULL) {
    1022         /* Close the device completely, device cannot be restarted */
    1023         if (FORMAT(libtrace)->port != 0xFF)
    1024             rte_eth_dev_close(FORMAT(libtrace)->port);
    1025         /* filter here if we used it */
     1653        /* Close the device completely, device cannot be restarted */
     1654        if (FORMAT(libtrace)->port != 0xFF)
     1655                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1656                                                RTE_ETH_EVENT_INTR_LSC,
     1657                                                dpdk_lsc_callback,
     1658                                                FORMAT(libtrace));
     1659                rte_eth_dev_close(FORMAT(libtrace)->port);
     1660                /* filter here if we used it */
    10261661                free(libtrace->format_data);
    10271662        }
     
    10371672    /* Free our memory structures */
    10381673    if (libtrace->format_data != NULL) {
    1039         /* Close the device completely, device cannot be restarted */
    1040         if (FORMAT(libtrace)->port != 0xFF)
    1041             rte_eth_dev_close(FORMAT(libtrace)->port);
    1042         /* filter here if we used it */
     1674        /* Close the device completely, device cannot be restarted */
     1675        if (FORMAT(libtrace)->port != 0xFF)
     1676            rte_eth_dev_close(FORMAT(libtrace)->port);
     1677        /* filter here if we used it */
    10431678                free(libtrace->format_data);
    10441679        }
     
    10501685}
    10511686
    1052 /** 
    1053  * Get the start of additional header that we added to a packet.
     1687/**
     1688 * Get the start of the additional header that we added to a packet.
    10541689 */
    10551690static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1056     uint8_t *hdrsize;
    10571691    assert(packet);
    10581692    assert(packet->buffer);
    1059     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1060     /* The byte before the original packet data denotes the size in bytes
    1061      * of our additional header that we added sits before the 'size byte' */
    1062     hdrsize--;
    1063     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1693    /* Our header sits straight after the mbuf header */
     1694    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    10641695}
    10651696
     
    10721703    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    10731704    if (size > hdr->cap_len) {
    1074         /* Cannot make a packet bigger */
     1705        /* Cannot make a packet bigger */
    10751706                return trace_get_capture_length(packet);
    10761707        }
     
    10861717    int org_cap_size; /* The original capture size */
    10871718    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1088         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1089                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1090                             sizeof(struct hw_timestamp_82580);
     1719        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1720                            sizeof(struct hw_timestamp_82580);
    10911721    } else {
    1092         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1093                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
     1722        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
    10941723    }
    10951724    if (hdr->flags & INCLUDES_CHECKSUM) {
    1096         return org_cap_size;
     1725        return org_cap_size;
    10971726    } else {
    1098         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1099         return org_cap_size + ETHER_CRC_LEN;
     1727        /* DPDK packets are always TRACE_TYPE_ETH packets */
     1728        return org_cap_size + ETHER_CRC_LEN;
    11001729    }
    11011730}
     
    11031732    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    11041733    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1105         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1106                 sizeof(struct hw_timestamp_82580);
     1734        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1735                sizeof(struct hw_timestamp_82580);
    11071736    else
    1108         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1737        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    11091738}
    11101739
     
    11141743    assert(packet);
    11151744    if (packet->buffer != buffer &&
    1116         packet->buf_control == TRACE_CTRL_PACKET) {
    1117         free(packet->buffer);
     1745        packet->buf_control == TRACE_CTRL_PACKET) {
     1746        free(packet->buffer);
    11181747    }
    11191748
    11201749    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1121         packet->buf_control = TRACE_CTRL_PACKET;
     1750        packet->buf_control = TRACE_CTRL_PACKET;
    11221751    } else
    1123         packet->buf_control = TRACE_CTRL_EXTERNAL;
     1752        packet->buf_control = TRACE_CTRL_EXTERNAL;
    11241753
    11251754    packet->buffer = buffer;
     
    11321761}
    11331762
     1763
     1764/**
     1765 * Given a packet size and a link speed, computes the
     1766 * time to transmit in nanoseconds.
     1767 *
     1768 * @param format_data The dpdk format data from which we get the link speed
     1769 *        and if unset updates it in a thread safe manner
     1770 * @param pkt_size The size of the packet in bytes
     1771 * @return The wire time in nanoseconds
     1772 */
     1773static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1774        uint32_t wire_time;
     1775        /* 20 extra bytes of interframe gap and preamble */
     1776# if GET_MAC_CRC_CHECKSUM
     1777        wire_time = ((pkt_size + 20) * 8000);
     1778# else
     1779        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1780# endif
     1781
     1782        /* Division is really slow and introduces a pipeline stall
     1783         * The compiler will optimise this into magical multiplication and shifting
     1784         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1785         */
     1786retry_calc_wiretime:
     1787        switch (format_data->link_speed) {
     1788        case ETH_LINK_SPEED_40G:
     1789                wire_time /=  ETH_LINK_SPEED_40G;
     1790                break;
     1791        case ETH_LINK_SPEED_20G:
     1792                wire_time /= ETH_LINK_SPEED_20G;
     1793                break;
     1794        case ETH_LINK_SPEED_10G:
     1795                wire_time /= ETH_LINK_SPEED_10G;
     1796                break;
     1797        case ETH_LINK_SPEED_1000:
     1798                wire_time /= ETH_LINK_SPEED_1000;
     1799                break;
     1800        case 0:
     1801                {
     1802                /* Maybe the link was down originally, but now it should be up */
     1803                struct rte_eth_link link = {0};
     1804                rte_eth_link_get_nowait(format_data->port, &link);
     1805                if (link.link_status && link.link_speed) {
     1806                        format_data->link_speed = link.link_speed;
     1807#ifdef DEBUG
     1808                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1809#endif
     1810                        goto retry_calc_wiretime;
     1811                }
     1812                /* We don't know the link speed, make sure numbers are counting up */
     1813                wire_time = 1;
     1814                break;
     1815                }
     1816        default:
     1817                wire_time /= format_data->link_speed;
     1818        }
     1819        return wire_time;
     1820}
     1821
     1822
     1823
    11341824/*
    1135  * Does any extra preperation to a captured packet.
    1136  * This includes adding our extra header to it with the timestamp
    1137  */
    1138 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1139                                                         struct rte_mbuf* pkt){
    1140     uint8_t * hdr_size;
    1141     struct dpdk_addt_hdr *hdr;
     1825 * Does any extra preperation to all captured packets
     1826 * This includes adding our extra header to it with the timestamp,
     1827 * and any snapping
     1828 *
     1829 * @param format_data The DPDK format data
     1830 * @param plc The DPDK per lcore format data
     1831 * @param pkts An array of size nb_pkts of DPDK packets
     1832 * @param nb_pkts The number of packets in pkts and optionally packets
     1833 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
     1834 */
     1835static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
     1836                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
     1837        struct dpdk_addt_hdr *hdr;
     1838        size_t i;
     1839        uint64_t cur_sys_time_ns;
    11421840#if HAS_HW_TIMESTAMPS_82580
    1143     struct hw_timestamp_82580 *hw_ts;
    1144     struct timeval cur_sys_time;
    1145     uint64_t cur_sys_time_ns;
    1146     uint64_t estimated_wraps;
    1147    
    1148     /* Using gettimeofday because it's most likely to be a vsyscall
    1149      * We don't want to slow down anything with systemcalls we dont need
    1150      * accauracy */
    1151     gettimeofday(&cur_sys_time, NULL);
     1841        struct hw_timestamp_82580 *hw_ts;
     1842        uint64_t estimated_wraps;
    11521843#else
    1153 # if USE_CLOCK_GETTIME
    1154     struct timespec cur_sys_time;
    1155    
    1156     /* This looks terrible and I feel bad doing it. But it's OK
    1157      * on new kernels, because this is a vsyscall */
    1158     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1159 # else
    1160     struct timeval cur_sys_time;
    1161     /* Should be a vsyscall */
    1162     gettimeofday(&cur_sys_time, NULL);
    1163 # endif
    1164 #endif
    1165 
    1166     /* Record the size of our header */
    1167     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1168     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1169     /* Now put our header in front of that size */
    1170     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1171     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1172    
     1844
     1845#endif
     1846
     1847#if USE_CLOCK_GETTIME
     1848        struct timespec cur_sys_time = {0};
     1849        /* This looks terrible and I feel bad doing it. But it's OK
     1850         * on new kernels, because this is a fast vsyscall */
     1851        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1852        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1853#else
     1854        struct timeval cur_sys_time = {0};
     1855        /* Also a fast vsyscall */
     1856        gettimeofday(&cur_sys_time, NULL);
     1857        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1858#endif
     1859
     1860        /* The system clock is not perfect so when running
     1861         * at linerate we could timestamp a packet in the past.
     1862         * To avoid this we munge the timestamp to appear 1ns
     1863         * after the previous packet. We should eventually catch up
     1864         * to system time since a 64byte packet on a 10G link takes 67ns.
     1865         *
     1866         * Note with parallel readers timestamping packets
     1867         * with duplicate stamps or out of order is unavoidable without
     1868         * hardware timestamping from the NIC.
     1869         */
     1870#if !HAS_HW_TIMESTAMPS_82580
     1871        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1872                cur_sys_time_ns = plc->ts_last_sys + 1;
     1873        }
     1874#endif
     1875
     1876        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
     1877        for (i = 0 ; i < nb_pkts ; ++i) {
     1878
     1879                /* We put our header straight after the dpdk header */
     1880                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1881                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1882
    11731883#if GET_MAC_CRC_CHECKSUM
     1884<<<<<<< HEAD
     1885                /* Add back in the CRC sum */
     1886                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
     1887                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
     1888                hdr->flags |= INCLUDES_CHECKSUM;
     1889=======
    11741890    /* Add back in the CRC sum */
    11751891    rte_pktmbuf_pkt_len(pkt) += ETHER_CRC_LEN;
    11761892    rte_pktmbuf_data_len(pkt) += ETHER_CRC_LEN;
    11771893    hdr->flags |= INCLUDES_CHECKSUM;
    1178 #endif
     1894>>>>>>> master
     1895#endif
     1896
     1897                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    11791898
    11801899#if HAS_HW_TIMESTAMPS_82580
    1181     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1182      *
    1183      *        +----------+---+   +--------------+
    1184      *  82580 |    24    | 8 |   |      32      |
    1185      *        +----------+---+   +--------------+
    1186      *          reserved  \______ 40 bits _____/
    1187      *
    1188      * The 40 bit 82580 SYSTIM overflows every
    1189      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1190      *
    1191      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1192      * Endian (for the full 64 bits) i.e. picture is mirrored
    1193      */
    1194    
    1195     /* The timestamp is sitting before our packet and is included in pkt_len */
    1196     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1197     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1198    
    1199     /* Despite what the documentation says this is in Little
    1200      * Endian byteorder. Mask the reserved section out.
    1201      */
    1202     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1203                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1204                
    1205     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1206     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1207         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1208         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1209     }
    1210    
    1211     /* This will have serious problems if packets aren't read quickly
    1212      * that is within a couple of seconds because our clock cycles every
    1213      * 18 seconds */
    1214     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1215                             / (1ull<<TS_NBITS_82580);
    1216    
    1217     /* Estimated_wraps gives the number of times the counter should have
    1218      * wrapped (however depending on value last time it could have wrapped
    1219      * twice more (if hw clock is close to its max value) or once less (allowing
    1220      * for a bit of variance between hw and sys clock). But if the clock
    1221      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1222     if (unlikely(estimated_wraps >= 2)) {
    1223         /* 2 or more wrap arounds add all but the very last wrap */
    1224         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1225     }
    1226    
    1227     /* Set the timestamp to the lowest possible value we're considering */
    1228     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1229                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1230    
    1231     /* In most runs only the first if() will need evaluating - i.e our
    1232      * estimate is correct. */
    1233     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1234                                 hdr->timestamp, MAXSKEW_82580))) {
    1235         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1236         FORMAT(libtrace)->wrap_count++;
    1237         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1238         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1239                                 hdr->timestamp, MAXSKEW_82580)) {
    1240             /* Failed to match estimated_wraps */
    1241             FORMAT(libtrace)->wrap_count++;
    1242             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1243             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1244                                 hdr->timestamp, MAXSKEW_82580)) {
    1245                 if (estimated_wraps == 0) {
    1246                     /* 0 case Failed to match estimated_wraps+2 */
    1247                     printf("WARNING - Hardware Timestamp failed to"
    1248                                             " match using systemtime!\n");
    1249                     hdr->timestamp = cur_sys_time_ns;
    1250                 } else {
    1251                     /* Failed to match estimated_wraps+1 */
    1252                     FORMAT(libtrace)->wrap_count++;
    1253                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1254                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1255                                 hdr->timestamp, MAXSKEW_82580)) {
    1256                         /* Failed to match estimated_wraps+2 */
    1257                         printf("WARNING - Hardware Timestamp failed to"
    1258                                             " match using systemtime!!\n");
    1259                     }
    1260                 }
    1261             }
    1262         }
    1263     }
    1264 
    1265     /* Log our previous for the next loop */
    1266     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1267 
     1900                /* The timestamp is sitting before our packet and is included in pkt_len */
     1901                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1902                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1903                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1904
     1905                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1906                 *
     1907                 *        +----------+---+   +--------------+
     1908                 *  82580 |    24    | 8 |   |      32      |
     1909                 *        +----------+---+   +--------------+
     1910                 *          reserved  \______ 40 bits _____/
     1911                 *
     1912                 * The 40 bit 82580 SYSTIM overflows every
     1913                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1914                 *
     1915                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1916                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1917                 */
     1918
     1919                /* Despite what the documentation says this is in Little
     1920                 * Endian byteorder. Mask the reserved section out.
     1921                 */
     1922                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1923                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1924
     1925                if (unlikely(plc->ts_first_sys == 0)) {
     1926                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1927                        plc->ts_last_sys = plc->ts_first_sys;
     1928                }
     1929
     1930                /* This will have serious problems if packets aren't read quickly
     1931                 * that is within a couple of seconds because our clock cycles every
     1932                 * 18 seconds */
     1933                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1934                                  / (1ull<<TS_NBITS_82580);
     1935
     1936                /* Estimated_wraps gives the number of times the counter should have
     1937                 * wrapped (however depending on value last time it could have wrapped
     1938                 * twice more (if hw clock is close to its max value) or once less (allowing
     1939                 * for a bit of variance between hw and sys clock). But if the clock
     1940                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1941                if (unlikely(estimated_wraps >= 2)) {
     1942                        /* 2 or more wrap arounds add all but the very last wrap */
     1943                        plc->wrap_count += estimated_wraps - 1;
     1944                }
     1945
     1946                /* Set the timestamp to the lowest possible value we're considering */
     1947                hdr->timestamp += plc->ts_first_sys +
     1948                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     1949
     1950                /* In most runs only the first if() will need evaluating - i.e our
     1951                 * estimate is correct. */
     1952                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     1953                                              hdr->timestamp, MAXSKEW_82580))) {
     1954                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     1955                        plc->wrap_count++;
     1956                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     1957                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1958                                             hdr->timestamp, MAXSKEW_82580)) {
     1959                                /* Failed to match estimated_wraps */
     1960                                plc->wrap_count++;
     1961                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1962                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1963                                                     hdr->timestamp, MAXSKEW_82580)) {
     1964                                        if (estimated_wraps == 0) {
     1965                                                /* 0 case Failed to match estimated_wraps+2 */
     1966                                                printf("WARNING - Hardware Timestamp failed to"
     1967                                                       " match using systemtime!\n");
     1968                                                hdr->timestamp = cur_sys_time_ns;
     1969                                        } else {
     1970                                                /* Failed to match estimated_wraps+1 */
     1971                                                plc->wrap_count++;
     1972                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1973                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1974                                                                     hdr->timestamp, MAXSKEW_82580)) {
     1975                                                        /* Failed to match estimated_wraps+2 */
     1976                                                        printf("WARNING - Hardware Timestamp failed to"
     1977                                                               " match using systemtime!!\n");
     1978                                                }
     1979                                        }
     1980                                }
     1981                        }
     1982                }
    12681983#else
    1269 # if USE_CLOCK_GETTIME
    1270     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1271 # else
    1272     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1273 # endif
    1274 #endif
    1275 
    1276     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1277      * idea and do the same */
    1278     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1279     packet->buffer = pkt;
    1280     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1281 
    1282     /* Set our capture length for the first time */
    1283     hdr->cap_len = dpdk_get_wire_length(packet);
    1284     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1285         hdr->cap_len -= ETHER_CRC_LEN;
    1286     }
    1287    
    1288 
    1289     return dpdk_get_framing_length(packet) +
    1290                         dpdk_get_capture_length(packet);
    1291 }
     1984
     1985                hdr->timestamp = cur_sys_time_ns;
     1986                /* Offset the next packet by the wire time of previous */
     1987                calculate_wire_time(format_data, hdr->cap_len);
     1988
     1989#endif
     1990                if(packets) {
     1991                        packets[i]->buffer = pkts[i];
     1992                        packets[i]->header = pkts[i];
     1993#if HAS_HW_TIMESTAMPS_82580
     1994                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1995                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
     1996#else
     1997                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1998                                              RTE_PKTMBUF_HEADROOM;
     1999#endif
     2000                        packets[i]->error = 1;
     2001                }
     2002        }
     2003
     2004        plc->ts_last_sys = cur_sys_time_ns;
     2005
     2006        return;
     2007}
     2008
     2009
     2010static void dpdk_fin_packet(libtrace_packet_t *packet)
     2011{
     2012        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2013                rte_pktmbuf_free(packet->buffer);
     2014                packet->buffer = NULL;
     2015        }
     2016}
     2017
    12922018
    12932019static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1294     int nb_rx; /* Number of rx packets we've recevied */
    1295     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     2020    int nb_rx; /* Number of rx packets we've received */
    12962021
    12972022    /* Free the last packet buffer */
    12982023    if (packet->buffer != NULL) {
    1299         /* Buffer is owned by DPDK */
    1300         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1301             rte_pktmbuf_free(packet->buffer);
    1302             packet->buffer = NULL;
    1303         } else
    1304         /* Buffer is owned by packet i.e. has been malloc'd */
    1305         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1306             free(packet->buffer);
    1307             packet->buffer = NULL;
    1308         }
    1309     }
    1310    
     2024        /* Buffer is owned by DPDK */
     2025        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2026            rte_pktmbuf_free(packet->buffer);
     2027            packet->buffer = NULL;
     2028        } else
     2029        /* Buffer is owned by packet i.e. has been malloc'd */
     2030        if (packet->buf_control == TRACE_CTRL_PACKET) {
     2031            free(packet->buffer);
     2032            packet->buffer = NULL;
     2033        }
     2034    }
     2035
    13112036    packet->buf_control = TRACE_CTRL_EXTERNAL;
    13122037    packet->type = TRACE_RT_DATA_DPDK;
    1313    
     2038
     2039    /* Check if we already have some packets buffered */
     2040    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     2041            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     2042            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2043            return 1; // TODO should be bytes read, which essentially useless anyway
     2044    }
    13142045    /* Wait for a packet */
    13152046    while (1) {
    1316         /* Poll for a single packet */
    1317         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1318                             FORMAT(libtrace)->queue_id, pkts_burst, 1);
    1319         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1320             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1321         }
    1322         if (libtrace_halt) {
    1323             return 0;
    1324         }
    1325     }
    1326    
     2047        /* Poll for a single packet */
     2048        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     2049                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     2050        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     2051                FORMAT(libtrace)->burst_size = nb_rx;
     2052                FORMAT(libtrace)->burst_offset = 1;
     2053                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
     2054                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     2055                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2056                return 1; // TODO should be bytes read, which essentially useless anyway
     2057        }
     2058        if (libtrace_halt) {
     2059                return 0;
     2060        }
     2061        /* Wait a while, polling on memory degrades performance
     2062         * This relieves the pressure on memory allowing the NIC to DMA */
     2063        rte_delay_us(10);
     2064    }
     2065
     2066    /* We'll never get here - but if we did it would be bad */
     2067    return -1;
     2068}
     2069
     2070static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     2071    size_t nb_rx; /* Number of rx packets we've recevied */
     2072    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     2073    size_t i;
     2074
     2075    for (i = 0 ; i < nb_packets ; ++i) {
     2076            /* Free the last packet buffer */
     2077            if (packets[i]->buffer != NULL) {
     2078                /* Buffer is owned by DPDK */
     2079                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
     2080                    rte_pktmbuf_free(packets[i]->buffer);
     2081                    packets[i]->buffer = NULL;
     2082                } else
     2083                /* Buffer is owned by packet i.e. has been malloc'd */
     2084                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
     2085                    free(packets[i]->buffer);
     2086                    packets[i]->buffer = NULL;
     2087                }
     2088            }
     2089            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     2090            packets[i]->type = TRACE_RT_DATA_DPDK;
     2091    }
     2092
     2093    /* Wait for a packet */
     2094    while (1) {
     2095        /* Poll for a single packet */
     2096        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     2097                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
     2098        if (nb_rx > 0) {
     2099                /* Got some packets - otherwise we keep spining */
     2100                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     2101                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
     2102                return nb_rx;
     2103        }
     2104        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
     2105        if (libtrace_message_queue_count(&t->messages) > 0) {
     2106                printf("Extra message yay");
     2107                return -2;
     2108        }
     2109        if (libtrace_halt) {
     2110                return 0;
     2111        }
     2112        /* Wait a while, polling on memory degrades performance
     2113         * This relieves the pressure on memory allowing the NIC to DMA */
     2114        rte_delay_us(10);
     2115    }
     2116
    13272117    /* We'll never get here - but if we did it would be bad */
    13282118    return -1;
     
    13322122    struct timeval tv;
    13332123    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1334    
     2124
    13352125    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    13362126    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     
    13412131    struct timespec ts;
    13422132    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1343    
     2133
    13442134    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    13452135    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     
    13682158static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    13692159    struct rte_eth_stats stats = {0};
    1370    
     2160
    13712161    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1372         return UINT64_MAX;
     2162        return UINT64_MAX;
    13732163    /* Grab the current stats */
    13742164    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1375    
     2165
    13762166    /* Get the drop counter */
    13772167    return (uint64_t) stats.ierrors;
     
    13802170static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
    13812171    struct rte_eth_stats stats = {0};
    1382    
     2172
    13832173    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1384         return UINT64_MAX;
     2174        return UINT64_MAX;
    13852175    /* Grab the current stats */
    13862176    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1387    
     2177
    13882178    /* Get the drop counter */
    13892179    return (uint64_t) stats.ipackets;
     
    13932183 * This is the number of packets filtered by the NIC
    13942184 * and maybe ahead of number read using libtrace.
    1395  * 
     2185 *
    13962186 * XXX we are yet to implement any filtering, but if it was this should
    13972187 * get the result. So this will just return 0 for now.
     
    13992189static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    14002190    struct rte_eth_stats stats = {0};
    1401    
     2191
    14022192    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1403         return UINT64_MAX;
     2193        return UINT64_MAX;
    14042194    /* Grab the current stats */
    14052195    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1406    
     2196
    14072197    /* Get the drop counter */
    14082198    return (uint64_t) stats.fdirmiss;
     
    14142204 */
    14152205static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    1416                                         libtrace_packet_t *packet) {
     2206                                        libtrace_packet_t *packet) {
    14172207    libtrace_eventobj_t event = {0,0,0.0,0};
    14182208    int nb_rx; /* Number of receive packets we've read */
    14192209    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    1420    
     2210
    14212211    do {
    1422    
    1423         /* See if we already have a packet waiting */
    1424         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    1425                         FORMAT(trace)->queue_id, pkts_burst, 1);
    1426        
    1427         if (nb_rx > 0) {
    1428             /* Free the last packet buffer */
    1429             if (packet->buffer != NULL) {
    1430                 /* Buffer is owned by DPDK */
    1431                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1432                     rte_pktmbuf_free(packet->buffer);
    1433                     packet->buffer = NULL;
    1434                 } else
    1435                 /* Buffer is owned by packet i.e. has been malloc'd */
    1436                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    1437                     free(packet->buffer);
    1438                     packet->buffer = NULL;
    1439                 }
    1440             }
    1441            
    1442             packet->buf_control = TRACE_CTRL_EXTERNAL;
    1443             packet->type = TRACE_RT_DATA_DPDK;
    1444             event.type = TRACE_EVENT_PACKET;
    1445             event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
    1446            
    1447             /* XXX - Check this passes the filter trace_read_packet normally
    1448              * does this for us but this wont */
    1449             if (trace->filter) {
    1450                 if (!trace_apply_filter(trace->filter, packet)) {
    1451                     /* Failed the filter so we loop for another packet */
    1452                     trace->filtered_packets ++;
    1453                     continue;
    1454                 }
    1455             }
    1456             trace->accepted_packets ++;
    1457         } else {
    1458             /* We only want to sleep for a very short time - we are non-blocking */
    1459             event.type = TRACE_EVENT_SLEEP;
    1460             event.seconds = 0.0001;
    1461             event.size = 0;
    1462         }
    1463        
    1464         /* If we get here we have our event */
    1465         break;
     2212
     2213        /* See if we already have a packet waiting */
     2214        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2215                        FORMAT(trace)->queue_id, pkts_burst, 1);
     2216
     2217        if (nb_rx > 0) {
     2218            /* Free the last packet buffer */
     2219            if (packet->buffer != NULL) {
     2220                /* Buffer is owned by DPDK */
     2221                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2222                    rte_pktmbuf_free(packet->buffer);
     2223                    packet->buffer = NULL;
     2224                } else
     2225                /* Buffer is owned by packet i.e. has been malloc'd */
     2226                if (packet->buf_control == TRACE_CTRL_PACKET) {
     2227                    free(packet->buffer);
     2228                    packet->buffer = NULL;
     2229                }
     2230            }
     2231
     2232            packet->buf_control = TRACE_CTRL_EXTERNAL;
     2233            packet->type = TRACE_RT_DATA_DPDK;
     2234            event.type = TRACE_EVENT_PACKET;
     2235            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
     2236            event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2237
     2238            /* XXX - Check this passes the filter trace_read_packet normally
     2239             * does this for us but this wont */
     2240            if (trace->filter) {
     2241                if (!trace_apply_filter(trace->filter, packet)) {
     2242                    /* Failed the filter so we loop for another packet */
     2243                    trace->filtered_packets ++;
     2244                    continue;
     2245                }
     2246            }
     2247            trace->accepted_packets ++;
     2248        } else {
     2249            /* We only want to sleep for a very short time - we are non-blocking */
     2250            event.type = TRACE_EVENT_SLEEP;
     2251            event.seconds = 0.0001;
     2252            event.size = 0;
     2253        }
     2254
     2255        /* If we get here we have our event */
     2256        break;
    14662257    } while (1);
    14672258
     
    14882279}
    14892280
    1490  static struct libtrace_format_t dpdk = {
     2281static struct libtrace_format_t dpdk = {
    14912282        "dpdk",
    14922283        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     
    15052296        dpdk_read_packet,           /* read_packet */
    15062297        dpdk_prepare_packet,    /* prepare_packet */
    1507         NULL,                               /* fin_packet */
     2298        dpdk_fin_packet,                                    /* fin_packet */
    15082299        dpdk_write_packet,          /* write_packet */
    15092300        dpdk_get_link_type,         /* get_link_type */
     
    15282319        dpdk_trace_event,               /* trace_event */
    15292320    dpdk_help,              /* help */
    1530         NULL
     2321    NULL,                   /* next pointer */
     2322    {true, 8},              /* Live, NICs typically have 8 threads */
     2323    dpdk_pstart_input, /* pstart_input */
     2324        dpdk_pread_packets, /* pread_packets */
     2325        dpdk_pause_input, /* ppause */
     2326        dpdk_fin_input, /* p_fin */
     2327        dpdk_pconfig_input, /* pconfig_input */
     2328    dpdk_pregister_thread, /* pregister_thread */
     2329    dpdk_punregister_thread /* unpregister_thread */
    15312330};
    15322331
  • lib/format_duck.c

    rc5ac872 rc5ac872  
    366366        NULL,                           /* trace_event */
    367367        duck_help,                      /* help */
    368         NULL                            /* next pointer */
     368        NULL,                            /* next pointer */
     369        NON_PARALLEL(false)
    369370};
    370371
  • lib/format_erf.c

    rc69aecb rc69aecb  
    837837        erf_event,                      /* trace_event */
    838838        erf_help,                       /* help */
    839         NULL                            /* next pointer */
     839        NULL,                           /* next pointer */
     840        NON_PARALLEL(false)
    840841};
    841842
     
    880881        erf_event,                      /* trace_event */
    881882        erf_help,                       /* help */
    882         NULL                            /* next pointer */
     883        NULL,                           /* next pointer */
     884        NON_PARALLEL(false)
    883885};
    884886
  • lib/format_legacy.c

    r1ca603b rb13b939  
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_pcap.c

    r4649fea r4649fea  
    834834        trace_event_trace,              /* trace_event */
    835835        pcap_help,                      /* help */
    836         NULL                            /* next pointer */
     836        NULL,                   /* next pointer */
     837        NON_PARALLEL(false)
    837838};
    838839
     
    877878        trace_event_device,             /* trace_event */
    878879        pcapint_help,                   /* help */
    879         NULL                            /* next pointer */
     880        NULL,                   /* next pointer */
     881        NON_PARALLEL(true)
    880882};
    881883
  • lib/format_pcapfile.c

    rc69aecb rc69aecb  
    786786        pcapfile_event,         /* trace_event */
    787787        pcapfile_help,                  /* help */
    788         NULL                            /* next pointer */
     788        NULL,                   /* next pointer */
     789        NON_PARALLEL(false)
    789790};
    790791
  • lib/format_rt.c

    rc5ac872 rc5ac872  
    458458                                /* This may fail on a non-Linux machine */
    459459                                if (trace_is_err(RT_INFO->dummy_ring)) {
    460                                         trace_perror(RT_INFO->dummy_ring, "Creating dead int trace");
     460                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
    461461                                        return -1;
    462462                                }
     
    863863        trace_event_rt,             /* trace_event */
    864864        rt_help,                        /* help */
    865         NULL                            /* next pointer */
     865        NULL,                   /* next pointer */
     866        NON_PARALLEL(true) /* This is normally live */
    866867};
    867868
  • lib/format_tsh.c

    rc69aecb rc69aecb  
    274274        trace_event_trace,              /* trace_event */
    275275        tsh_help,                       /* help */
    276         NULL                            /* next pointer */
     276        NULL,                   /* next pointer */
     277        NON_PARALLEL(false)
    277278};
    278279
     
    322323        trace_event_trace,              /* trace_event */
    323324        tsh_help,                       /* help */
    324         NULL                            /* next pointer */
     325        NULL,                   /* next pointer */
     326        NON_PARALLEL(false)
    325327};
    326328
  • lib/libtrace.h.in

    rc5ac872 rc5ac872  
    117117/** DAG driver version installed on the current system */
    118118#define DAG_DRIVER_V "@DAG_VERSION_NUM@"
     119
     120/**
     121  * A version of assert that always runs the first argument even
     122  * when not debugging, however only asserts the condition if debugging
     123  * Intended for use mainly with pthread locks etc. which have error
     124  * returns but *should* never actually fail.
     125  */
     126#ifdef NDEBUG
     127#define ASSERT_RET(run, cond) run
     128#else
     129#define ASSERT_RET(run, cond) assert(run cond)
     130//#define ASSERT_RET(run, cond) run
     131#endif
    119132   
    120133#ifdef __cplusplus
     
    197210#endif
    198211
     212// Used to fight against false sharing
     213#define CACHE_LINE_SIZE 64
     214#define ALIGN_STRUCT(x) __attribute__((aligned(x)))
     215
    199216#ifdef _MSC_VER
    200217    #ifdef LT_BUILDING_DLL
     
    225242/** Opaque structure holding information about a bpf filter */
    226243typedef struct libtrace_filter_t libtrace_filter_t;
     244
     245typedef struct libtrace_thread_t libtrace_thread_t;
    227246
    228247/** If the packet has allocated its own memory the buffer_control should be
     
    512531        uint8_t transport_proto;        /**< Cached transport protocol */
    513532        uint32_t l4_remaining;          /**< Cached transport remaining */
     533        uint64_t order; /**< Notes the order of this packet in relation to the input */
     534        uint64_t hash; /**< A hash of the packet as supplied by the user */
     535        int error; /**< The error status of pread_packet */
    514536} libtrace_packet_t;
    515537
     
    32313253/*@}*/
    32323254
     3255/**
     3256 * A collection of types for convenience used in place of a
     3257 * simple void* to allow a any type of data to be stored.
     3258 *
     3259 * This is expected to be 8 bytes in length.
     3260 */
     3261typedef union {
     3262        /* Pointers */
     3263        void *ptr;
     3264        libtrace_packet_t *pkt;
     3265
     3266        /* C99 Integer types */
     3267        /* NOTE: Standard doesn't require 64-bit
     3268     * but x32 and x64 gcc does */
     3269        int64_t sint64;
     3270        uint64_t uint64;
     3271
     3272        uint32_t uint32s[2];
     3273        int32_t sint32s[2];
     3274        uint32_t uint32;
     3275        int32_t sint32;
     3276
     3277        uint16_t uint16s[4];
     3278        int16_t sint16s[4];
     3279        uint16_t uint16;
     3280        int16_t sint16;
     3281
     3282        uint8_t uint8s[8];
     3283        int8_t sint8s[8];
     3284        uint8_t uint8;
     3285        int8_t sint8;
     3286
     3287        size_t size;
     3288
     3289        /* C basic types - we cannot be certian of the size */
     3290        int sint;
     3291        unsigned int uint;
     3292
     3293        signed char schars[8];
     3294        unsigned char uchars[8];
     3295        signed char schar;
     3296        unsigned char uchar;
     3297
     3298        /* Real numbers */
     3299        float rfloat;
     3300        double rdouble;
     3301} libtrace_generic_types_t;
     3302
     3303typedef struct libtrace_message_t {
     3304        int code;
     3305        libtrace_generic_types_t additional;
     3306        libtrace_thread_t *sender;
     3307} libtrace_message_t;
     3308
     3309/** Structure holding information about a result */
     3310typedef struct libtrace_result_t {
     3311        uint64_t key;
     3312        libtrace_generic_types_t value;
     3313        int type;
     3314} libtrace_result_t;
     3315#define RESULT_NORMAL 0
     3316#define RESULT_PACKET 1
     3317#define RESULT_TICK   2
     3318
     3319
     3320typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     3321typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
     3322typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
     3323
     3324DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter);
     3325DLLEXPORT int trace_ppause(libtrace_t *libtrace);
     3326DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     3327DLLEXPORT void trace_join(libtrace_t * trace);
     3328DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
     3329
     3330DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
     3331DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
     3332DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value);
     3333DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result);
     3334DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value);
     3335DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
     3336
     3337// Ways to access Global and TLS storage that we provide the user
     3338DLLEXPORT void * trace_get_global(libtrace_t *trace);
     3339DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
     3340DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data);
     3341DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
     3342
     3343
     3344DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type);
     3345typedef struct libtrace_vector libtrace_vector_t;
     3346
     3347DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
     3348DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
     3349DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3350DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3351DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
     3352DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
     3353DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
     3354DLLEXPORT int trace_finished(libtrace_t * libtrace);
     3355DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     3356DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
     3357DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
     3358DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     3359DLLEXPORT uint64_t tv_to_usec(struct timeval *tv);
     3360
     3361DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
     3362
     3363DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
     3364DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
     3365
     3366typedef enum {
     3367        /**
     3368         * Sets the hasher function, if NULL(default) no hashing is used a
     3369         * cores will get packets on a first in first served basis
     3370         */
     3371        TRACE_OPTION_SET_HASHER,
     3372       
     3373        /**
     3374         * Libtrace set perpkt thread count
     3375         */
     3376        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
     3377       
     3378        /**
     3379         * Delays packets so they are played back in trace-time rather than as fast
     3380         * as possible.
     3381         */
     3382        TRACE_OPTION_TRACETIME,
     3383
     3384        /**
     3385         * Specifies the interval between tick packets in milliseconds, if 0
     3386         * or less this is ignored.
     3387         */
     3388        TRACE_OPTION_TICK_INTERVAL,
     3389        TRACE_OPTION_GET_CONFIG,
     3390        TRACE_OPTION_SET_CONFIG
     3391} trace_parallel_option_t;
     3392
     3393enum libtrace_messages {
     3394        MESSAGE_STARTING,
     3395        MESSAGE_RESUMING,
     3396        MESSAGE_STOPPING,
     3397        MESSAGE_PAUSING,
     3398        MESSAGE_DO_PAUSE,
     3399        MESSAGE_DO_STOP,
     3400        MESSAGE_FIRST_PACKET,
     3401        MESSAGE_PERPKT_ENDED,
     3402        MESSAGE_PERPKT_RESUMED,
     3403        MESSAGE_PERPKT_PAUSED,
     3404        MESSAGE_PERPKT_EOF,
     3405        MESSAGE_POST_REPORTER,
     3406        MESSAGE_POST_RANGE,
     3407        MESSAGE_TICK,
     3408        MESSAGE_USER
     3409};
     3410
     3411enum hasher_types {
     3412        /**
     3413         * Balance load across CPUs best as possible, this is basically to say do
     3414         * not care about hash. This might still might be implemented
     3415         * using a hash or round robin etc. under the hood depending on the format
     3416         */
     3417        HASHER_BALANCE,
     3418
     3419        /** Use a hash which is bi-directional for TCP flows, that is packets with
     3420         * the same hash are sent to the same thread. All non TCP packets will be
     3421         * sent to the same thread. UDP may or may not be sent to separate
     3422         * threads like TCP, this depends on the format support.
     3423         */
     3424        HASHER_BIDIRECTIONAL,
     3425       
     3426        /**
     3427         * Use a hash which is uni-directional across TCP flows, that means the
     3428         * opposite directions of the same 5 tuple might end up on separate cores.
     3429         * Otherwise is identical to HASHER_BIDIRECTIONAL
     3430         */
     3431        HASHER_UNIDIRECTIONAL,
     3432
     3433        /**
     3434         * Always use the user supplied hasher, this currently disables native
     3435         * support and is likely significantly slower.
     3436         */
     3437        HASHER_CUSTOM,
     3438
     3439        /**
     3440         * This is not a valid option, used internally only!!! TODO remove
     3441         * Set by the format if the hashing is going to be done in hardware
     3442         */
     3443        HASHER_HARDWARE
     3444};
     3445
     3446typedef struct libtrace_info_t {
     3447        /**
     3448         * True if a live format (i.e. packets have to be tracetime).
     3449         * Otherwise false, indicating packets can be read as fast
     3450         * as possible from the format.
     3451         */
     3452        bool live;
     3453
     3454        /**
     3455         * The maximum number of threads supported by a parallel trace. 1
     3456         * if parallel support is not native (in this case libtrace will simulate
     3457         * an unlimited number of threads), -1 means unlimited and 0 unknown.
     3458         */
     3459        int max_threads;
     3460
     3461        /* TODO hash fn supported list */
     3462
     3463        /* TODO consider time/clock details?? */
     3464} libtrace_info_t;
     3465
     3466DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
     3467DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3468DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3469DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
     3470DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
     3471
     3472/**
     3473 * Tuning the parallel sizes
     3474 */
     3475struct user_configuration {
     3476        // Packet memory cache settings (ocache_init) total
     3477        /**
     3478         * See diagrams, this sets the maximum size of freelist used to
     3479         * maintain packets and their memory buffers.
     3480         * NOTE setting this to less than recommend could cause deadlock a
     3481         * trace that manages its own packets.
     3482         * A unblockable error message will be printed.
     3483         */
     3484        size_t packet_cache_size;
     3485        /**
     3486         * Per thread local cache size for the packet freelist
     3487         */
     3488        size_t packet_thread_cache_size;
     3489        /**
     3490         * If true the total number of packets that can be created by a trace is limited
     3491         * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc
     3492         * and free will be used to create and free packets, this will be slower than
     3493         * using the freelist and could run a machine out of memory.
     3494         *
     3495         * However this does make it easier to ensure that deadlocks will not occur
     3496         * due to running out of packets
     3497         */
     3498        bool fixed_packet_count;
     3499        /**
     3500         * When reading from a single threaded input source to reduce
     3501         * lock contention a 'burst' of packets is read per pkt thread
     3502         * this determines the bursts size.
     3503         */
     3504        size_t burst_size;
     3505        // Each perpkt thread has a queue leading into the reporter
     3506        //size_t reporter_queue_size;
     3507
     3508        /**
     3509         * The tick interval - in milliseconds
     3510         * When a live trace is used messages are sent at the tick
     3511         * interval to ensure that all perpkt threads receive data
     3512         * this allows results to be printed in cases flows are
     3513         * not being directed to a certian thread, while still
     3514         * maintaining order.
     3515         */
     3516        size_t tick_interval;
     3517
     3518        /**
     3519         * Like the tick interval but used in the case of file format
     3520         * This specifies the number of packets before inserting a tick to
     3521         * every thread.
     3522         */
     3523        size_t tick_count;
     3524
     3525        /**
     3526         * The number of per packet threads requested, 0 means use default.
     3527         * Default typically be the number of processor threads detected less one or two.
     3528         */
     3529        size_t perpkt_threads;
     3530
     3531        /**
     3532         * See diagrams, this sets the maximum size of buffers used between
     3533         * the single hasher thread and the buffer.
     3534         * NOTE setting this to less than recommend could cause deadlock a
     3535         * trace that manages its own packets.
     3536         * A unblockable warning message will be printed to stderr in this case.
     3537         */
     3538        /** The number of packets that can queue per thread from hasher thread */
     3539        size_t hasher_queue_size;
     3540
     3541        /**
     3542         * If true use a polling hasher queue, that means that we will spin/or yeild
     3543         * when rather than blocking on a lock. This applies to both the hasher thread
     3544         * and perpkts reading the queues.
     3545         */
     3546        bool hasher_polling;
     3547
     3548        /**
     3549         * If true the reporter thread will continuously poll waiting for results
     3550         * if false they are only checked when a message is received, this message
     3551         * is controlled by reporter_thold.
     3552         */
     3553        bool reporter_polling;
     3554
     3555        /**
     3556         * Perpkt thread result queue size before triggering the reporter step to read results
     3557         */
     3558        size_t reporter_thold;
     3559
     3560        /**
     3561         * Prints a line to standard error for every state change
     3562         * for both the trace as a whole and for each thread.
     3563         */
     3564        bool debug_state;
     3565};
     3566#include <stdio.h>
     3567DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
     3568DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3569DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
     3570
     3571/**
     3572 * The methods we use to combine multiple outputs into a single output
     3573 * This is not considered a stable API however is public.
     3574 * Where possible use built in combiners
     3575 *
     3576 * NOTE this structure is duplicated per trace and as such can
     3577 * have functions rewritten, and in fact should if possible.
     3578 */
     3579typedef struct libtrace_combine libtrace_combine_t;
     3580struct libtrace_combine {
     3581
     3582        /**
     3583         * Called at the start of the trace to allow datastructures
     3584         * to be initilised and allow functions to be swapped if approriate.
     3585         *
     3586         * Also factors such as whether the trace is live or not can
     3587         * be used to determine the functions used.
     3588         * @return 0 if successful, -1 if an error occurs
     3589         */
     3590        int (*initialise)(libtrace_t *,libtrace_combine_t *);
     3591
     3592        /**
     3593         * Called when the trace ends, clean up any memory here
     3594         * from libtrace_t * init.
     3595         */
     3596        void (*destroy)(libtrace_t *, libtrace_combine_t *);
     3597
     3598        /**
     3599         * Publish a result against it's a threads queue.
     3600         * If null publish directly, expected to be used
     3601         * as a single threaded optimisation and can be
     3602         * set to NULL by init if this case is detected.
     3603         */
     3604        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
     3605
     3606        /**
     3607         * Read as many results as possible from the trace.
     3608         * Directy calls the users code to handle results from here.
     3609         *
     3610         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
     3611         * If publish is NULL, this probably should be NULL also otherwise
     3612         * it will not be called.
     3613         */
     3614        void (*read)(libtrace_t *, libtrace_combine_t *);
     3615
     3616        /**
     3617         * Called when the trace is finished to flush the final
     3618         * results to the reporter thread.
     3619         *
     3620         * There may be no results, in which case this should
     3621         * just return.
     3622         *
     3623         * Libtrace state:
     3624         * Called from reporter thread
     3625         * No perpkt threads will be running, i.e. publish will not be
     3626         * called again.
     3627         *
     3628         * If publish is NULL, this probably should be NULL also otherwise
     3629         * it will not be called.
     3630         */
     3631        void (*read_final)(libtrace_t *, libtrace_combine_t *);
     3632
     3633        /**
     3634         * Pause must make sure any results of the type packet are safe.
     3635         * That means trace_copy_packet() and destroy the original.
     3636         * This also should be NULL if publish is NULL.
     3637         */
     3638        void (*pause)(libtrace_t *, libtrace_combine_t *);
     3639
     3640        /**
     3641         * Data storage for all the combiner threads
     3642         */
     3643        void *queues;
     3644
     3645        /**
     3646         * Configuration options, what this does is upto the combiner
     3647         * chosen.
     3648         */
     3649        libtrace_generic_types_t configuration;
     3650};
     3651
     3652DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config);
     3653
     3654#define READ_EOF 0
     3655#define READ_ERROR -1
     3656#define READ_MESSAGE -2
     3657// Used for inband tick message
     3658#define READ_TICK -3
     3659
     3660#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
     3661
    32333662#ifdef __cplusplus
    32343663} /* extern "C" */
  • lib/libtrace_int.h

    r6fc1ae7 r6cf3ca0  
    148148#endif
    149149
     150#include "data-struct/ring_buffer.h"
     151#include "data-struct/object_cache.h"
     152#include "data-struct/vector.h"
     153#include "data-struct/message_queue.h"
     154#include "data-struct/deque.h"
     155#include "data-struct/linked_list.h"
     156#include "data-struct/sliding_window.h"
    150157
    151158//#define RP_BUFSIZE 65536U
     
    166173        bool waiting;
    167174};
     175
     176enum thread_types {
     177        THREAD_EMPTY,
     178        THREAD_HASHER,
     179        THREAD_PERPKT,
     180        THREAD_REPORTER,
     181        THREAD_KEEPALIVE
     182};
     183
     184enum thread_states {
     185        THREAD_RUNNING,
     186        THREAD_FINISHING,
     187        THREAD_FINISHED,
     188        THREAD_PAUSED,
     189        THREAD_STATE_MAX
     190};
     191
     192/**
     193 * Information of this thread
     194 */
     195struct libtrace_thread_t {
     196        uint64_t accepted_packets; // The number of packets accepted only used if pread
     197        uint64_t filtered_packets;
     198        // is retreving packets
     199        // Set to true once the first packet has been stored
     200        bool recorded_first;
     201        // For thread safety reason we actually must store this here
     202        int64_t tracetime_offset_usec;
     203        void* user_data; // TLS for the user to use
     204        void* format_data; // TLS for the format to use
     205        libtrace_message_queue_t messages; // Message handling
     206        libtrace_ringbuffer_t rbuffer; // Input
     207        libtrace_t * trace;
     208        void* ret;
     209        enum thread_types type;
     210        enum thread_states state;
     211        pthread_t tid;
     212        int perpkt_num; // A number from 0-X that represents this perpkt threads number
     213                                // in the table, intended to quickly identify this thread
     214                                // -1 represents NA (such as the case this is not a perpkt thread)
     215};
     216
     217/**
     218 * Storage to note time value against each.
     219 * Used both internally to do trace time playback
     220 * and can be used externally to assist applications which need
     221 * a trace starting time such as tracertstats.
     222 */
     223struct first_packets {
     224        pthread_spinlock_t lock;
     225        size_t count; // If == perpkt_thread_count threads we have all
     226        size_t first; // Valid if count != 0
     227        struct __packet_storage_magic_type {
     228                libtrace_packet_t * packet;
     229                struct timeval tv;
     230        } * packets;
     231};
     232
     233#define TRACE_STATES \
     234        X(STATE_NEW) \
     235        X(STATE_RUNNING) \
     236        X(STATE_PAUSING) \
     237        X(STATE_PAUSED) \
     238        X(STATE_FINSHED) \
     239        X(STATE_DESTROYED) \
     240        X(STATE_JOINED) \
     241        X(STATE_ERROR)
     242
     243#define X(a) a,
     244enum trace_state {
     245        TRACE_STATES
     246};
     247#undef X
     248
     249#define X(a) case a: return #a;
     250static inline char *get_trace_state_name(enum trace_state ts){
     251        switch(ts) {
     252                TRACE_STATES
     253                default:
     254                        return "UNKNOWN";
     255        }
     256}
     257#undef X
    168258
    169259/** A libtrace input trace
     
    188278        uint64_t filtered_packets;     
    189279        /** The filename from the uri for the trace */
    190         char *uridata;                 
     280        char *uridata;
    191281        /** The libtrace IO reader for this trace (if applicable) */
    192         io_t *io;                       
     282        io_t *io;
    193283        /** Error information for the trace */
    194         libtrace_err_t err;             
     284        libtrace_err_t err;
    195285        /** Boolean flag indicating whether the trace has been started */
    196         bool started;                   
     286        bool started;
     287        /** Synchronise writes/reads across this format object and attached threads etc */
     288        pthread_mutex_t libtrace_lock;
     289        /** State */
     290        enum trace_state state;
     291        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
     292        pthread_cond_t perpkt_cond;
     293        /* Keep track of counts of threads in any given state */
     294        int perpkt_thread_states[THREAD_STATE_MAX];
     295
     296        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
     297        bool perpkt_queue_full;
     298        /** Global storage for this trace, shared among all the threads  */
     299        void* global_blob;
     300        /** The actual freelist */
     301        libtrace_ocache_t packet_freelist;
     302        /** User defined per_pkt function called when a pkt is ready */
     303        fn_per_pkt per_pkt;
     304        /** User defined reporter function entry point XXX not hooked up */
     305        fn_reporter reporter;
     306        /** The hasher function */
     307        enum hasher_types hasher_type;
     308        /** The hasher function - NULL implies they don't care or balance */
     309        fn_hasher hasher; // If valid using a separate thread
     310        void *hasher_data;
     311        /** The pread_packet choosen path for the configuration */
     312        int (*pread)(libtrace_t *, libtrace_thread_t *, libtrace_packet_t **, size_t);
     313
     314        libtrace_thread_t hasher_thread;
     315        libtrace_thread_t reporter_thread;
     316        libtrace_thread_t keepalive_thread;
     317        int perpkt_thread_count;
     318        libtrace_thread_t * perpkt_threads; // All our perpkt threads
     319        // Used to keep track of the first packet seen on each thread
     320        struct first_packets first_packets;
     321        int tracetime;
     322
     323        /*
     324         * Caches statistic counters in the case that our trace is
     325         * paused or stopped before this counter is taken
     326         */
     327        uint64_t dropped_packets;
     328        uint64_t received_packets;
     329        struct user_configuration config;
     330        libtrace_combine_t combiner;
    197331};
     332
     333void trace_fin_packet(libtrace_packet_t *packet);
     334void libtrace_zero_thread(libtrace_thread_t * t);
     335void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
     336libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     337int get_thread_table_num(libtrace_t *libtrace);
     338
    198339
    199340/** A libtrace output trace
     
    202343struct libtrace_out_t {
    203344        /** The capture format for the output trace */
    204         struct libtrace_format_t *format;
     345        struct libtrace_format_t *format;
    205346        /** Pointer to the "global" data for the capture format module */
    206347        void *format_data;             
     
    210351        libtrace_err_t err;
    211352        /** Boolean flag indicating whether the trace has been started */
    212         bool started;                   
     353        bool started;
    213354};
    214355
     
    303444} PACKED libtrace_pflog_header_t;
    304445
    305 
    306 
    307446/** A libtrace capture format module */
    308447/* All functions should return -1, or NULL on failure */
     
    734873        /** Prints some useful help information to standard output. */
    735874        void (*help)(void);
    736 
     875       
    737876        /** Next pointer, should always be NULL - used by the format module
    738877         * manager. */
    739878        struct libtrace_format_t *next;
     879
     880        /** Holds information about the trace format */
     881        struct libtrace_info_t info;
     882
     883        /**
     884         * Starts or unpauses an input trace in parallel mode - note that
     885         * this function is often the one that opens the file or device for
     886         * reading.
     887         *
     888         * @param libtrace      The input trace to be started or unpaused
     889         * @return 0 upon success.
     890         *         Otherwise in event of an error -1 is returned.
     891         *
     892         */
     893        int (*pstart_input)(libtrace_t *trace);
     894       
     895        /**
     896         * Read a batch of packets from the input stream related to thread.
     897         * At most read nb_packets, however should return with less if packets
     898         * are not waiting. However still must return at least 1, 0 still indicates
     899         * EOF.
     900         *
     901         * @param libtrace      The input trace
     902         * @param t     The thread
     903         * @param packets       An array of packets
     904         * @param nb_packets    The number of packets in the array (the maximum to read)
     905         * @return The number of packets read, or 0 in the case of EOF or -1 in error or -2 to represent
     906         * interrupted due to message waiting before packets had been read.
     907         */
     908        int (*pread_packets)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets);
     909       
     910        /** Pause a parallel trace
     911         *
     912         * @param libtrace      The input trace to be paused
     913         */
     914        int (*ppause_input)(libtrace_t *trace);
     915       
     916        /** Called after all threads have been paused, Finish (close) a parallel trace
     917         *
     918         * @param libtrace      The input trace to be stopped
     919         */
     920        int (*pfin_input)(libtrace_t *trace);
     921       
     922        /** Applies a configuration option to an input trace.
     923         *
     924         * @param libtrace      The input trace to apply the option to
     925         * @param option        The option that is being configured
     926         * @param value         A pointer to the value that the option is to be
     927         *                      set to
     928         * @return 0 if successful, -1 if the option is unsupported or an error
     929         * occurs
     930         */
     931        int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value);
     932
     933        /**
     934         * Register a thread for use with the format or using the packets produced
     935         * by it. This is NOT only used for threads reading packets in fact all
     936         * threads use this.
     937         *
     938         * The libtrace lock is not held by this format but can be aquired
     939         * by the format.
     940         *
     941         * Some use cases include setting up any thread local storage required for
     942         * to read packets and free packets. For DPDK we require any thread that
     943         * may release or read a packet to have have an internal number associated
     944         * with it.
     945         *
     946         * The thread type can be used to see if this thread is going to be used
     947         * to read packets or otherwise.
     948         *
     949         * @return 0 if successful, -1 if the option is unsupported or an error
     950         * occurs (such as a maximum of threads being reached)
     951         */
     952        int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader);
     953
     954        /**
     955         * If needed any memory allocated with pregister_thread can be released
     956         * in this function. The thread will be destroyed directly after this
     957         * function is called.
     958         */
     959        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    740960};
     961
     962/** Macro to zero out a single thread format */
     963#define NON_PARALLEL(live) \
     964{live, 1},              /* trace info */ \
     965NULL,                   /* pstart_input */ \
     966NULL,                   /* pread_packet */ \
     967NULL,                   /* ppause_input */ \
     968NULL,                   /* pfin_input */ \
     969NULL,                   /* pconfig_input */ \
     970NULL,                   /* pregister_thread */ \
     971NULL                    /* punregister_thread */
    741972
    742973/** The list of registered capture formats */
     
    9431174/** Constructor for the Linux Native format module */
    9441175void linuxnative_constructor(void);
     1176/** Constructor for the Linux Ring format module */
     1177void linuxring_constructor(void);
    9451178/** Constructor for the PCAP format module */
    9461179void pcap_constructor(void);
  • lib/trace.c

    r6fc1ae7 r6fc1ae7  
    9999#include "rt_protocol.h"
    100100
     101#include <pthread.h>
     102#include <signal.h>
     103
    101104#define MAXOPTS 1024
    102105
     
    106109
    107110volatile int libtrace_halt = 0;
     111/* Set once pstart is called used for backwards compatibility reasons */
     112int libtrace_parallel = 0;
    108113
    109114/* strncpy is not assured to copy the final \0, so we
     
    137142                legacy_constructor();
    138143                atmhdr_constructor();
     144                linuxring_constructor();
    139145                linuxnative_constructor();
    140146#ifdef HAVE_LIBPCAP
     
    253259        libtrace->filtered_packets = 0;
    254260        libtrace->accepted_packets = 0;
     261       
     262        /* Parallel inits */
     263        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     264        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
     265        libtrace->state = STATE_NEW;
     266        libtrace->perpkt_queue_full = false;
     267        libtrace->global_blob = NULL;
     268        libtrace->per_pkt = NULL;
     269        libtrace->reporter = NULL;
     270        libtrace->hasher = NULL;
     271        libtrace_zero_ocache(&libtrace->packet_freelist);
     272        libtrace_zero_thread(&libtrace->hasher_thread);
     273        libtrace_zero_thread(&libtrace->reporter_thread);
     274        libtrace_zero_thread(&libtrace->keepalive_thread);
     275        libtrace->reporter_thread.type = THREAD_EMPTY;
     276        libtrace->perpkt_thread_count = 0;
     277        libtrace->perpkt_threads = NULL;
     278        libtrace->tracetime = 0;
     279        libtrace->first_packets.first = 0;
     280        libtrace->first_packets.count = 0;
     281        libtrace->first_packets.packets = NULL;
     282        libtrace->dropped_packets = UINT64_MAX;
     283        libtrace->received_packets = UINT64_MAX;
     284        libtrace->pread = NULL;
     285        ZERO_USER_CONFIG(libtrace->config);
    255286
    256287        /* Parse the URI to determine what sort of trace we are dealing with */
     
    348379        libtrace->io = NULL;
    349380        libtrace->filtered_packets = 0;
     381       
     382        /* Parallel inits */
     383        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     384        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
     385        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     386        libtrace->perpkt_queue_full = false;
     387        libtrace->global_blob = NULL;
     388        libtrace->per_pkt = NULL;
     389        libtrace->reporter = NULL;
     390        libtrace->hasher = NULL;
     391        libtrace_zero_ocache(&libtrace->packet_freelist);
     392        libtrace_zero_thread(&libtrace->hasher_thread);
     393        libtrace_zero_thread(&libtrace->reporter_thread);
     394        libtrace_zero_thread(&libtrace->keepalive_thread);
     395        libtrace->reporter_thread.type = THREAD_EMPTY;
     396        libtrace->perpkt_thread_count = 0;
     397        libtrace->perpkt_threads = NULL;
     398        libtrace->tracetime = 0;
     399        libtrace->pread = NULL;
     400        ZERO_USER_CONFIG(libtrace->config);
    350401       
    351402        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    583634 */
    584635DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
    585         assert(libtrace);
     636        int i;
     637        assert(libtrace);
     638
     639        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     640        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
     641
     642        /* destroy any packets that are still around */
     643        if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {