Changes in / [21f3226:97d170d]


Ignore:
Files:
34 added
24 edited

Legend:

Unmodified
Added
Removed
  • README

    ra7c8f4a ra7c8f4a  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.21
    27
  • configure.in

    r092a09c rd7fd648  
    392392fi
    393393
     394# If we use DPDK we might be able to use libnuma
     395AC_CHECK_LIB(numa, numa_node_to_cpus, have_numa=1, have_numa=0)
     396
    394397# Checks for various "optional" libraries
    395398AC_CHECK_LIB(pthread, pthread_create, have_pthread=1, have_pthread=0)
     
    411414AC_CHECK_LIB(rt, clock_gettime, have_clock_gettime=1, have_clock_gettime=0)
    412415LIBS=
     416
     417if test "$have_numa" = 1; then
     418        LIBTRACE_LIBS="$LIBTRACE_LIBS -lnuma"
     419        AC_DEFINE(HAVE_LIBNUMA, 1, [Set to 1 if libnuma is found supported])
     420        with_numa=yes
     421else
     422        AC_DEFINE(HAVE_LIBNUMA, 0, [Set to 1 if libnuma is found supported])
     423        with_numa=no
     424fi
    413425
    414426if test "$dlfound" = 0; then
     
    685697if test x"$libtrace_dpdk" = xtrue; then
    686698    AC_MSG_NOTICE([Compiled with DPDK live capture support: Yes])
     699    reportopt "Compiled with DPDK trace NUMA support" $with_numa
    687700elif test x"$want_dpdk" != "xno"; then
    688701#   We don't officially support DPDK so only report failure if the user
  • lib/Makefile.am

    re5dedd5 r2498008  
    22include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
     
    4444endif
    4545
    46 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4747                format_erf.c format_pcap.c format_legacy.c \
    4848                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5656                $(DAGSOURCE) format_erf.h \
    5757                $(BPFJITSOURCE) \
    58                 libtrace_arphrd.h
     58                libtrace_arphrd.h \
     59                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
     60                data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
     61                hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c
    5962
    6063if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 rb13b939  
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r08f5060 r08f5060  
    614614        trace_event_device,     /* trace_event */
    615615        bpf_help,               /* help */
    616         NULL
     616        NULL,                   /* next pointer */
     617        NON_PARALLEL(true)
    617618};
    618619#else   /* HAVE_DECL_BIOCSETIF */
     
    663664        NULL,                   /* trace_event */
    664665        bpf_help,               /* help */
    665         NULL
     666        NULL,                   /* next pointer */
     667        NON_PARALLEL(true)
    666668};
    667669#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f rc70f59f  
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    r70bf39a rc66a465  
    8282#define DATA(x) ((struct dag_format_data_t *)x->format_data)
    8383#define DATA_OUT(x) ((struct dag_format_data_out_t *)x->format_data)
     84#define PERPKT_DATA(x) ((struct dag_per_thread_t *)(x->format_data))
    8485
    8586#define FORMAT_DATA DATA(libtrace)
     
    114115        /* A buffer to hold the data to be transmittted */
    115116        uint8_t *txbuffer;
     117};
     118
     119/* Data that is stored for each libtrace_thread_t */
     120struct dag_per_thread_t {
     121        struct dag_dev_t *device; /* DAG device */
     122        uint16_t stream; /* Stream number */
     123        uint8_t *top; /* Pointer to the last unread byte in the DAG memory */
     124        uint8_t *bottom; /* Pointer to the first unread byte in the DAG memory */
     125        uint32_t processed; /* Amount of data processed from the bottom pointer */
     126        uint64_t pkt_count; /* Number of packets seen by the thread */
     127        uint64_t drops;
    116128};
    117129
     
    146158        /* The number of packets that have been dropped */
    147159        uint64_t drops;
     160        /* When running in parallel mode this is malloc'd with an array of thread
     161         * structures. Most of the stuff above doesn't get used in parallel mode. */
     162        struct dag_per_thread_t *per_thread;
    148163
    149164        uint8_t seeninterface[4];
     
    453468        char *dag_dev_name = NULL;
    454469        char *scan = NULL;
    455         int stream = 0;
     470        int stream = 0, thread_count = 1;
    456471        struct dag_dev_t *dag_device = NULL;
    457472
     
    460475         * list */
    461476        pthread_mutex_lock(&open_dag_mutex);
    462        
    463        
    464         /* Specific streams are signified using a comma in the libtrace URI,
     477
     478
     479        /* DAG cards support multiple streams. In a single threaded capture,
     480         * these are specified using a comma in the libtrace URI,
    465481         * e.g. dag:/dev/dag0,2 refers to stream 2 on the dag0 device.
    466482         *
    467          * If no stream is specified, we will read from stream 0 */
    468         if ((scan = strchr(libtrace->uridata,',')) == NULL) {
     483         * If no stream is specified, we will read from stream 0 with one thread
     484         */
     485        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
    469486                dag_dev_name = strdup(libtrace->uridata);
     487               
    470488        } else {
    471489                dag_dev_name = (char *)strndup(libtrace->uridata,
    472                                 (size_t)(scan - libtrace->uridata));
     490                                                                           (size_t)(scan - libtrace->uridata));
    473491                stream = atoi(++scan);
    474492        }
     
    805823
    806824        dag_record_t *erfptr;
     825        libtrace_thread_t *t;
    807826       
    808827        /* If the packet previously owned a buffer that is not the buffer
     
    841860
    842861        /* Update the dropped packets counter */
    843 
    844862        /* No loss counter for DSM coloured records - have to use
    845863         * some other API */
     864        /* Adding multithread support for this isn't actually that useful for the
     865         * DAG7.5G2, as there's no way to use multiple receive streams without DSM */
    846866        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
    847867                /* TODO */
    848868        } else {
    849869                /* Use the ERF loss counter */
    850                 if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
    851                         FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     870                if (DATA(libtrace)->per_thread) {
     871                        t = get_thread_table(libtrace);
     872                        PERPKT_DATA(t)->drops += ntohs(erfptr->lctr);
    852873                } else {
    853                         FORMAT_DATA->drops += ntohs(erfptr->lctr);
    854                 }
    855         }
     874                        if (FORMAT_DATA->seeninterface[erfptr->flags.iface] == 0) {
     875                                FORMAT_DATA->seeninterface[erfptr->flags.iface] = 1;
     876                        } else {
     877                                FORMAT_DATA->drops += ntohs(erfptr->lctr);
     878                        }
     879                }
     880        }
     881
    856882
    857883        return 0;
     
    12081234/* Gets the number of dropped packets */
    12091235static uint64_t dag_get_dropped_packets(libtrace_t *trace) {
     1236        uint64_t sum = 0;
     1237        int i, tot;
     1238
    12101239        if (trace->format_data == NULL)
    12111240                return (uint64_t)-1;
    1212         return DATA(trace)->drops;
     1241
     1242        if (DATA(trace)->per_thread) {
     1243                tot = trace->perpkt_thread_count;
     1244
     1245                for (i = 0; i < tot; i++) {
     1246                        printf("t%d: drops %" PRIu64 "\n",
     1247                                   DATA(trace)->per_thread[i].drops);
     1248                        sum += DATA(trace)->per_thread[i].drops;
     1249                }
     1250        }
     1251
     1252        sum += DATA(trace)->drops;
     1253
     1254        return sum;
    12131255}
    12141256
     
    12241266        printf("\tnone\n");
    12251267        printf("\n");
     1268}
     1269
     1270static int dag_pstart_input(libtrace_t *libtrace) {
     1271        char *scan, *tok;
     1272        uint16_t stream_count = 0, max_streams;
     1273        /* We keep our own pointer to per_thread as the system will free
     1274         * up FORMAT_DATA without freeing this if something goes wrong */
     1275        struct dag_per_thread_t *per_thread = NULL;
     1276        int iserror = 0;
     1277
     1278        /* Check we aren't trying to create more threads than the DAG card can
     1279         * handle */
     1280        max_streams = dag_rx_get_stream_count(FORMAT_DATA->device->fd);
     1281        if (libtrace->perpkt_thread_count > max_streams) {
     1282                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "trying to create too "
     1283                                          "many threads (max is %u)", max_streams);
     1284                iserror = 1;
     1285                goto cleanup;
     1286        }
     1287
     1288        /* Create the thread structures */
     1289        per_thread = calloc(libtrace->perpkt_thread_count,
     1290                                                                         sizeof(struct dag_per_thread_t));
     1291        FORMAT_DATA->per_thread = per_thread;
     1292
     1293        /* Get the stream names from the uri */
     1294        if ((scan = strchr(libtrace->uridata, ',')) == NULL) {
     1295                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri doesn't "
     1296                                          "specify the DAG streams");
     1297                iserror = 1;
     1298                goto cleanup;
     1299        }
     1300
     1301        scan++;
     1302 
     1303        tok = strtok(scan, ",");
     1304        while (tok != NULL) {
     1305                /* Ensure we haven't specified too many streams */
     1306                if (stream_count >= libtrace->perpkt_thread_count) {
     1307                        trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, "format uri specifies too "
     1308                                          "many streams. Max is %u", max_streams);
     1309                        iserror = 1;
     1310                        goto cleanup;
     1311                }
     1312
     1313                /* Save the stream details */
     1314                per_thread[stream_count].device = FORMAT_DATA->device;
     1315                per_thread[stream_count++].stream = (uint16_t)atoi(tok);
     1316
     1317                tok = strtok(NULL, ",");
     1318        }
     1319
     1320 cleanup:
     1321        if (iserror) {
     1322                /* Free the per_thread memory */
     1323                free(per_thread);
     1324               
     1325                return -1;
     1326        } else {
     1327                return 0;
     1328        }
     1329}
     1330
     1331
     1332
     1333/* TODO: Fold this into dag_available */
     1334static int dag_pavailable(libtrace_t *libtrace, libtrace_thread_t *t) {
     1335        uint32_t diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1336
     1337        /* If we've processed more than 4MB of data since we last called
     1338         * dag_advance_stream, then we should call it again to allow the
     1339         * space occupied by that 4MB to be released */
     1340        if (diff >= dag_record_size && PERPKT_DATA(t)->processed < 4 * 1024 * 1024)
     1341                return diff;
     1342
     1343        /* Update top and bottom pointers */
     1344        PERPKT_DATA(t)->top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1345                                                                                         PERPKT_DATA(t)->stream,
     1346                                                                                         &(PERPKT_DATA(t)->bottom));
     1347
     1348        if (PERPKT_DATA(t)->top == NULL) {
     1349                trace_set_err(libtrace, errno, "dag_advance_stream failed!");
     1350                return -1;
     1351        }
     1352
     1353        PERPKT_DATA(t)->processed = 0;
     1354        diff = PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom;
     1355        return diff;
     1356}
     1357
     1358/* TODO: Fold this into dag_get_record */
     1359static dag_record_t *dag_pget_record(libtrace_t *libtrace,
     1360                                                                         libtrace_thread_t *t) {
     1361        dag_record_t *erfptr = NULL;
     1362        uint16_t size;
     1363
     1364        erfptr = (dag_record_t *)PERPKT_DATA(t)->bottom;
     1365        if (!erfptr)
     1366                return NULL;
     1367
     1368        /* Ensure we have a whole record */
     1369        size = ntohs(erfptr->rlen);
     1370        assert(size >= dag_record_size);
     1371        if (size > (PERPKT_DATA(t)->top - PERPKT_DATA(t)->bottom))
     1372                return NULL;
     1373
     1374        /* Advance the buffer pointers */
     1375        PERPKT_DATA(t)->bottom += size;
     1376        PERPKT_DATA(t)->processed += size;
     1377       
     1378        return erfptr;
     1379}
     1380
     1381static int dag_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,
     1382                                                        libtrace_packet_t *packet) {
     1383        dag_record_t *erfptr = NULL;
     1384        int numbytes = 0;
     1385        uint32_t flags = 0;
     1386        struct timeval maxwait, pollwait;
     1387
     1388        pollwait.tv_sec = 0;
     1389        pollwait.tv_usec = 10000;
     1390        maxwait.tv_sec = 0;
     1391        maxwait.tv_usec = 250000;
     1392
     1393        /* TODO: Support DUCK reporting */
     1394
     1395        /* Don't let anyone try to free our DAG memory hole! */
     1396        flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
     1397
     1398        /* If the packet buffer is currently owned by libtrace, free it so
     1399         * that we can set the packet to point into the DAG memory hole */
     1400        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1401                free(packet->buffer);
     1402                packet->buffer = 0;
     1403        }
     1404
     1405        /* Configure DAG card stream polling */
     1406        if (dag_set_stream_poll(PERPKT_DATA(t)->device->fd, PERPKT_DATA(t)->stream,
     1407                                                        sizeof(dag_record_t), &maxwait, &pollwait) < 0) {
     1408                trace_set_err(libtrace, errno, "dag_set_stream_poll");
     1409                return -1;
     1410        }
     1411
     1412        /* Grab an ERF record */
     1413        do {
     1414                numbytes = dag_pavailable(libtrace, t);
     1415                if (numbytes < 0)
     1416                        return numbytes;
     1417                if (numbytes < dag_record_size) {
     1418                        if (libtrace_halt)
     1419                                return 0;
     1420
     1421                        /* Check message queue to see if we should abort early */
     1422                        if (libtrace_message_queue_count(&t->messages) > 0)
     1423                                return -2;
     1424
     1425                        /* Keep trying until we see a packet */
     1426                        continue;
     1427                }
     1428
     1429                erfptr = dag_pget_record(libtrace, t);
     1430        } while (erfptr == NULL);
     1431
     1432        /* Prepare the libtrace packet */
     1433        if (dag_prepare_packet(libtrace, packet, erfptr, TRACE_RT_DATA_ERF,
     1434                                                   flags))
     1435                return -1;
     1436
     1437        PERPKT_DATA(t)->pkt_count++;
     1438
     1439        return packet->payload ? htons(erfptr->rlen) :
     1440                erf_get_framing_length(packet);
     1441}
     1442
     1443static int dag_ppause_input(libtrace_t *libtrace) {
     1444        int i, tot = libtrace->perpkt_thread_count;
     1445        struct dag_per_thread_t *t_data;
     1446
     1447        /* Stop and detach all the streams */
     1448        printf("Stopping and detaching all streams\n");
     1449        for (i = 0; i < tot; i++) {
     1450                t_data = &FORMAT_DATA->per_thread[i];
     1451
     1452                if (dag_stop_stream(t_data->device->fd,
     1453                                                        t_data->stream) < 0) {
     1454                        trace_set_err(libtrace, errno, "can't stop DAG stream #%u",
     1455                                                  t_data->stream);
     1456                        return -1;
     1457                }
     1458
     1459                if (dag_detach_stream(t_data->device->fd,
     1460                                                          t_data->stream) < 0) {
     1461                        trace_set_err(libtrace, errno, "can't detach DAG stream #%u",
     1462                                                  t_data->stream);
     1463                        return -1;
     1464                }
     1465        }
     1466
     1467        /* Free up the per_thread array */
     1468        free(FORMAT_DATA->per_thread);
     1469        FORMAT_DATA->per_thread = NULL;
     1470
     1471        return 0;
     1472}
     1473
     1474static int dag_pconfig_input(libtrace_t *libtrace,
     1475                                                         trace_parallel_option_t option, void *value) {
     1476
     1477        /* We don't support any of these! Normally you configure the DAG card
     1478         * externally. */
     1479        switch(option) {
     1480        case TRACE_OPTION_SET_HASHER:
     1481        case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1482        case TRACE_OPTION_TRACETIME:
     1483        case TRACE_OPTION_TICK_INTERVAL:
     1484        case TRACE_OPTION_GET_CONFIG:
     1485        case TRACE_OPTION_SET_CONFIG:
     1486                return -1;
     1487        }
     1488        /* We don't provide a default option to ensure that future options will
     1489         * generate a compiler warning. */
     1490
     1491        return -1;
     1492}
     1493
     1494static int dag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     1495                                                                bool reader) {
     1496        /* XXX: This function gets run sequentially for all
     1497         * threads. Should investigate making it parallel as draining the
     1498         * memory could be needlessly time consuming.
     1499         */
     1500        uint8_t *top, *bottom;
     1501        uint8_t diff = 0; /* XXX: Investigate this type, as I would assume the value
     1502                                           * could be larger than 255 */
     1503        struct timeval zero, nopoll;
     1504
     1505        top = bottom = NULL;
     1506
     1507        /* Minimum delay is 10mS */
     1508        zero.tv_sec = 0;
     1509        zero.tv_usec = 10000;
     1510        nopoll = zero;
     1511
     1512        if (reader) {
     1513                if (t->type == THREAD_PERPKT) {
     1514                        /* Pass the per thread data to the thread */
     1515                        t->format_data = &FORMAT_DATA->per_thread[t->perpkt_num];
     1516
     1517                        /* Attach and start the DAG stream */
     1518                        printf("t%u: starting and attaching stream #%u\n", t->perpkt_num,
     1519                                   PERPKT_DATA(t)->stream);
     1520                        if (dag_attach_stream(PERPKT_DATA(t)->device->fd,
     1521                                                                  PERPKT_DATA(t)->stream, 0, 0) < 0) {
     1522                                trace_set_err(libtrace, errno, "can't attach DAG stream #%u",
     1523                                                          PERPKT_DATA(t)->stream);
     1524                                return -1;
     1525                        }
     1526                        if (dag_start_stream(PERPKT_DATA(t)->device->fd,
     1527                                                                 PERPKT_DATA(t)->stream) < 0) {
     1528                                trace_set_err(libtrace, errno, "can't start DAG stream #%u",
     1529                                                          PERPKT_DATA(t)->stream);
     1530                                return -1;
     1531                        }
     1532
     1533                        /* Ensure that dag_advance_stream will return without blocking */
     1534                        if(dag_set_stream_poll(PERPKT_DATA(t)->device->fd,
     1535                                                                   PERPKT_DATA(t)->stream, 0, &zero,
     1536                                                                   &nopoll) < 0) {
     1537                                trace_set_err(libtrace, errno, "dag_set_stream_poll failed!");
     1538                                return -1;
     1539                        }
     1540
     1541                        /* Clear all the data from the memory hole */
     1542                        do {
     1543                                top = dag_advance_stream(PERPKT_DATA(t)->device->fd,
     1544                                                                                 PERPKT_DATA(t)->stream,
     1545                                                                                 &bottom);
     1546
     1547                                assert(top && bottom);
     1548                                diff = top - bottom;
     1549                                bottom -= diff;
     1550                        } while (diff != 0);
     1551
     1552                        PERPKT_DATA(t)->top = NULL;
     1553                        PERPKT_DATA(t)->bottom = NULL;
     1554                        PERPKT_DATA(t)->pkt_count = 0;
     1555                        PERPKT_DATA(t)->drops = 0;
     1556                } else {
     1557                        /* TODO: Figure out why we need this */
     1558                        t->format_data = &FORMAT_DATA->per_thread[0];
     1559                }
     1560        }
     1561
     1562        fprintf(stderr, "t%u: registered thread\n", t->perpkt_num);
     1563
     1564        return 0;
    12261565}
    12271566
     
    12661605        trace_event_dag,                /* trace_event */
    12671606        dag_help,                       /* help */
    1268         NULL                            /* next pointer */
     1607        NULL,                            /* next pointer */
     1608                {true, 0}, /* live packet capture, thread limit TBD */
     1609                dag_pstart_input,
     1610                dag_pread_packet,
     1611                dag_ppause_input,
     1612                NULL,
     1613                dag_pconfig_input,
     1614                dag_pregister_thread,
     1615                NULL
    12691616};
    12701617
  • lib/format_dpdk.c

    r9f43919 r1960910  
    22 * This file is part of libtrace
    33 *
    4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton, 
     4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
    55 * New Zealand.
    66 *
    7  * Author: Richard Sanger 
    8  *         
     7 * Author: Richard Sanger
     8 *
    99 * All rights reserved.
    1010 *
    11  * This code has been developed by the University of Waikato WAND 
     11 * This code has been developed by the University of Waikato WAND
    1212 * research group. For further information please see http://www.wand.net.nz/
    1313 *
     
    3535 * Intel Data Plane Development Kit is a LIVE capture format.
    3636 *
    37  * This format also supports writing which will write packets out to the 
    38  * network as a form of packet replay. This should not be confused with the 
    39  * RT protocol which is intended to transfer captured packet records between 
     37 * This format also supports writing which will write packets out to the
     38 * network as a form of packet replay. This should not be confused with the
     39 * RT protocol which is intended to transfer captured packet records between
    4040 * RT-speaking programs.
    4141 */
     42
     43#define _GNU_SOURCE
    4244
    4345#include "config.h"
     
    4648#include "format_helper.h"
    4749#include "libtrace_arphrd.h"
     50#include "hash_toeplitz.h"
    4851
    4952#ifdef HAVE_INTTYPES_H
     
    5962#include <string.h>
    6063
     64#if HAVE_LIBNUMA
     65#include <numa.h>
     66#endif
     67
    6168/* We can deal with any minor differences by checking the RTE VERSION
    6269 * Typically DPDK backports some fixes (typically for building against
     
    8693/* 1.6.0r2 :
    8794 *      rte_eal_pci_set_blacklist() is removed
    88  *      device_list is renamed ot pci_device_list
     95 *      device_list is renamed to pci_device_list
     96 *      In the 1.7.0 release rte_eal_pci_probe is called by rte_eal_init
     97 *      as such we do apply the whitelist before rte_eal_init.
     98 *      This also works correctly with DPDK 1.6.0r2.
    8999 *
    90100 * Replaced by:
     
    124134#include <rte_mempool.h>
    125135#include <rte_mbuf.h>
    126 
    127 /* The default size of memory buffers to use - This is the max size of standard
     136#include <rte_launch.h>
     137#include <rte_lcore.h>
     138#include <rte_per_lcore.h>
     139#include <rte_cycles.h>
     140#include <pthread.h>
     141
     142/* The default size of memory buffers to use - This is the max size of standard
    128143 * ethernet packet less the size of the MAC CHECKSUM */
    129144#define RX_MBUF_SIZE 1514
    130145
    131 /* The minimum number of memory buffers per queue tx or rx. Search for 
     146/* The minimum number of memory buffers per queue tx or rx. Search for
    132147 * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    133148 */
     
    147162#define NB_TX_MBUF 1024
    148163
    149 /* The size of the PCI blacklist needs to be big enough to contain 
     164/* The size of the PCI blacklist needs to be big enough to contain
    150165 * every PCI device address (listed by lspci every bus:device.function tuple).
    151166 */
     
    154169/* The maximum number of characters the mempool name can be */
    155170#define MEMPOOL_NAME_LEN 20
     171
     172/* For single threaded libtrace we read packets as a batch/burst
     173 * this is the maximum size of said burst */
     174#define BURST_SIZE 50
    156175
    157176#define MBUF(x) ((struct rte_mbuf *) x)
     
    159178#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    160179#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     180#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     181
    161182#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    162                         (uint64_t) tv.tv_usec*1000ull)
     183                        (uint64_t) tv.tv_usec*1000ull)
    163184#define TS_TO_NS(ts) ((uint64_t) ts.tv_sec*1000000000ull + \
    164                         (uint64_t) ts.tv_nsec)
     185                        (uint64_t) ts.tv_nsec)
    165186
    166187#if RTE_PKTMBUF_HEADROOM != 128
    167188#warning "RTE_PKT_MBUF_HEADROOM is not set to the default value of 128 - " \
    168         "any libtrace instance processing these packet must be have the" \
    169         "same RTE_PKTMBUF_HEADROOM set"
     189        "any libtrace instance processing these packet must be have the" \
     190        "same RTE_PKTMBUF_HEADROOM set"
    170191#endif
    171192
    172193/* ~~~~~~~~~~~~~~~~~~~~~~ Advance settings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    173  * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK 
    174  * 
     194 * THESE MAY REQUIRE MODIFICATIONS TO INTEL DPDK
     195 *
    175196 * Make sure you understand what these are doing before enabling them.
    176197 * They might make traces incompatable with other builds etc.
    177  * 
     198 *
    178199 * These are also included to show how to do somethings which aren't
    179200 * obvious in the DPDK documentation.
     
    181202
    182203/* Print verbose messages to stdout */
    183 #define DEBUG 0
    184 
    185 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() 
    186  * only turn on if you know clock_gettime is a vsyscall on your system 
     204#define DEBUG 1
     205
     206/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     207 * only turn on if you know clock_gettime is a vsyscall on your system
    187208 * overwise could be a large overhead. Again gettimeofday() should be
    188209 * vsyscall also if it's not you should seriously consider updating your
     
    191212#ifdef HAVE_LIBRT
    192213/* You can turn this on (set to 1) to prefer clock_gettime */
    193 #define USE_CLOCK_GETTIME 0
     214#define USE_CLOCK_GETTIME 1
    194215#else
    195216/* DONT CHANGE THIS !!! */
    196 #define USE_CLOCK_GETTIME 0
     217#define USE_CLOCK_GETTIME 1
    197218#endif
    198219
     
    202223 * hence writing out a port such as int: ring: and dpdk: assumes there
    203224 * is no checksum and will attempt to write the checksum as part of the
    204  * packet 
     225 * packet
    205226 */
    206227#define GET_MAC_CRC_CHECKSUM 0
    207228
    208229/* This requires a modification of the pmd drivers (inside Intel DPDK)
     230 * TODO this requires updating (packet sizes are wrong TS most likely also)
    209231 */
    210232#define HAS_HW_TIMESTAMPS_82580 0
     
    230252};
    231253
     254struct dpdk_per_lcore_t
     255{
     256        uint16_t queue_id;
     257        uint8_t port;
     258        uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
     259#if HAS_HW_TIMESTAMPS_82580
     260        /* Timestamping only relevent to RX */
     261        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
     262        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     263#endif
     264};
     265
    232266/* Used by both input and output however some fields are not used
    233267 * for output */
     
    236270    uint8_t port; /* Always 0 we only whitelist a single port - Shared TX & RX */
    237271    uint8_t nb_ports; /* Total number of usable ports on system should be 1 */
    238     uint8_t paused; /* See paused_state */ 
     272    uint8_t paused; /* See paused_state */
    239273    uint16_t queue_id; /* Always 0 we use a single queue - Shared TX & RX */
     274    uint16_t link_speed; /* Link speed 10,100,1000,10000 etc. */
    240275    int snaplen; /* The snap length for the capture - RX only */
    241276    /* We always have to setup both rx and tx queues even if we don't want them */
    242277    int nb_rx_buf; /* The number of packet buffers in the rx ring */
    243278    int nb_tx_buf; /* The number of packet buffers in the tx ring */
     279    int nic_numa_node; /* The NUMA node that the NIC is attached to */
    244280    struct rte_mempool * pktmbuf_pool; /* Our packet memory pool */
    245281#if DPDK_USE_BLACKLIST
     
    248284#endif
    249285    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    250 #if HAS_HW_TIMESTAMPS_82580
    251     /* Timestamping only relevent to RX */
    252     uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    253     uint64_t ts_last_sys; /* System timestamp of our most recent packet in nanoseconds */
    254     uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    255 #endif
     286    uint8_t rss_key[40]; // This is the RSS KEY
     287    /* To improve performance we always batch reading packets, in a burst */
     288    struct rte_mbuf* burst_pkts[BURST_SIZE];
     289    int burst_size; /* The total number read in the burst */
     290    int burst_offset; /* The offset we are into the burst */
     291        // DPDK normally seems to have a limit of 8 queues for a given card
     292        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    256293};
    257294
     
    261298};
    262299
    263 /** 
     300/**
    264301 * A structure placed in front of the packet where we can store
    265302 * additional information about the given packet.
     
    267304 * |       rte_mbuf (pkt)     | sizeof(rte_mbuf)
    268305 * +--------------------------+
    269  * |           padding        | RTE_PKTMBUF_HEADROOM-1-sizeof(dpdk_addt_hdr)
    270  * +--------------------------+
    271306 * |       dpdk_addt_hdr      | sizeof(dpdk_addt_hdr)
    272307 * +--------------------------+
    273  * |   sizeof(dpdk_addt_hdr)  | 1 byte
    274  * +--------------------------+ 
     308 * |           padding        | RTE_PKTMBUF_HEADROOM-sizeof(dpdk_addt_hdr)
     309 * +--------------------------+
    275310 * *   hw_timestamp_82580     * 16 bytes Optional
    276311 * +--------------------------+
     
    290325 * We want to blacklist all devices except those on the whitelist
    291326 * (I say list, but yes it is only the one).
    292  * 
     327 *
    293328 * The default behaviour of rte_pci_probe() will map every possible device
    294329 * to its DPDK driver. The DPDK driver will take the ethernet device
    295330 * out of the kernel (i.e. no longer /dev/ethx) and cannot be used.
    296  * 
    297  * So blacklist all devices except the one that we wish to use so that 
     331 *
     332 * So blacklist all devices except the one that we wish to use so that
    298333 * the others can still be used as standard ethernet ports.
    299334 *
     
    309344
    310345        TAILQ_FOREACH(dev, &device_list, next) {
    311         if (whitelist != NULL && whitelist->domain == dev->addr.domain
    312             && whitelist->bus == dev->addr.bus
    313             && whitelist->devid == dev->addr.devid
    314             && whitelist->function == dev->addr.function)
    315             continue;
     346        if (whitelist != NULL && whitelist->domain == dev->addr.domain
     347            && whitelist->bus == dev->addr.bus
     348            && whitelist->devid == dev->addr.devid
     349            && whitelist->function == dev->addr.function)
     350            continue;
    316351                if (format_data->nb_blacklist >= sizeof (format_data->blacklist)
    317                                 / sizeof (format_data->blacklist[0])) {
     352                                / sizeof (format_data->blacklist[0])) {
    318353                        printf("Warning: too many devices to blacklist consider"
    319                                         " increasing BLACK_LIST_SIZE");
     354                                        " increasing BLACK_LIST_SIZE");
    320355                        break;
    321356                }
     
    329364#else /* DPDK_USE_BLACKLIST */
    330365#include <rte_devargs.h>
    331 static int blacklist_devices(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
     366static int whitelist_device(struct dpdk_format_data_t *format_data UNUSED, struct rte_pci_addr *whitelist)
    332367{
    333368        char pci_str[20] = {0};
    334369        snprintf(pci_str, sizeof(pci_str), PCI_PRI_FMT,
    335                 whitelist->domain,
    336                 whitelist->bus,
    337                 whitelist->devid,
    338                 whitelist->function);
     370                whitelist->domain,
     371                whitelist->bus,
     372                whitelist->devid,
     373                whitelist->function);
    339374        if (rte_eal_devargs_add(RTE_DEVTYPE_WHITELISTED_PCI, pci_str) < 0) {
    340375                return -1;
     
    348383 * Fills in addr, note core is optional and is unchanged if
    349384 * a value for it is not provided.
    350  * 
     385 *
    351386 * i.e. ./libtrace dpdk:0:1:0.0 -> 0:1:0.0
    352  * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2) 
     387 * or ./libtrace dpdk:0:1:0.1-2 -> 0:1:0.1 (Using CPU core #2)
    353388 */
    354389static int parse_pciaddr(char * str, struct rte_pci_addr * addr, long * core) {
    355     char * wrkstr;
    356     char * pch;
     390    int matches;
    357391    assert(str);
    358     wrkstr = strdup(str);
    359    
    360     pch = strtok(wrkstr,":");
    361     if (pch == NULL || pch[0] == 0) {
    362         free(wrkstr); return -1;
    363     }
    364     addr->domain = (uint16_t) atoi(pch);
    365 
    366     pch = strtok(NULL,":");
    367     if (pch == NULL || pch[0] == 0) {
    368         free(wrkstr); return -1;
    369     }
    370     addr->bus = (uint8_t) atoi(pch);
    371 
    372     pch = strtok(NULL,".");
    373     if (pch == NULL || pch[0] == 0) {
    374         free(wrkstr); return -1;
    375     }
    376     addr->devid = (uint8_t) atoi(pch);
    377 
    378     pch = strtok(NULL,"-"); /* Might not find the '-' it's optional */
    379     if (pch == NULL || pch[0] == 0) {
    380         free(wrkstr); return -1;
    381     }
    382     addr->function = (uint8_t) atoi(pch);
    383 
    384     pch = strtok(NULL, ""); /* Find end of string */
    385    
    386     if (pch != NULL && pch[0] != 0) {
    387         *core = (long) atoi(pch);
    388     }
    389 
    390     free(wrkstr);
    391     return 0;
     392    matches = sscanf(str, "%4"SCNx16":%2"SCNx8":%2"SCNx8".%2"SCNx8"-%ld", &addr->domain, &addr->bus, &addr->devid, &addr->function, core);
     393    if (matches >= 4) {
     394        return 0;
     395    } else {
     396        return -1;
     397    }
     398}
     399
     400/**
     401 * Convert a pci address to the numa node it is
     402 * connected to.
     403 *
     404 * This checks /sys/bus/pci/devices/XXXX:XX:XX.X/numa_node
     405 * so we can call it before DPDK
     406 *
     407 * @return -1 if unknown otherwise a number 0 or higher of the numa node
     408 */
     409static int pci_to_numa(struct rte_pci_addr * dev_addr) {
     410        char path[50] = {0};
     411        FILE *file;
     412
     413        /* Read from the system */
     414        snprintf(path, sizeof(path), "/sys/bus/pci/devices/"PCI_PRI_FMT"/numa_node",
     415                 dev_addr->domain,
     416                 dev_addr->bus,
     417                 dev_addr->devid,
     418                 dev_addr->function);
     419
     420        if((file = fopen(path, "r")) != NULL) {
     421                int numa_node = -1;
     422                fscanf(file, "%d", &numa_node);
     423                fclose(file);
     424                return numa_node;
     425        }
     426        return -1;
    392427}
    393428
     
    398433    struct rte_config * global_config;
    399434    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    400    
     435
    401436    if (nb_cpu <= 0) {
    402         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    403         nb_cpu = 1; /* fallback to just 1 core */
     437        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     438        nb_cpu = 1; /* fallback to just 1 core */
    404439    }
    405440    if (nb_cpu > RTE_MAX_LCORE)
    406         nb_cpu = RTE_MAX_LCORE;
    407    
     441        nb_cpu = RTE_MAX_LCORE;
     442
    408443    global_config = rte_eal_get_configuration();
    409    
     444
    410445    if (global_config != NULL) {
    411         int i;
    412         fprintf(stderr, "Intel DPDK setup\n"
    413                "---Version      : %"PRIu32"\n"
    414                "---Magic        : %"PRIu32"\n"
    415                "---Master LCore : %"PRIu32"\n"
    416                "---LCore Count  : %"PRIu32"\n",
    417                global_config->version, global_config->magic,
    418                global_config->master_lcore, global_config->lcore_count);
    419        
    420         for (i = 0 ; i < nb_cpu; i++) {
    421             fprintf(stderr, "   ---Core %d : %s\n", i,
    422                    global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    423         }
    424        
    425         const char * proc_type;
    426         switch (global_config->process_type) {
    427             case RTE_PROC_AUTO:
    428                 proc_type = "auto";
    429                 break;
    430             case RTE_PROC_PRIMARY:
    431                 proc_type = "primary";
    432                 break;
    433             case RTE_PROC_SECONDARY:
    434                 proc_type = "secondary";
    435                 break;
    436             case RTE_PROC_INVALID:
    437                 proc_type = "invalid";
    438                 break;
    439             default:
    440                 proc_type = "something worse than invalid!!";
    441         }
    442         fprintf(stderr, "---Process Type : %s\n", proc_type);
    443     }
    444    
    445 }
    446 #endif
     446        int i;
     447        fprintf(stderr, "Intel DPDK setup\n"
     448               "---Version      : %s\n"
     449               "---Master LCore : %"PRIu32"\n"
     450               "---LCore Count  : %"PRIu32"\n",
     451               rte_version(),
     452               global_config->master_lcore, global_config->lcore_count);
     453
     454        for (i = 0 ; i < nb_cpu; i++) {
     455            fprintf(stderr, "   ---Core %d : %s\n", i,
     456                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
     457        }
     458
     459        const char * proc_type;
     460        switch (global_config->process_type) {
     461            case RTE_PROC_AUTO:
     462                proc_type = "auto";
     463                break;
     464            case RTE_PROC_PRIMARY:
     465                proc_type = "primary";
     466                break;
     467            case RTE_PROC_SECONDARY:
     468                proc_type = "secondary";
     469                break;
     470            case RTE_PROC_INVALID:
     471                proc_type = "invalid";
     472                break;
     473            default:
     474                proc_type = "something worse than invalid!!";
     475        }
     476        fprintf(stderr, "---Process Type : %s\n", proc_type);
     477    }
     478
     479}
     480#endif
     481
     482/**
     483 * Expects to be called from the master lcore and moves it to the given dpdk id
     484 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     485 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     486 *               and not already in use.
     487 * @return 0 is successful otherwise -1 on error.
     488 */
     489static inline int dpdk_move_master_lcore(size_t core) {
     490    struct rte_config *cfg = rte_eal_get_configuration();
     491    cpu_set_t cpuset;
     492    int i;
     493
     494    assert (core < RTE_MAX_LCORE);
     495    assert (rte_get_master_lcore() == rte_lcore_id());
     496
     497    if (core == rte_lcore_id())
     498        return 0;
     499
     500    // Make sure we are not overwriting someone else
     501    assert(!rte_lcore_is_enabled(core));
     502
     503    // Move the core
     504    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     505    cfg->lcore_role[core] = ROLE_RTE;
     506    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     507    rte_eal_get_configuration()->master_lcore = core;
     508    RTE_PER_LCORE(_lcore_id) = core;
     509
     510    // Now change the affinity
     511    CPU_ZERO(&cpuset);
     512
     513    if (lcore_config[core].detected) {
     514        CPU_SET(core, &cpuset);
     515    } else {
     516        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     517            if (lcore_config[i].detected)
     518                CPU_SET(i, &cpuset);
     519        }
     520    }
     521
     522    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     523    if (i != 0) {
     524        // TODO proper libtrace style error here!!
     525        fprintf(stderr, "pthread_setaffinity_np failed\n");
     526        return -1;
     527    }
     528    return 0;
     529}
     530
    447531
    448532/**
     
    475559
    476560static inline int dpdk_init_environment(char * uridata, struct dpdk_format_data_t * format_data,
    477                                         char * err, int errlen) {
     561                                        char * err, int errlen) {
    478562    int ret; /* Returned error codes */
    479563    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     
    482566    long nb_cpu; /* The number of CPUs in the system */
    483567    long my_cpu; /* The CPU number we want to bind to */
     568    int i;
     569    struct rte_config *cfg = rte_eal_get_configuration();
    484570        struct saved_getopts save_opts;
    485    
     571
    486572#if DEBUG
    487573    rte_set_log_level(RTE_LOG_DEBUG);
    488 #else 
     574#else
    489575    rte_set_log_level(RTE_LOG_WARNING);
    490576#endif
    491577    /*
    492578     * Using unique file prefixes mean separate memory is used, unlinking
    493      * the two processes. However be careful we still cannot access a
     579     * the two processes. However be careful we still cannot access a
     580     * port that already in use.
     581     *
     582     * Using unique file prefixes mean separate memory is used, unlinking
     583     * the two processes. However be careful we still cannot access a
    494584     * port that already in use.
    495585     */
    496586    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
    497                 "--file-prefix", mem_map, "-m", "256", NULL};
     587                "--file-prefix", mem_map, "-m", "980", NULL};
    498588    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    499589
    500590    /* This initialises the Environment Abstraction Layer (EAL)
    501591     * If we had slave workers these are put into WAITING state
    502      * 
     592     *
    503593     * Basically binds this thread to a fixed core, which we choose as
    504594     * the last core on the machine (assuming fewer interrupts mapped here).
     
    511601     */
    512602
    513     /* Get the number of cpu cores in the system and use the last core */
     603    /* Get the number of cpu cores in the system and use the last core
     604     * on the correct numa node */
    514605    nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
    515606    if (nb_cpu <= 0) {
    516         perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
    517         nb_cpu = 1; /* fallback to the first core */
     607        perror("sysconf(_SC_NPROCESSORS_ONLN) failed. Falling back to the first core.");
     608        nb_cpu = 1; /* fallback to the first core */
    518609    }
    519610    if (nb_cpu > RTE_MAX_LCORE)
    520         nb_cpu = RTE_MAX_LCORE;
    521 
    522     my_cpu = nb_cpu;
    523     /* This allows the user to specify the core - we would try to do this 
     611        nb_cpu = RTE_MAX_LCORE;
     612
     613    my_cpu = -1;
     614    /* This allows the user to specify the core - we would try to do this
    524615     * automatically but it's hard to tell that this is secondary
    525      * before running rte_eal_init(...). Currently we are limited to 1 
     616     * before running rte_eal_init(...). Currently we are limited to 1
    526617     * instance per core due to the way memory is allocated. */
    527618    if (parse_pciaddr(uridata, &use_addr, &my_cpu) != 0) {
    528         snprintf(err, errlen, "Failed to parse URI");
    529         return -1;
    530     }
     619        snprintf(err, errlen, "Failed to parse URI");
     620        return -1;
     621    }
     622
     623#if HAVE_LIBNUMA
     624        format_data->nic_numa_node = pci_to_numa(&use_addr);
     625        if (my_cpu < 0) {
     626                /* If we can assign to a core on the same numa node */
     627                printf("Using pci card on numa_node%d\n", format_data->nic_numa_node);
     628                if(format_data->nic_numa_node >= 0) {
     629                        int max_node_cpu = -1;
     630                        struct bitmask *mask = numa_allocate_cpumask();
     631                        assert(mask);
     632                        numa_node_to_cpus(format_data->nic_numa_node, mask);
     633                        for (i = 0 ; i < nb_cpu; ++i) {
     634                                if (numa_bitmask_isbitset(mask,i))
     635                                        max_node_cpu = i+1;
     636                        }
     637                        my_cpu = max_node_cpu;
     638                }
     639        }
     640#endif
     641        if (my_cpu < 0) {
     642                my_cpu = nb_cpu;
     643        }
     644
    531645
    532646    snprintf(format_data->mempool_name, MEMPOOL_NAME_LEN,
    533                 "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
     647                "libtrace_pool_%"PRIu32, (uint32_t) nb_cpu);
    534648
    535649    if (!(my_cpu > 0 && my_cpu <= nb_cpu)) {
    536         snprintf(err, errlen,
    537           "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
    538           " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
    539         return -1;
    540     }
    541 
    542     /* Make our mask */
    543     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    544 
     650        snprintf(err, errlen,
     651          "Intel DPDK - User defined a bad CPU number %"PRIu32" must be"
     652          " between 1 and %"PRIu32, (uint32_t) my_cpu, (uint32_t) nb_cpu);
     653        return -1;
     654    }
     655
     656    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     657       info older versions */
     658    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     659    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     660
     661#if !DPDK_USE_BLACKLIST
     662    /* Black list all ports besides the one that we want to use */
     663        if ((ret = whitelist_device(format_data, &use_addr)) < 0) {
     664                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     665                         " are you sure the address is correct?: %s", strerror(-ret));
     666                return -1;
     667        }
     668#endif
    545669
    546670        /* Give the memory map a unique name */
    547671        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    548     /* rte_eal_init it makes a call to getopt so we need to reset the 
     672    /* rte_eal_init it makes a call to getopt so we need to reset the
    549673     * global optind variable of getopt otherwise this fails */
    550674        save_getopts(&save_opts);
    551675    optind = 1;
    552676    if ((ret = rte_eal_init(argc, argv)) < 0) {
    553         snprintf(err, errlen,
    554           "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
    555         return -1;
     677        snprintf(err, errlen,
     678          "Intel DPDK - Initialisation of EAL failed: %s", strerror(-ret));
     679        return -1;
    556680    }
    557681        restore_getopts(&save_opts);
     682    // These are still running but will never do anything with DPDK v1.7 we
     683    // should remove this XXX in the future
     684    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     685            if (rte_lcore_is_enabled(i) && i != (int) rte_get_master_lcore()) {
     686            cfg->lcore_role[i] = ROLE_OFF;
     687            cfg->lcore_count--;
     688        }
     689    }
     690    // Only the master should be running
     691    assert(cfg->lcore_count == 1);
     692
     693    dpdk_move_master_lcore(my_cpu-1);
    558694
    559695#if DEBUG
     
    566702     */
    567703    if ((ret = rte_pmd_init_all()) < 0) {
    568         snprintf(err, errlen,
    569           "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
    570         return -1;
    571     }
    572 #endif
    573 
    574     /* Blacklist all ports besides the one that we want to use */
     704        snprintf(err, errlen,
     705          "Intel DPDK - rte_pmd_init_all failed: %s", strerror(-ret));
     706        return -1;
     707    }
     708#endif
     709
     710#if DPDK_USE_BLACKLIST
     711    /* Black list all ports besides the one that we want to use */
    575712        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    576713                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
    577                         " are you sure the address is correct?: %s", strerror(-ret));
     714                        " are you sure the address is correct?: %s", strerror(-ret));
    578715                return -1;
    579716        }
     717#endif
    580718
    581719    /* This loads DPDK drivers against all ports that are not blacklisted */
    582720        if ((ret = rte_eal_pci_probe()) < 0) {
    583         snprintf(err, errlen,
    584             "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
    585         return -1;
     721        snprintf(err, errlen,
     722            "Intel DPDK - rte_eal_pci_probe failed: %s", strerror(-ret));
     723        return -1;
    586724    }
    587725
     
    589727
    590728    if (format_data->nb_ports != 1) {
    591         snprintf(err, errlen,
    592             "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
    593             format_data->nb_ports);
    594         return -1;
    595     }
     729        snprintf(err, errlen,
     730            "Intel DPDK - rte_eth_dev_count returned %d but it should be 1",
     731            format_data->nb_ports);
     732        return -1;
     733    }
     734
     735    struct rte_eth_dev_info dev_info;
     736    rte_eth_dev_info_get(0, &dev_info);
     737    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     738                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    596739
    597740    return 0;
     
    601744    char err[500];
    602745    err[0] = 0;
    603    
     746
    604747    libtrace->format_data = (struct dpdk_format_data_t *)
    605                             malloc(sizeof(struct dpdk_format_data_t));
     748                            malloc(sizeof(struct dpdk_format_data_t));
    606749    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    607750    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    610753    FORMAT(libtrace)->nb_rx_buf = NB_RX_MBUF;
    611754    FORMAT(libtrace)->nb_tx_buf = MIN_NB_BUF;
     755    FORMAT(libtrace)->nic_numa_node = -1;
    612756    FORMAT(libtrace)->promisc = -1;
    613757    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    617761    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    618762    FORMAT(libtrace)->mempool_name[0] = 0;
    619 #if HAS_HW_TIMESTAMPS_82580
    620     FORMAT(libtrace)->ts_first_sys = 0;
    621     FORMAT(libtrace)->ts_last_sys = 0;
    622     FORMAT(libtrace)->wrap_count = 0;
    623 #endif
     763    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     764    FORMAT(libtrace)->burst_size = 0;
     765    FORMAT(libtrace)->burst_offset = 0;
    624766
    625767    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    626         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    627         free(libtrace->format_data);
    628         libtrace->format_data = NULL;
    629         return -1;
     768        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     769        free(libtrace->format_data);
     770        libtrace->format_data = NULL;
     771        return -1;
    630772    }
    631773    return 0;
     
    636778    char err[500];
    637779    err[0] = 0;
    638    
     780
    639781    libtrace->format_data = (struct dpdk_format_data_t *)
    640                             malloc(sizeof(struct dpdk_format_data_t));
     782                            malloc(sizeof(struct dpdk_format_data_t));
    641783    FORMAT(libtrace)->port = 0; /* Always assume 1 port loaded */
    642784    FORMAT(libtrace)->queue_id = 0; /* Single queue per port */
     
    645787    FORMAT(libtrace)->nb_rx_buf = MIN_NB_BUF;
    646788    FORMAT(libtrace)->nb_tx_buf = NB_TX_MBUF;
     789    FORMAT(libtrace)->nic_numa_node = -1;
    647790    FORMAT(libtrace)->promisc = -1;
    648791    FORMAT(libtrace)->pktmbuf_pool = NULL;
     
    652795    FORMAT(libtrace)->paused = DPDK_NEVER_STARTED;
    653796    FORMAT(libtrace)->mempool_name[0] = 0;
    654 #if HAS_HW_TIMESTAMPS_82580
    655     FORMAT(libtrace)->ts_first_sys = 0;
    656     FORMAT(libtrace)->ts_last_sys = 0;
    657     FORMAT(libtrace)->wrap_count = 0;
    658 #endif
     797    memset(FORMAT(libtrace)->burst_pkts, 0, sizeof(FORMAT(libtrace)->burst_pkts[0]) * BURST_SIZE);
     798    FORMAT(libtrace)->burst_size = 0;
     799    FORMAT(libtrace)->burst_offset = 0;
    659800
    660801    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    661         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    662         free(libtrace->format_data);
    663         libtrace->format_data = NULL;
    664         return -1;
     802        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     803        free(libtrace->format_data);
     804        libtrace->format_data = NULL;
     805        return -1;
    665806    }
    666807    return 0;
    667808};
    668809
     810static int dpdk_pconfig_input (libtrace_t *libtrace,
     811                                trace_parallel_option_t option,
     812                                void *data) {
     813        switch (option) {
     814                case TRACE_OPTION_SET_HASHER:
     815                        switch (*((enum hasher_types *) data))
     816                        {
     817                                case HASHER_BALANCE:
     818                                case HASHER_UNIDIRECTIONAL:
     819                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     820                                        return 0;
     821                                case HASHER_BIDIRECTIONAL:
     822                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     823                                        return 0;
     824                                case HASHER_HARDWARE:
     825                                case HASHER_CUSTOM:
     826                                        // We don't support these
     827                                        return -1;
     828                        }
     829        break;
     830        }
     831        return -1;
     832}
    669833/**
    670834 * Note here snaplen excludes the MAC checksum. Packets over
    671835 * the requested snaplen will be dropped. (Excluding MAC checksum)
    672  * 
     836 *
    673837 * I.e the maximum size of a standard ethernet packet is 1518 (Including MAC checksum)
    674838 * So to allow packets upto 1518 this would be set to 1514 and if GET_MAC_CRC_CHECKSUM
    675839 * is set the maximum size of the returned packet would be 1518 otherwise
    676840 * 1514 would be the largest size possibly returned.
    677  * 
     841 *
    678842 */
    679843static int dpdk_config_input (libtrace_t *libtrace,
    680                                         trace_option_t option,
    681                                         void *data) {
     844                                        trace_option_t option,
     845                                        void *data) {
    682846    switch (option) {
    683         case TRACE_OPTION_SNAPLEN:
    684             /* Only support changing snaplen before a call to start is
    685              * made */
    686             if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
    687                 FORMAT(libtrace)->snaplen=*(int*)data;
    688             else
    689                 return -1;
    690             return 0;
     847        case TRACE_OPTION_SNAPLEN:
     848            /* Only support changing snaplen before a call to start is
     849             * made */
     850            if (FORMAT(libtrace)->paused == DPDK_NEVER_STARTED)
     851                FORMAT(libtrace)->snaplen=*(int*)data;
     852            else
     853                return -1;
     854            return 0;
    691855                case TRACE_OPTION_PROMISC:
    692856                        FORMAT(libtrace)->promisc=*(int*)data;
    693             return 0;
    694         case TRACE_OPTION_FILTER:
    695             /* TODO filtering */
    696             break;
    697         case TRACE_OPTION_META_FREQ:
    698             break;
    699         case TRACE_OPTION_EVENT_REALTIME:
    700             break;
    701         /* Avoid default: so that future options will cause a warning
    702         * here to remind us to implement it, or flag it as
    703         * unimplementable
    704         */
     857            return 0;
     858        case TRACE_OPTION_FILTER:
     859            /* TODO filtering */
     860            break;
     861        case TRACE_OPTION_META_FREQ:
     862            break;
     863        case TRACE_OPTION_EVENT_REALTIME:
     864            break;
     865        /* Avoid default: so that future options will cause a warning
     866        * here to remind us to implement it, or flag it as
     867        * unimplementable
     868        */
    705869    }
    706870
     
    712876/* Can set jumbo frames/ or limit the size of a frame by setting both
    713877 * max_rx_pkt_len and jumbo_frame. This can be limited to less than
    714  * 
     878 *
    715879 */
    716880static struct rte_eth_conf port_conf = {
    717881        .rxmode = {
     882                .mq_mode = ETH_RSS,
    718883                .split_hdr_size = 0,
    719884                .header_split   = 0, /**< Header Split disabled */
     
    721886                .hw_vlan_filter = 0, /**< VLAN filtering disabled */
    722887                .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
    723         .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
     888                .max_rx_pkt_len = 0, /**< Max frame Size if Jumbo enabled */
    724889#if GET_MAC_CRC_CHECKSUM
    725890/* So it appears that if hw_strip_crc is turned off the driver will still
     
    734899 * always cut off the checksum in the future
    735900 */
    736         .hw_strip_crc   = 1, /**< CRC stripped by hardware */
     901                .hw_strip_crc   = 1, /**< CRC stripped by hardware */
    737902#endif
    738903        },
     
    740905                .mq_mode = ETH_DCB_NONE,
    741906        },
     907        .rx_adv_conf = {
     908                .rss_conf = {
     909                        // .rss_key = &rss_key, // We set this per format
     910                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     911                },
     912        },
     913        .intr_conf = {
     914                .lsc = 1
     915        }
    742916};
    743917
     
    754928static const struct rte_eth_txconf tx_conf = {
    755929        .tx_thresh = {
    756         /**
    757         * TX_PTHRESH prefetch
    758         * Set on the NIC, if the number of unprocessed descriptors to queued on
    759         * the card fall below this try grab at least hthresh more unprocessed
    760         * descriptors.
    761         */
     930        /**
     931        * TX_PTHRESH prefetch
     932        * Set on the NIC, if the number of unprocessed descriptors to queued on
     933        * the card fall below this try grab at least hthresh more unprocessed
     934        * descriptors.
     935        */
    762936                .pthresh = 36,
    763937
    764         /* TX_HTHRESH host
    765         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
    766         */
     938        /* TX_HTHRESH host
     939        * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     940        */
    767941                .hthresh = 0,
    768        
    769         /* TX_WTHRESH writeback
    770         * Set on the NIC, the number of sent descriptors before writing back
    771         * status to confirm the transmission. This is done more efficiently as
    772         * a bulk DMA-transfer rather than writing one at a time.
    773         * Similar to tx_free_thresh however this is applied to the NIC, where
    774         * as tx_free_thresh is when DPDK will check these. This is extended
    775         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
    776         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
    777         */
     942
     943        /* TX_WTHRESH writeback
     944        * Set on the NIC, the number of sent descriptors before writing back
     945        * status to confirm the transmission. This is done more efficiently as
     946        * a bulk DMA-transfer rather than writing one at a time.
     947        * Similar to tx_free_thresh however this is applied to the NIC, where
     948        * as tx_free_thresh is when DPDK will check these. This is extended
     949        * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     950        * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     951        */
    778952                .wthresh = 4,
    779953        },
     
    786960
    787961    /* This is the Report Status threshold, used by 10Gbit cards,
    788      * This signals the card to only write back status (such as 
     962     * This signals the card to only write back status (such as
    789963     * transmission successful) after this minimum number of transmit
    790964     * descriptors are seen. The default is 32 (if set to 0) however if set
     
    795969};
    796970
     971/**
     972 * A callback for a link state change (LSC).
     973 *
     974 * Packets may be received before this notification. In fact the DPDK IGXBE
     975 * driver likes to put a delay upto 5sec before sending this.
     976 *
     977 * We use this to ensure the link speed is correct for our timestamp
     978 * calculations. Because packets might be received before the link up we still
     979 * update this when the packet is received.
     980 *
     981 * @param port The DPDK port
     982 * @param event The TYPE of event (expected to be RTE_ETH_EVENT_INTR_LSC)
     983 * @param cb_arg The dpdk_format_data_t structure associated with the format
     984 */
     985static void dpdk_lsc_callback(uint8_t port, enum rte_eth_event_type event,
     986                              void *cb_arg) {
     987        struct dpdk_format_data_t * format_data = cb_arg;
     988        struct rte_eth_link link_info;
     989        assert(event == RTE_ETH_EVENT_INTR_LSC);
     990        assert(port == format_data->port);
     991
     992        rte_eth_link_get_nowait(port, &link_info);
     993
     994        if (link_info.link_status)
     995                format_data->link_speed = link_info.link_speed;
     996        else
     997                format_data->link_speed = 0;
     998
     999#if DEBUG
     1000        fprintf(stderr, "LSC - link status is %s %s speed=%d\n",
     1001                link_info.link_status ? "up" : "down",
     1002                (link_info.link_duplex == ETH_LINK_FULL_DUPLEX) ?
     1003                                          "full-duplex" : "half-duplex",
     1004                (int) link_info.link_speed);
     1005#endif
     1006
     1007        /* Turns out DPDK drivers might not come back up if the link speed
     1008         * changes. So we reset the autoneg procedure. This is very unsafe
     1009         * we have have threads reading packets and we stop the port. */
     1010#if 0
     1011        if (!link_info.link_status) {
     1012                int ret;
     1013                rte_eth_dev_stop(port);
     1014                ret = rte_eth_dev_start(port);
     1015                if (ret < 0) {
     1016                        fprintf(stderr, "Resetting the DPDK port failed : %s\n",
     1017                                strerror(-ret));
     1018                }
     1019        }
     1020#endif
     1021}
     1022
    7971023/* Attach memory to the port and start the port or restart the port.
    7981024 */
     
    8001026    int ret; /* Check return values for errors */
    8011027    struct rte_eth_link link_info; /* Wait for link */
    802    
     1028    unsigned cpu_numa_node = rte_lcore_to_socket_id(rte_lcore_id());
     1029
    8031030    /* Already started */
    8041031    if (format_data->paused == DPDK_RUNNING)
    805         return 0;
    806 
    807     /* First time started we need to alloc our memory, doing this here 
     1032        return 0;
     1033
     1034    /* First time started we need to alloc our memory, doing this here
    8081035     * rather than in environment setup because we don't have snaplen then */
    8091036    if (format_data->paused == DPDK_NEVER_STARTED) {
    810         if (format_data->snaplen == 0) {
    811             format_data->snaplen = RX_MBUF_SIZE;
    812             port_conf.rxmode.jumbo_frame = 0;
    813             port_conf.rxmode.max_rx_pkt_len = 0;
    814         } else {
    815             /* Use jumbo frames */
    816             port_conf.rxmode.jumbo_frame = 1;
    817             port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
    818         }
    819 
    820         /* This is additional overhead so make sure we allow space for this */
     1037        if (format_data->snaplen == 0) {
     1038            format_data->snaplen = RX_MBUF_SIZE;
     1039            port_conf.rxmode.jumbo_frame = 0;
     1040            port_conf.rxmode.max_rx_pkt_len = 0;
     1041        } else {
     1042            /* Use jumbo frames */
     1043            port_conf.rxmode.jumbo_frame = 1;
     1044            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1045        }
     1046
     1047        /* This is additional overhead so make sure we allow space for this */
    8211048#if GET_MAC_CRC_CHECKSUM
    822         format_data->snaplen += ETHER_CRC_LEN;
     1049        format_data->snaplen += ETHER_CRC_LEN;
    8231050#endif
    8241051#if HAS_HW_TIMESTAMPS_82580
    825         format_data->snaplen += sizeof(struct hw_timestamp_82580);
    826 #endif
    827 
    828         /* Create the mbuf pool, which is the place our packets are allocated
    829          * from - TODO figure out if there is is a free function (I cannot see one)
    830          * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    831          * allocate however that extra 1 packet is not used.
    832         * (I assume <= vs < error some where in DPDK code)
    833          * TX requires nb_tx_buffers + 1 in the case the queue is full
    834         * so that will fill the new buffer and wait until slots in the
    835         * ring become available.
    836         */
     1052        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1053#endif
     1054
     1055        /* Create the mbuf pool, which is the place our packets are allocated
     1056         * from - TODO figure out if there is is a free function (I cannot see one)
     1057         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1058         * allocate however that extra 1 packet is not used.
     1059        * (I assume <= vs < error some where in DPDK code)
     1060         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1061        * so that will fill the new buffer and wait until slots in the
     1062        * ring become available.
     1063        */
    8371064#if DEBUG
    8381065    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    8391066#endif
    840         format_data->pktmbuf_pool =
    841             rte_mempool_create(format_data->mempool_name,
    842                        format_data->nb_rx_buf + format_data->nb_tx_buf + 1,
    843                        format_data->snaplen + sizeof(struct rte_mbuf)
    844                                         + RTE_PKTMBUF_HEADROOM,
    845                        8, sizeof(struct rte_pktmbuf_pool_private),
    846                        rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
    847                        0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
    848 
    849         if (format_data->pktmbuf_pool == NULL) {
    850             snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
    851                         "pool failed: %s", strerror(rte_errno));
    852             return -1;
    853         }
    854     }
    855    
     1067    format_data->pktmbuf_pool =
     1068            rte_mempool_create(format_data->mempool_name,
     1069                       (format_data->nb_rx_buf + format_data->nb_tx_buf + 1),
     1070                       format_data->snaplen + sizeof(struct rte_mbuf)
     1071                                        + RTE_PKTMBUF_HEADROOM,
     1072                       128, sizeof(struct rte_pktmbuf_pool_private),
     1073                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1074                       cpu_numa_node, 0/*MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET*/);
     1075
     1076    if (format_data->pktmbuf_pool == NULL) {
     1077            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf"
     1078                        "pool failed: %s NODE %u", strerror(rte_errno), cpu_numa_node);
     1079            return -1;
     1080        }
     1081    }
     1082
    8561083    /* ----------- Now do the setup for the port mapping ------------ */
    857     /* Order of calls must be 
     1084    /* Order of calls must be
    8581085     * rte_eth_dev_configure()
    8591086     * rte_eth_tx_queue_setup()
     
    8621089     * other rte_eth calls
    8631090     */
    864    
     1091
     1092
     1093    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     1094
    8651095    /* This must be called first before another *eth* function
    8661096     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
    8671097    ret = rte_eth_dev_configure(format_data->port, 1, 1, &port_conf);
    8681098    if (ret < 0) {
    869         snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
    870                             " %"PRIu8" : %s", format_data->port,
    871                             strerror(-ret));
    872         return -1;
     1099        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1100                            " %"PRIu8" : %s", format_data->port,
     1101                            strerror(-ret));
     1102        return -1;
    8731103    }
    8741104    /* Initialise the TX queue a minimum value if using this port for
     
    8761106     */
    8771107    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
    878                         format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1108                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
    8791109    if (ret < 0) {
    880         snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
    881                             " %"PRIu8" : %s", format_data->port,
    882                             strerror(-ret));
    883         return -1;
     1110        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1111                            " %"PRIu8" : %s", format_data->port,
     1112                            strerror(-ret));
     1113        return -1;
    8841114    }
    8851115    /* Initialise the RX queue with some packets from memory */
    8861116    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    887                             format_data->nb_rx_buf, SOCKET_ID_ANY,
    888                             &rx_conf, format_data->pktmbuf_pool);
     1117                                 format_data->nb_rx_buf, cpu_numa_node,
     1118                                 &rx_conf, format_data->pktmbuf_pool);
    8891119    if (ret < 0) {
    890         snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
    891                     " %"PRIu8" : %s", format_data->port,
    892                     strerror(-ret));
    893         return -1;
    894     }
    895    
     1120        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1121                    " %"PRIu8" : %s", format_data->port,
     1122                    strerror(-ret));
     1123        return -1;
     1124    }
     1125
    8961126    /* Start device */
    8971127    ret = rte_eth_dev_start(format_data->port);
    8981128    if (ret < 0) {
    899         snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
    900                     strerror(-ret));
    901         return -1;
     1129        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1130                    strerror(-ret));
     1131        return -1;
    9021132    }
    9031133
    9041134    /* Default promiscuous to on */
    9051135    if (format_data->promisc == -1)
    906         format_data->promisc = 1;
    907    
     1136        format_data->promisc = 1;
     1137
    9081138    if (format_data->promisc == 1)
    909         rte_eth_promiscuous_enable(format_data->port);
     1139        rte_eth_promiscuous_enable(format_data->port);
    9101140    else
    911         rte_eth_promiscuous_disable(format_data->port);
    912    
    913     /* Wait for the link to come up */
    914     rte_eth_link_get(format_data->port, &link_info);
     1141        rte_eth_promiscuous_disable(format_data->port);
     1142
     1143        /* Register a callback for link state changes */
     1144        ret = rte_eth_dev_callback_register(format_data->port,
     1145                                            RTE_ETH_EVENT_INTR_LSC,
     1146                                            dpdk_lsc_callback,
     1147                                            format_data);
     1148        /* If this fails it is not a show stopper */
     1149#if DEBUG
     1150        fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1151                ret, strerror(-ret));
     1152#endif
     1153
     1154    /* Get the current link status */
     1155    rte_eth_link_get_nowait(format_data->port, &link_info);
     1156    format_data->link_speed = link_info.link_speed;
    9151157#if DEBUG
    9161158    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    917             (int) link_info.link_duplex, (int) link_info.link_speed);
    918 #endif
    919 
     1159            (int) link_info.link_duplex, (int) link_info.link_speed);
     1160#endif
    9201161    /* We have now successfully started/unpaused */
    9211162    format_data->paused = DPDK_RUNNING;
    922    
     1163
     1164    return 0;
     1165}
     1166
     1167/* Attach memory to the port and start (or restart) the port/s.
     1168 */
     1169static int dpdk_start_port_queues (struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues) {
     1170    int ret, i; /* Check return values for errors */
     1171    struct rte_eth_link link_info; /* Wait for link */
     1172
     1173    /* Already started */
     1174    if (format_data->paused == DPDK_RUNNING)
     1175        return 0;
     1176
     1177    /* First time started we need to alloc our memory, doing this here
     1178     * rather than in environment setup because we don't have snaplen then */
     1179    if (format_data->paused == DPDK_NEVER_STARTED) {
     1180        if (format_data->snaplen == 0) {
     1181            format_data->snaplen = RX_MBUF_SIZE;
     1182            port_conf.rxmode.jumbo_frame = 0;
     1183            port_conf.rxmode.max_rx_pkt_len = 0;
     1184        } else {
     1185            /* Use jumbo frames */
     1186            port_conf.rxmode.jumbo_frame = 1;
     1187            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1188        }
     1189
     1190        /* This is additional overhead so make sure we allow space for this */
     1191#if GET_MAC_CRC_CHECKSUM
     1192        format_data->snaplen += ETHER_CRC_LEN;
     1193#endif
     1194#if HAS_HW_TIMESTAMPS_82580
     1195        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1196#endif
     1197
     1198        /* Create the mbuf pool, which is the place our packets are allocated
     1199         * from - TODO figure out if there is a free function (I cannot see one)
     1200         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1201         * allocate however that extra 1 packet is not used.
     1202         * (I assume <= vs < error some where in DPDK code)
     1203         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1204         * so that will fill the new buffer and wait until slots in the
     1205         * ring become available.
     1206         */
     1207#if DEBUG
     1208    printf("Creating mempool named %s\n", format_data->mempool_name);
     1209#endif
     1210    format_data->pktmbuf_pool =
     1211            rte_mempool_create(format_data->mempool_name,
     1212                       (format_data->nb_rx_buf * rx_queues + format_data->nb_tx_buf + 1)*2,
     1213                       format_data->snaplen + sizeof(struct rte_mbuf)
     1214                                        + RTE_PKTMBUF_HEADROOM,
     1215                       128, sizeof(struct rte_pktmbuf_pool_private),
     1216                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1217                       format_data->nic_numa_node, 0);
     1218
     1219        if (format_data->pktmbuf_pool == NULL) {
     1220            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1221                        "pool failed: %s", strerror(rte_errno));
     1222            return -1;
     1223        }
     1224    }
     1225
     1226    /* ----------- Now do the setup for the port mapping ------------ */
     1227    /* Order of calls must be
     1228     * rte_eth_dev_configure()
     1229     * rte_eth_tx_queue_setup()
     1230     * rte_eth_rx_queue_setup()
     1231     * rte_eth_dev_start()
     1232     * other rte_eth calls
     1233     */
     1234
     1235    /* This must be called first before another *eth* function
     1236     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     1237    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1238    if (ret < 0) {
     1239        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1240                            " %"PRIu8" : %s", format_data->port,
     1241                            strerror(-ret));
     1242        return -1;
     1243    }
     1244#if DEBUG
     1245    printf("Doing dev configure\n");
     1246#endif
     1247    /* Initialise the TX queue a minimum value if using this port for
     1248     * receiving. Otherwise a larger size if writing packets.
     1249     */
     1250    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
     1251                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1252    if (ret < 0) {
     1253        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1254                            " %"PRIu8" : %s", format_data->port,
     1255                            strerror(-ret));
     1256        return -1;
     1257    }
     1258
     1259    for (i=0; i < rx_queues; i++) {
     1260#if DEBUG
     1261    printf("Doing queue configure\n");
     1262#endif
     1263
     1264                /* Initialise the RX queue with some packets from memory */
     1265                ret = rte_eth_rx_queue_setup(format_data->port, i,
     1266                                             format_data->nb_rx_buf, format_data->nic_numa_node,
     1267                                             &rx_conf, format_data->pktmbuf_pool);
     1268        /* Init per_thread data structures */
     1269        format_data->per_lcore[i].port = format_data->port;
     1270        format_data->per_lcore[i].queue_id = i;
     1271
     1272                if (ret < 0) {
     1273                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1274                                                " %"PRIu8" : %s", format_data->port,
     1275                                                strerror(-ret));
     1276                        return -1;
     1277                }
     1278        }
     1279
     1280#if DEBUG
     1281    fprintf(stderr, "Doing start device\n");
     1282#endif
     1283    /* Start device */
     1284    ret = rte_eth_dev_start(format_data->port);
     1285#if DEBUG
     1286    fprintf(stderr, "Done start device\n");
     1287#endif
     1288    if (ret < 0) {
     1289        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1290                    strerror(-ret));
     1291        return -1;
     1292    }
     1293
     1294
     1295    /* Default promiscuous to on */
     1296    if (format_data->promisc == -1)
     1297        format_data->promisc = 1;
     1298
     1299    if (format_data->promisc == 1)
     1300        rte_eth_promiscuous_enable(format_data->port);
     1301    else
     1302        rte_eth_promiscuous_disable(format_data->port);
     1303
     1304
     1305    /* We have now successfully started/unpased */
     1306    format_data->paused = DPDK_RUNNING;
     1307
     1308    // Can use remote launch for all
     1309    /*RTE_LCORE_FOREACH_SLAVE(i) {
     1310                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
     1311        }*/
     1312
     1313    /* Register a callback for link state changes */
     1314    ret = rte_eth_dev_callback_register(format_data->port,
     1315                                        RTE_ETH_EVENT_INTR_LSC,
     1316                                        dpdk_lsc_callback,
     1317                                        format_data);
     1318    /* If this fails it is not a show stopper */
     1319#if DEBUG
     1320    fprintf(stderr, "rte_eth_dev_callback_register failed %d : %s\n",
     1321            ret, strerror(-ret));
     1322#endif
     1323
     1324    /* Get the current link status */
     1325    rte_eth_link_get_nowait(format_data->port, &link_info);
     1326    format_data->link_speed = link_info.link_speed;
     1327#if DEBUG
     1328    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1329            (int) link_info.link_duplex, (int) link_info.link_speed);
     1330        struct rte_eth_rss_reta reta_conf = {0};
     1331        reta_conf.mask_lo = ~reta_conf.mask_lo;
     1332        reta_conf.mask_hi = ~reta_conf.mask_hi;
     1333        int qew = rte_eth_dev_rss_reta_query(format_data->port, &reta_conf);
     1334        fprintf(stderr, "err=%d", qew);
     1335        for (i = 0; i < ETH_RSS_RETA_NUM_ENTRIES; i++) {
     1336                fprintf(stderr, "[%d] = %d\n", i, (int)reta_conf.reta[i]);
     1337        }
     1338
     1339#endif
     1340
    9231341    return 0;
    9241342}
     
    9291347
    9301348    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    931         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    932         free(libtrace->format_data);
    933         libtrace->format_data = NULL;
    934         return -1;
     1349        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1350        free(libtrace->format_data);
     1351        libtrace->format_data = NULL;
     1352        return -1;
    9351353    }
    9361354    return 0;
     1355}
     1356
     1357static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1358    struct rte_eth_dev_info dev_info;
     1359    rte_eth_dev_info_get(port_id, &dev_info);
     1360    return dev_info.max_rx_queues;
     1361}
     1362
     1363static inline size_t dpdk_processor_count () {
     1364    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1365    if (nb_cpu <= 0)
     1366        return 1;
     1367    else
     1368        return (size_t) nb_cpu;
     1369}
     1370
     1371static int dpdk_pstart_input (libtrace_t *libtrace) {
     1372    char err[500];
     1373    int i=0, phys_cores=0;
     1374    int tot = libtrace->perpkt_thread_count;
     1375    err[0] = 0;
     1376
     1377    if (rte_lcore_id() != rte_get_master_lcore())
     1378        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1379
     1380    // If the master is not on the last thread we move it there
     1381    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1382        // Consider error handling here
     1383        dpdk_move_master_lcore(RTE_MAX_LCORE - 1);
     1384    }
     1385
     1386    // Don't exceed the number of cores in the system/detected by dpdk
     1387    // We don't have to force this but performance wont be good if we don't
     1388    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1389        if (lcore_config[i].detected) {
     1390            if (rte_lcore_is_enabled(i))
     1391                fprintf(stderr, "Found core %d already in use!\n", i);
     1392            else
     1393                phys_cores++;
     1394        }
     1395    }
     1396
     1397        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1398        tot = MIN(tot, phys_cores);
     1399
     1400        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
     1401
     1402    if (dpdk_start_port_queues(FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1403        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1404        free(libtrace->format_data);
     1405        libtrace->format_data = NULL;
     1406        return -1;
     1407    }
     1408
     1409    // Make sure we only start the number that we should
     1410    libtrace->perpkt_thread_count = tot;
     1411    return 0;
     1412}
     1413
     1414
     1415/**
     1416 * Register a thread with the DPDK system,
     1417 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1418 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1419 * gives it.
     1420 *
     1421 * We then allow a mapper thread to be started on every real core as DPDK would
     1422 * we also bind these to the corresponding CPU cores.
     1423 *
     1424 * @param libtrace A pointer to the trace
     1425 * @param reading True if the thread will be used to read packets, i.e. will
     1426 *                call pread_packet(), false if thread used to process packet
     1427 *                in any other manner including statistics functions.
     1428 */
     1429static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1430{
     1431    struct rte_config *cfg = rte_eal_get_configuration();
     1432    int i;
     1433    int new_id = -1;
     1434
     1435    // If 'reading packets' fill in cores from 0 up and bind affinity
     1436    // otherwise start from the MAX core (which is also the master) and work backwards
     1437    // in this case physical cores on the system will not exist so we don't bind
     1438    // these to any particular physical core
     1439    if (reading) {
     1440#if HAVE_LIBNUMA
     1441        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1442                if (!rte_lcore_is_enabled(i) && numa_node_of_cpu(i) == FORMAT(libtrace)->nic_numa_node) {
     1443                                new_id = i;
     1444                        if (!lcore_config[i].detected)
     1445                                new_id = -1;
     1446                        break;
     1447                }
     1448        }
     1449#endif
     1450        /* Retry without the the numa restriction */
     1451        if (new_id == -1) {
     1452                for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1453                                if (!rte_lcore_is_enabled(i)) {
     1454                                        new_id = i;
     1455                                if (!lcore_config[i].detected)
     1456                                        fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1457                                break;
     1458                        }
     1459                }
     1460        }
     1461    } else {
     1462        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1463            if (!rte_lcore_is_enabled(i)) {
     1464                new_id = i;
     1465                break;
     1466            }
     1467        }
     1468    }
     1469
     1470    if (new_id == -1) {
     1471        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1472        // TODO proper libtrace style error here!!
     1473        fprintf(stderr, "Too many threads for DPDK!!\n");
     1474        return -1;
     1475    }
     1476
     1477    // Enable the core in global DPDK structs
     1478    cfg->lcore_role[new_id] = ROLE_RTE;
     1479    cfg->lcore_count++;
     1480    // Set TLS to reflect our new number
     1481    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1482    fprintf(stderr, "original id%d", rte_lcore_id());
     1483    RTE_PER_LCORE(_lcore_id) = new_id;
     1484        char name[99];
     1485        pthread_getname_np(pthread_self(),
     1486                              name, sizeof(name));
     1487
     1488    fprintf(stderr, "%s new id%d\n", name, rte_lcore_id());
     1489
     1490    if (reading) {
     1491        // Set affinity bind to corresponding core
     1492        cpu_set_t cpuset;
     1493        CPU_ZERO(&cpuset);
     1494        CPU_SET(rte_lcore_id(), &cpuset);
     1495        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1496        if (i != 0) {
     1497            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1498            return -1;
     1499        }
     1500    }
     1501
     1502    // Map our TLS to the thread data
     1503    if (reading) {
     1504        if(t->type == THREAD_PERPKT) {
     1505            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1506        } else {
     1507            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1508        }
     1509    }
     1510    return 0;
     1511}
     1512
     1513
     1514/**
     1515 * Unregister a thread with the DPDK system.
     1516 *
     1517 * Only previously registered threads should be calling this just before
     1518 * they are destroyed.
     1519 */
     1520static void dpdk_punregister_thread(libtrace_t *libtrace UNUSED, libtrace_thread_t *t UNUSED)
     1521{
     1522    struct rte_config *cfg = rte_eal_get_configuration();
     1523
     1524    assert(rte_lcore_id() < RTE_MAX_LCORE);
     1525
     1526    // Skip if master!!
     1527    if (rte_lcore_id() == rte_get_master_lcore()) {
     1528        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1529        return;
     1530    }
     1531
     1532    // Disable this core in global DPDK structs
     1533    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1534    cfg->lcore_count--;
     1535    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1536    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1537    return;
    9371538}
    9381539
     
    9411542    char err[500];
    9421543    err[0] = 0;
    943    
     1544
    9441545    if (dpdk_start_port(FORMAT(libtrace), err, sizeof(err)) != 0) {
    945         trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
    946         free(libtrace->format_data);
    947         libtrace->format_data = NULL;
    948         return -1;
     1546        trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1547        free(libtrace->format_data);
     1548        libtrace->format_data = NULL;
     1549        return -1;
    9491550    }
    9501551    return 0;
    9511552}
    9521553
    953 static int dpdk_pause_input(libtrace_t * libtrace){
     1554static int dpdk_pause_input(libtrace_t * libtrace) {
    9541555    /* This stops the device, but can be restarted using rte_eth_dev_start() */
    9551556    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    956 #if DEBUG     
    957         fprintf(stderr, "Pausing port\n");
    958 #endif
    959         rte_eth_dev_stop(FORMAT(libtrace)->port);
    960         FORMAT(libtrace)->paused = DPDK_PAUSED;
    961         /* If we pause it the driver will be reset and likely our counter */
     1557#if DEBUG
     1558        fprintf(stderr, "Pausing DPDK port\n");
     1559#endif
     1560        rte_eth_dev_stop(FORMAT(libtrace)->port);
     1561        FORMAT(libtrace)->paused = DPDK_PAUSED;
     1562        /* Empty the queue of packets */
     1563        for (; FORMAT(libtrace)->burst_offset < FORMAT(libtrace)->burst_size; ++FORMAT(libtrace)->burst_offset) {
     1564                rte_pktmbuf_free(FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset]);
     1565        }
     1566        FORMAT(libtrace)->burst_offset = 0;
     1567        FORMAT(libtrace)->burst_size = 0;
     1568        /* If we pause it the driver will be reset and likely our counter */
     1569
     1570        FORMAT(libtrace)->per_lcore[0].ts_last_sys = 0;
    9621571#if HAS_HW_TIMESTAMPS_82580
    963         FORMAT(libtrace)->ts_first_sys = 0;
    964         FORMAT(libtrace)->ts_last_sys = 0;
     1572        FORMAT(libtrace)->per_lcore[0].ts_first_sys = 0;
    9651573#endif
    9661574    }
     
    9681576}
    9691577
    970 static int dpdk_write_packet(libtrace_out_t *trace, 
     1578static int dpdk_write_packet(libtrace_out_t *trace,
    9711579                libtrace_packet_t *packet){
    9721580    struct rte_mbuf* m_buff[1];
    973    
     1581
    9741582    int wirelen = trace_get_wire_length(packet);
    9751583    int caplen = trace_get_capture_length(packet);
    976    
     1584
    9771585    /* Check for a checksum and remove it */
    9781586    if (trace_get_link_type(packet) == TRACE_TYPE_ETH &&
    979                                             wirelen == caplen)
    980         caplen -= ETHER_CRC_LEN;
     1587                                            wirelen == caplen)
     1588        caplen -= ETHER_CRC_LEN;
    9811589
    9821590    m_buff[0] = rte_pktmbuf_alloc(FORMAT(trace)->pktmbuf_pool);
    9831591    if (m_buff[0] == NULL) {
    984         trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
    985         return -1;
     1592        trace_set_err_out(trace, errno, "Cannot get an empty packet buffer");
     1593        return -1;
    9861594    } else {
    987         int ret;
    988         memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
    989         do {
    990             ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
    991         } while (ret != 1);
     1595        int ret;
     1596        memcpy(rte_pktmbuf_append(m_buff[0], caplen), packet->payload, caplen);
     1597        do {
     1598            ret = rte_eth_tx_burst(FORMAT(trace)->queue_id, FORMAT(trace)->port, m_buff, 1);
     1599        } while (ret != 1);
    9921600    }
    9931601
     
    9981606    /* Free our memory structures */
    9991607    if (libtrace->format_data != NULL) {
    1000         /* Close the device completely, device cannot be restarted */
    1001         if (FORMAT(libtrace)->port != 0xFF)
    1002             rte_eth_dev_close(FORMAT(libtrace)->port);
    1003         /* filter here if we used it */
     1608        /* Close the device completely, device cannot be restarted */
     1609        if (FORMAT(libtrace)->port != 0xFF)
     1610                rte_eth_dev_callback_unregister(FORMAT(libtrace)->port,
     1611                                                RTE_ETH_EVENT_INTR_LSC,
     1612                                                dpdk_lsc_callback,
     1613                                                FORMAT(libtrace));
     1614                rte_eth_dev_close(FORMAT(libtrace)->port);
     1615                /* filter here if we used it */
    10041616                free(libtrace->format_data);
    10051617        }
     
    10151627    /* Free our memory structures */
    10161628    if (libtrace->format_data != NULL) {
    1017         /* Close the device completely, device cannot be restarted */
    1018         if (FORMAT(libtrace)->port != 0xFF)
    1019             rte_eth_dev_close(FORMAT(libtrace)->port);
    1020         /* filter here if we used it */
     1629        /* Close the device completely, device cannot be restarted */
     1630        if (FORMAT(libtrace)->port != 0xFF)
     1631            rte_eth_dev_close(FORMAT(libtrace)->port);
     1632        /* filter here if we used it */
    10211633                free(libtrace->format_data);
    10221634        }
     
    10281640}
    10291641
    1030 /** 
    1031  * Get the start of additional header that we added to a packet.
     1642/**
     1643 * Get the start of the additional header that we added to a packet.
    10321644 */
    10331645static inline struct dpdk_addt_hdr * get_addt_hdr (const libtrace_packet_t *packet) {
    1034     uint8_t *hdrsize;
    10351646    assert(packet);
    10361647    assert(packet->buffer);
    1037     hdrsize = (uint8_t *) MBUF_PKTDATA(packet->buffer);
    1038     /* The byte before the original packet data denotes the size in bytes
    1039      * of our additional header that we added sits before the 'size byte' */
    1040     hdrsize--;
    1041     return (struct dpdk_addt_hdr *) (hdrsize - *hdrsize);
     1648    /* Our header sits straight after the mbuf header */
     1649    return (struct dpdk_addt_hdr *) ((struct rte_mbuf*) packet->buffer + 1);
    10421650}
    10431651
     
    10501658    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    10511659    if (size > hdr->cap_len) {
    1052         /* Cannot make a packet bigger */
     1660        /* Cannot make a packet bigger */
    10531661                return trace_get_capture_length(packet);
    10541662        }
     
    10641672    int org_cap_size; /* The original capture size */
    10651673    if (hdr->flags & INCLUDES_HW_TIMESTAMP) {
    1066         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1067                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr) -
    1068                             sizeof(struct hw_timestamp_82580);
     1674        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
     1675                            sizeof(struct hw_timestamp_82580);
    10691676    } else {
    1070         org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer)) -
    1071                             (MBUF_PKTDATA(packet->buffer) - (char *) hdr);
     1677        org_cap_size = (int) rte_pktmbuf_pkt_len(MBUF(packet->buffer));
    10721678    }
    10731679    if (hdr->flags & INCLUDES_CHECKSUM) {
    1074         return org_cap_size;
     1680        return org_cap_size;
    10751681    } else {
    1076         /* DPDK packets are always TRACE_TYPE_ETH packets */
    1077         return org_cap_size + ETHER_CRC_LEN;
     1682        /* DPDK packets are always TRACE_TYPE_ETH packets */
     1683        return org_cap_size + ETHER_CRC_LEN;
    10781684    }
    10791685}
     
    10811687    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    10821688    if (hdr->flags & INCLUDES_HW_TIMESTAMP)
    1083         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
    1084                 sizeof(struct hw_timestamp_82580);
     1689        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM +
     1690                sizeof(struct hw_timestamp_82580);
    10851691    else
    1086         return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
     1692        return sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
    10871693}
    10881694
     
    10921698    assert(packet);
    10931699    if (packet->buffer != buffer &&
    1094         packet->buf_control == TRACE_CTRL_PACKET) {
    1095         free(packet->buffer);
     1700        packet->buf_control == TRACE_CTRL_PACKET) {
     1701        free(packet->buffer);
    10961702    }
    10971703
    10981704    if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    1099         packet->buf_control = TRACE_CTRL_PACKET;
     1705        packet->buf_control = TRACE_CTRL_PACKET;
    11001706    } else
    1101         packet->buf_control = TRACE_CTRL_EXTERNAL;
     1707        packet->buf_control = TRACE_CTRL_EXTERNAL;
    11021708
    11031709    packet->buffer = buffer;
     
    11101716}
    11111717
     1718
     1719/**
     1720 * Given a packet size and a link speed, computes the
     1721 * time to transmit in nanoseconds.
     1722 *
     1723 * @param format_data The dpdk format data from which we get the link speed
     1724 *        and if unset updates it in a thread safe manner
     1725 * @param pkt_size The size of the packet in bytes
     1726 * @return The wire time in nanoseconds
     1727 */
     1728static inline uint32_t calculate_wire_time(struct dpdk_format_data_t* format_data, uint32_t pkt_size) {
     1729        uint32_t wire_time;
     1730        /* 20 extra bytes of interframe gap and preamble */
     1731# if GET_MAC_CRC_CHECKSUM
     1732        wire_time = ((pkt_size + 20) * 8000);
     1733# else
     1734        wire_time = ((pkt_size + 20 + ETHER_CRC_LEN) * 8000);
     1735# endif
     1736
     1737        /* Division is really slow and introduces a pipeline stall
     1738         * The compiler will optimise this into magical multiplication and shifting
     1739         * See http://ridiculousfish.com/blog/posts/labor-of-division-episode-i.html
     1740         */
     1741retry_calc_wiretime:
     1742        switch (format_data->link_speed) {
     1743        case ETH_LINK_SPEED_40G:
     1744                wire_time /=  ETH_LINK_SPEED_40G;
     1745                break;
     1746        case ETH_LINK_SPEED_20G:
     1747                wire_time /= ETH_LINK_SPEED_20G;
     1748                break;
     1749        case ETH_LINK_SPEED_10G:
     1750                wire_time /= ETH_LINK_SPEED_10G;
     1751                break;
     1752        case ETH_LINK_SPEED_1000:
     1753                wire_time /= ETH_LINK_SPEED_1000;
     1754                break;
     1755        case 0:
     1756                {
     1757                /* Maybe the link was down originally, but now it should be up */
     1758                struct rte_eth_link link = {0};
     1759                rte_eth_link_get_nowait(format_data->port, &link);
     1760                if (link.link_status && link.link_speed) {
     1761                        format_data->link_speed = link.link_speed;
     1762#ifdef DEBUG
     1763                        fprintf(stderr, "Link has come up updated speed=%d\n", (int) link.link_speed);
     1764#endif
     1765                        goto retry_calc_wiretime;
     1766                }
     1767                /* We don't know the link speed, make sure numbers are counting up */
     1768                wire_time = 1;
     1769                break;
     1770                }
     1771        default:
     1772                wire_time /= format_data->link_speed;
     1773        }
     1774        return wire_time;
     1775}
     1776
     1777
     1778
    11121779/*
    1113  * Does any extra preperation to a captured packet.
    1114  * This includes adding our extra header to it with the timestamp
    1115  */
    1116 static inline int dpdk_ready_pkt(libtrace_t *libtrace, libtrace_packet_t *packet,
    1117                                                         struct rte_mbuf* pkt){
    1118     uint8_t * hdr_size;
    1119     struct dpdk_addt_hdr *hdr;
     1780 * Does any extra preperation to all captured packets
     1781 * This includes adding our extra header to it with the timestamp,
     1782 * and any snapping
     1783 *
     1784 * @param format_data The DPDK format data
     1785 * @param plc The DPDK per lcore format data
     1786 * @param pkts An array of size nb_pkts of DPDK packets
     1787 * @param nb_pkts The number of packets in pkts and optionally packets
     1788 * @param packets Optional - If not null nb_pkts of libtrace packets which will be prepared
     1789 */
     1790static inline void dpdk_ready_pkts(struct dpdk_format_data_t *format_data, struct dpdk_per_lcore_t *plc,
     1791                                   struct rte_mbuf **pkts, size_t nb_pkts, libtrace_packet_t **packets) {
     1792        struct dpdk_addt_hdr *hdr;
     1793        size_t i;
     1794        uint64_t cur_sys_time_ns;
    11201795#if HAS_HW_TIMESTAMPS_82580
    1121     struct hw_timestamp_82580 *hw_ts;
    1122     struct timeval cur_sys_time;
    1123     uint64_t cur_sys_time_ns;
    1124     uint64_t estimated_wraps;
    1125    
    1126     /* Using gettimeofday because it's most likely to be a vsyscall
    1127      * We don't want to slow down anything with systemcalls we dont need
    1128      * accauracy */
    1129     gettimeofday(&cur_sys_time, NULL);
     1796        struct hw_timestamp_82580 *hw_ts;
     1797        uint64_t estimated_wraps;
    11301798#else
    1131 # if USE_CLOCK_GETTIME
    1132     struct timespec cur_sys_time;
    1133    
    1134     /* This looks terrible and I feel bad doing it. But it's OK
    1135      * on new kernels, because this is a vsyscall */
    1136     clock_gettime(CLOCK_REALTIME, &cur_sys_time);
    1137 # else
    1138     struct timeval cur_sys_time;
    1139     /* Should be a vsyscall */
    1140     gettimeofday(&cur_sys_time, NULL);
    1141 # endif
    1142 #endif
    1143 
    1144     /* Record the size of our header */
    1145     hdr_size = (uint8_t *) rte_pktmbuf_prepend(pkt, sizeof(uint8_t));
    1146     *hdr_size = sizeof(struct dpdk_addt_hdr);
    1147     /* Now put our header in front of that size */
    1148     hdr = (struct dpdk_addt_hdr *) rte_pktmbuf_prepend(pkt, sizeof(struct dpdk_addt_hdr));
    1149     memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
    1150    
     1799
     1800#endif
     1801
     1802#if USE_CLOCK_GETTIME
     1803        struct timespec cur_sys_time = {0};
     1804        /* This looks terrible and I feel bad doing it. But it's OK
     1805         * on new kernels, because this is a fast vsyscall */
     1806        clock_gettime(CLOCK_REALTIME, &cur_sys_time);
     1807        cur_sys_time_ns = TS_TO_NS(cur_sys_time);
     1808#else
     1809        struct timeval cur_sys_time = {0};
     1810        /* Also a fast vsyscall */
     1811        gettimeofday(&cur_sys_time, NULL);
     1812        cur_sys_time_ns = TV_TO_NS(cur_sys_time);
     1813#endif
     1814
     1815        /* The system clock is not perfect so when running
     1816         * at linerate we could timestamp a packet in the past.
     1817         * To avoid this we munge the timestamp to appear 1ns
     1818         * after the previous packet. We should eventually catch up
     1819         * to system time since a 64byte packet on a 10G link takes 67ns.
     1820         *
     1821         * Note with parallel readers timestamping packets
     1822         * with duplicate stamps or out of order is unavoidable without
     1823         * hardware timestamping from the NIC.
     1824         */
     1825#if !HAS_HW_TIMESTAMPS_82580
     1826        if (plc->ts_last_sys >= cur_sys_time_ns) {
     1827                cur_sys_time_ns = plc->ts_last_sys + 1;
     1828        }
     1829#endif
     1830
     1831        assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct dpdk_addt_hdr)); // TODO static compile time assert sometime??
     1832        for (i = 0 ; i < nb_pkts ; ++i) {
     1833
     1834                /* We put our header straight after the dpdk header */
     1835                hdr = (struct dpdk_addt_hdr *) (pkts[i] + 1);
     1836                memset(hdr, 0, sizeof(struct dpdk_addt_hdr));
     1837
    11511838#if GET_MAC_CRC_CHECKSUM
    1152     /* Add back in the CRC sum */
    1153     pkt->pkt.pkt_len += ETHER_CRC_LEN;
    1154     pkt->pkt.data_len += ETHER_CRC_LEN;
    1155     hdr->flags |= INCLUDES_CHECKSUM;
    1156 #endif
     1839                /* Add back in the CRC sum */
     1840                pkts[i]->pkt.pkt_len += ETHER_CRC_LEN;
     1841                pkts[i]->pkt.data_len += ETHER_CRC_LEN;
     1842                hdr->flags |= INCLUDES_CHECKSUM;
     1843#endif
     1844
     1845                hdr->cap_len = rte_pktmbuf_pkt_len(pkts[i]);
    11571846
    11581847#if HAS_HW_TIMESTAMPS_82580
    1159     /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
    1160      *
    1161      *        +----------+---+   +--------------+
    1162      *  82580 |    24    | 8 |   |      32      |
    1163      *        +----------+---+   +--------------+
    1164      *          reserved  \______ 40 bits _____/
    1165      *
    1166      * The 40 bit 82580 SYSTIM overflows every
    1167      *   2^40 * 10^-9 /  60  = 18.3 minutes.
    1168      *
    1169      * NOTE picture is in Big Endian order, in memory it's acutally in Little
    1170      * Endian (for the full 64 bits) i.e. picture is mirrored
    1171      */
    1172    
    1173     /* The timestamp is sitting before our packet and is included in pkt_len */
    1174     hdr->flags |= INCLUDES_HW_TIMESTAMP;
    1175     hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkt);
    1176    
    1177     /* Despite what the documentation says this is in Little
    1178      * Endian byteorder. Mask the reserved section out.
    1179      */
    1180     hdr->timestamp = le64toh(hw_ts->timestamp) &
    1181                 ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
    1182                
    1183     cur_sys_time_ns = TV_TO_NS(cur_sys_time);
    1184     if (unlikely(FORMAT(libtrace)->ts_first_sys == 0)) {
    1185         FORMAT(libtrace)->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
    1186         FORMAT(libtrace)->ts_last_sys = FORMAT(libtrace)->ts_first_sys;
    1187     }
    1188    
    1189     /* This will have serious problems if packets aren't read quickly
    1190      * that is within a couple of seconds because our clock cycles every
    1191      * 18 seconds */
    1192     estimated_wraps = (cur_sys_time_ns - FORMAT(libtrace)->ts_last_sys)
    1193                             / (1ull<<TS_NBITS_82580);
    1194    
    1195     /* Estimated_wraps gives the number of times the counter should have
    1196      * wrapped (however depending on value last time it could have wrapped
    1197      * twice more (if hw clock is close to its max value) or once less (allowing
    1198      * for a bit of variance between hw and sys clock). But if the clock
    1199      * shouldn't have wrapped once then don't allow it to go backwards in time */
    1200     if (unlikely(estimated_wraps >= 2)) {
    1201         /* 2 or more wrap arounds add all but the very last wrap */
    1202         FORMAT(libtrace)->wrap_count += estimated_wraps - 1;
    1203     }
    1204    
    1205     /* Set the timestamp to the lowest possible value we're considering */
    1206     hdr->timestamp += FORMAT(libtrace)->ts_first_sys +
    1207                         FORMAT(libtrace)->wrap_count * (1ull<<TS_NBITS_82580);
    1208    
    1209     /* In most runs only the first if() will need evaluating - i.e our
    1210      * estimate is correct. */
    1211     if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
    1212                                 hdr->timestamp, MAXSKEW_82580))) {
    1213         /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
    1214         FORMAT(libtrace)->wrap_count++;
    1215         hdr->timestamp += (1ull<<TS_NBITS_82580);
    1216         if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1217                                 hdr->timestamp, MAXSKEW_82580)) {
    1218             /* Failed to match estimated_wraps */
    1219             FORMAT(libtrace)->wrap_count++;
    1220             hdr->timestamp += (1ull<<TS_NBITS_82580);
    1221             if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1222                                 hdr->timestamp, MAXSKEW_82580)) {
    1223                 if (estimated_wraps == 0) {
    1224                     /* 0 case Failed to match estimated_wraps+2 */
    1225                     printf("WARNING - Hardware Timestamp failed to"
    1226                                             " match using systemtime!\n");
    1227                     hdr->timestamp = cur_sys_time_ns;
    1228                 } else {
    1229                     /* Failed to match estimated_wraps+1 */
    1230                     FORMAT(libtrace)->wrap_count++;
    1231                     hdr->timestamp += (1ull<<TS_NBITS_82580);
    1232                     if (!WITHIN_VARIANCE(cur_sys_time_ns,
    1233                                 hdr->timestamp, MAXSKEW_82580)) {
    1234                         /* Failed to match estimated_wraps+2 */
    1235                         printf("WARNING - Hardware Timestamp failed to"
    1236                                             " match using systemtime!!\n");
    1237                     }
    1238                 }
    1239             }
    1240         }
    1241     }
    1242 
    1243     /* Log our previous for the next loop */
    1244     FORMAT(libtrace)->ts_last_sys = TV_TO_NS(cur_sys_time);
    1245 
     1848                /* The timestamp is sitting before our packet and is included in pkt_len */
     1849                hdr->flags |= INCLUDES_HW_TIMESTAMP;
     1850                hdr->cap_len -= sizeof(struct hw_timestamp_82580);
     1851                hw_ts = (struct hw_timestamp_82580 *) MBUF_PKTDATA(pkts[i]);
     1852
     1853                /* Taken from igb_ptp.c part of Intel Linux drivers (Good example code)
     1854                 *
     1855                 *        +----------+---+   +--------------+
     1856                 *  82580 |    24    | 8 |   |      32      |
     1857                 *        +----------+---+   +--------------+
     1858                 *          reserved  \______ 40 bits _____/
     1859                 *
     1860                 * The 40 bit 82580 SYSTIM overflows every
     1861                 *   2^40 * 10^-9 /  60  = 18.3 minutes.
     1862                 *
     1863                 * NOTE picture is in Big Endian order, in memory it's acutally in Little
     1864                 * Endian (for the full 64 bits) i.e. picture is mirrored
     1865                 */
     1866
     1867                /* Despite what the documentation says this is in Little
     1868                 * Endian byteorder. Mask the reserved section out.
     1869                 */
     1870                hdr->timestamp = le64toh(hw_ts->timestamp) &
     1871                        ~(((~0ull)>>TS_NBITS_82580)<<TS_NBITS_82580);
     1872
     1873                if (unlikely(plc->ts_first_sys == 0)) {
     1874                        plc->ts_first_sys = cur_sys_time_ns - hdr->timestamp;
     1875                        plc->ts_last_sys = plc->ts_first_sys;
     1876                }
     1877
     1878                /* This will have serious problems if packets aren't read quickly
     1879                 * that is within a couple of seconds because our clock cycles every
     1880                 * 18 seconds */
     1881                estimated_wraps = (cur_sys_time_ns - plc->ts_last_sys)
     1882                                  / (1ull<<TS_NBITS_82580);
     1883
     1884                /* Estimated_wraps gives the number of times the counter should have
     1885                 * wrapped (however depending on value last time it could have wrapped
     1886                 * twice more (if hw clock is close to its max value) or once less (allowing
     1887                 * for a bit of variance between hw and sys clock). But if the clock
     1888                 * shouldn't have wrapped once then don't allow it to go backwards in time */
     1889                if (unlikely(estimated_wraps >= 2)) {
     1890                        /* 2 or more wrap arounds add all but the very last wrap */
     1891                        plc->wrap_count += estimated_wraps - 1;
     1892                }
     1893
     1894                /* Set the timestamp to the lowest possible value we're considering */
     1895                hdr->timestamp += plc->ts_first_sys +
     1896                                  plc->wrap_count * (1ull<<TS_NBITS_82580);
     1897
     1898                /* In most runs only the first if() will need evaluating - i.e our
     1899                 * estimate is correct. */
     1900                if (unlikely(!WITHIN_VARIANCE(cur_sys_time_ns,
     1901                                              hdr->timestamp, MAXSKEW_82580))) {
     1902                        /* Failed to match estimated_wraps-1 (or estimated_wraps in ==0 case) */
     1903                        plc->wrap_count++;
     1904                        hdr->timestamp += (1ull<<TS_NBITS_82580);
     1905                        if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1906                                             hdr->timestamp, MAXSKEW_82580)) {
     1907                                /* Failed to match estimated_wraps */
     1908                                plc->wrap_count++;
     1909                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1910                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1911                                                     hdr->timestamp, MAXSKEW_82580)) {
     1912                                        if (estimated_wraps == 0) {
     1913                                                /* 0 case Failed to match estimated_wraps+2 */
     1914                                                printf("WARNING - Hardware Timestamp failed to"
     1915                                                       " match using systemtime!\n");
     1916                                                hdr->timestamp = cur_sys_time_ns;
     1917                                        } else {
     1918                                                /* Failed to match estimated_wraps+1 */
     1919                                                plc->wrap_count++;
     1920                                                hdr->timestamp += (1ull<<TS_NBITS_82580);
     1921                                                if (!WITHIN_VARIANCE(cur_sys_time_ns,
     1922                                                                     hdr->timestamp, MAXSKEW_82580)) {
     1923                                                        /* Failed to match estimated_wraps+2 */
     1924                                                        printf("WARNING - Hardware Timestamp failed to"
     1925                                                               " match using systemtime!!\n");
     1926                                                }
     1927                                        }
     1928                                }
     1929                        }
     1930                }
    12461931#else
    1247 # if USE_CLOCK_GETTIME
    1248     hdr->timestamp = TS_TO_NS(cur_sys_time);
    1249 # else
    1250     hdr->timestamp = TV_TO_NS(cur_sys_time);
    1251 # endif
    1252 #endif
    1253 
    1254     /* Intels samples prefetch into level 0 cache lets assume it is a good
    1255      * idea and do the same */
    1256     rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
    1257     packet->buffer = pkt;
    1258     dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
    1259 
    1260     /* Set our capture length for the first time */
    1261     hdr->cap_len = dpdk_get_wire_length(packet);
    1262     if (!(hdr->flags & INCLUDES_CHECKSUM)) {
    1263         hdr->cap_len -= ETHER_CRC_LEN;
    1264     }
    1265    
    1266 
    1267     return dpdk_get_framing_length(packet) +
    1268                         dpdk_get_capture_length(packet);
    1269 }
     1932
     1933                hdr->timestamp = cur_sys_time_ns;
     1934                /* Offset the next packet by the wire time of previous */
     1935                calculate_wire_time(format_data, hdr->cap_len);
     1936
     1937#endif
     1938                if(packets) {
     1939                        packets[i]->buffer = pkts[i];
     1940                        packets[i]->header = pkts[i];
     1941#if HAS_HW_TIMESTAMPS_82580
     1942                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1943                                              RTE_PKTMBUF_HEADROOM + sizeof(struct hw_timestamp_82580);
     1944#else
     1945                        packets[i]->payload = (char *) pkts[i] + sizeof(struct rte_mbuf) +
     1946                                              RTE_PKTMBUF_HEADROOM;
     1947#endif
     1948                        packets[i]->error = 1;
     1949                }
     1950        }
     1951
     1952        plc->ts_last_sys = cur_sys_time_ns;
     1953
     1954        return;
     1955}
     1956
     1957
     1958static void dpdk_fin_packet(libtrace_packet_t *packet)
     1959{
     1960        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1961                rte_pktmbuf_free(packet->buffer);
     1962                packet->buffer = NULL;
     1963        }
     1964}
     1965
    12701966
    12711967static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    1272     int nb_rx; /* Number of rx packets we've recevied */
    1273     struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     1968    int nb_rx; /* Number of rx packets we've received */
    12741969
    12751970    /* Free the last packet buffer */
    12761971    if (packet->buffer != NULL) {
    1277         /* Buffer is owned by DPDK */
    1278         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1279             rte_pktmbuf_free(packet->buffer);
    1280             packet->buffer = NULL;
    1281         } else
    1282         /* Buffer is owned by packet i.e. has been malloc'd */
    1283         if (packet->buf_control == TRACE_CTRL_PACKET) {
    1284             free(packet->buffer);
    1285             packet->buffer = NULL;
    1286         }
    1287     }
    1288    
     1972        /* Buffer is owned by DPDK */
     1973        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1974            rte_pktmbuf_free(packet->buffer);
     1975            packet->buffer = NULL;
     1976        } else
     1977        /* Buffer is owned by packet i.e. has been malloc'd */
     1978        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1979            free(packet->buffer);
     1980            packet->buffer = NULL;
     1981        }
     1982    }
     1983
    12891984    packet->buf_control = TRACE_CTRL_EXTERNAL;
    12901985    packet->type = TRACE_RT_DATA_DPDK;
    1291    
     1986
     1987    /* Check if we already have some packets buffered */
     1988    if (FORMAT(libtrace)->burst_size != FORMAT(libtrace)->burst_offset) {
     1989            packet->buffer =  FORMAT(libtrace)->burst_pkts[FORMAT(libtrace)->burst_offset++];
     1990            dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     1991            return 1; // TODO should be bytes read, which essentially useless anyway
     1992    }
    12921993    /* Wait for a packet */
    12931994    while (1) {
    1294         /* Poll for a single packet */
    1295         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1296                             FORMAT(libtrace)->queue_id, pkts_burst, 1);       
    1297         if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    1298             return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    1299         }
    1300     }
    1301    
     1995        /* Poll for a single packet */
     1996        nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
     1997                                 FORMAT(libtrace)->queue_id, FORMAT(libtrace)->burst_pkts, BURST_SIZE);
     1998        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     1999                FORMAT(libtrace)->burst_size = nb_rx;
     2000                FORMAT(libtrace)->burst_offset = 1;
     2001                dpdk_ready_pkts(FORMAT(libtrace), &FORMAT(libtrace)->per_lcore[0], FORMAT(libtrace)->burst_pkts, nb_rx, NULL);
     2002                packet->buffer = FORMAT(libtrace)->burst_pkts[0];
     2003                dpdk_prepare_packet(libtrace, packet, packet->buffer, packet->type, 0);
     2004                return 1; // TODO should be bytes read, which essentially useless anyway
     2005        }
     2006        if (libtrace_halt) {
     2007                return 0;
     2008        }
     2009        /* Wait a while, polling on memory degrades performance
     2010         * This relieves the pressure on memory allowing the NIC to DMA */
     2011        rte_delay_us(10);
     2012    }
     2013
     2014    /* We'll never get here - but if we did it would be bad */
     2015    return -1;
     2016}
     2017
     2018static int dpdk_pread_packets (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) {
     2019    size_t nb_rx; /* Number of rx packets we've recevied */
     2020    struct rte_mbuf* pkts_burst[nb_packets]; /* Array of pointer(s) */
     2021    size_t i;
     2022
     2023    for (i = 0 ; i < nb_packets ; ++i) {
     2024            /* Free the last packet buffer */
     2025            if (packets[i]->buffer != NULL) {
     2026                /* Buffer is owned by DPDK */
     2027                if (packets[i]->buf_control == TRACE_CTRL_EXTERNAL) {
     2028                    rte_pktmbuf_free(packets[i]->buffer);
     2029                    packets[i]->buffer = NULL;
     2030                } else
     2031                /* Buffer is owned by packet i.e. has been malloc'd */
     2032                if (packets[i]->buf_control == TRACE_CTRL_PACKET) {
     2033                    free(packets[i]->buffer);
     2034                    packets[i]->buffer = NULL;
     2035                }
     2036            }
     2037            packets[i]->buf_control = TRACE_CTRL_EXTERNAL;
     2038            packets[i]->type = TRACE_RT_DATA_DPDK;
     2039    }
     2040
     2041    /* Wait for a packet */
     2042    while (1) {
     2043        /* Poll for a single packet */
     2044        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     2045                            PERPKT_FORMAT(t)->queue_id, pkts_burst, nb_packets);
     2046        if (nb_rx > 0) {
     2047                /* Got some packets - otherwise we keep spining */
     2048                //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     2049                dpdk_ready_pkts(FORMAT(libtrace), PERPKT_FORMAT(t), pkts_burst, nb_rx, packets);
     2050                return nb_rx;
     2051        }
     2052        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
     2053        if (libtrace_message_queue_count(&t->messages) > 0) {
     2054                printf("Extra message yay");
     2055                return -2;
     2056        }
     2057        if (libtrace_halt) {
     2058                return 0;
     2059        }
     2060        /* Wait a while, polling on memory degrades performance
     2061         * This relieves the pressure on memory allowing the NIC to DMA */
     2062        rte_delay_us(10);
     2063    }
     2064
    13022065    /* We'll never get here - but if we did it would be bad */
    13032066    return -1;
     
    13072070    struct timeval tv;
    13082071    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1309    
     2072
    13102073    tv.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    13112074    tv.tv_usec = (hdr->timestamp % (uint64_t) 1000000000) / 1000;
     
    13162079    struct timespec ts;
    13172080    struct dpdk_addt_hdr * hdr = get_addt_hdr(packet);
    1318    
     2081
    13192082    ts.tv_sec = hdr->timestamp / (uint64_t) 1000000000;
    13202083    ts.tv_nsec = hdr->timestamp % (uint64_t) 1000000000;
     
    13432106static uint64_t dpdk_get_dropped_packets (libtrace_t *trace) {
    13442107    struct rte_eth_stats stats = {0};
    1345    
     2108
    13462109    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1347         return UINT64_MAX;
     2110        return UINT64_MAX;
    13482111    /* Grab the current stats */
    13492112    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1350    
     2113
    13512114    /* Get the drop counter */
    13522115    return (uint64_t) stats.ierrors;
     
    13552118static uint64_t dpdk_get_captured_packets (libtrace_t *trace) {
    13562119    struct rte_eth_stats stats = {0};
    1357    
     2120
    13582121    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1359         return UINT64_MAX;
     2122        return UINT64_MAX;
    13602123    /* Grab the current stats */
    13612124    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1362    
     2125
    13632126    /* Get the drop counter */
    13642127    return (uint64_t) stats.ipackets;
     
    13682131 * This is the number of packets filtered by the NIC
    13692132 * and maybe ahead of number read using libtrace.
    1370  * 
     2133 *
    13712134 * XXX we are yet to implement any filtering, but if it was this should
    13722135 * get the result. So this will just return 0 for now.
     
    13742137static uint64_t dpdk_get_filtered_packets (libtrace_t *trace) {
    13752138    struct rte_eth_stats stats = {0};
    1376    
     2139
    13772140    if (trace->format_data == NULL || FORMAT(trace)->port == 0xFF)
    1378         return UINT64_MAX;
     2141        return UINT64_MAX;
    13792142    /* Grab the current stats */
    13802143    rte_eth_stats_get(FORMAT(trace)->port, &stats);
    1381    
     2144
    13822145    /* Get the drop counter */
    13832146    return (uint64_t) stats.fdirmiss;
     
    13892152 */
    13902153static libtrace_eventobj_t dpdk_trace_event(libtrace_t *trace,
    1391                                         libtrace_packet_t *packet) {
     2154                                        libtrace_packet_t *packet) {
    13922155    libtrace_eventobj_t event = {0,0,0.0,0};
    13932156    int nb_rx; /* Number of receive packets we've read */
    13942157    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
    1395    
     2158
    13962159    do {
    1397    
    1398         /* See if we already have a packet waiting */
    1399         nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    1400                         FORMAT(trace)->queue_id, pkts_burst, 1);
    1401        
    1402         if (nb_rx > 0) {
    1403             /* Free the last packet buffer */
    1404             if (packet->buffer != NULL) {
    1405                 /* Buffer is owned by DPDK */
    1406                 if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
    1407                     rte_pktmbuf_free(packet->buffer);
    1408                     packet->buffer = NULL;
    1409                 } else
    1410                 /* Buffer is owned by packet i.e. has been malloc'd */
    1411                 if (packet->buf_control == TRACE_CTRL_PACKET) {
    1412                     free(packet->buffer);
    1413                     packet->buffer = NULL;
    1414                 }
    1415             }
    1416            
    1417             packet->buf_control = TRACE_CTRL_EXTERNAL;
    1418             packet->type = TRACE_RT_DATA_DPDK;
    1419             event.type = TRACE_EVENT_PACKET;
    1420             event.size = dpdk_ready_pkt(trace, packet, pkts_burst[0]);
    1421            
    1422             /* XXX - Check this passes the filter trace_read_packet normally
    1423              * does this for us but this wont */
    1424             if (trace->filter) {
    1425                 if (!trace_apply_filter(trace->filter, packet)) {
    1426                     /* Failed the filter so we loop for another packet */
    1427                     trace->filtered_packets ++;
    1428                     continue;
    1429                 }
    1430             }
    1431             trace->accepted_packets ++;
    1432         } else {
    1433             /* We only want to sleep for a very short time - we are non-blocking */
    1434             event.type = TRACE_EVENT_SLEEP;
    1435             event.seconds = 0.0001;
    1436             event.size = 0;
    1437         }
    1438        
    1439         /* If we get here we have our event */
    1440         break;
     2160
     2161        /* See if we already have a packet waiting */
     2162        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2163                        FORMAT(trace)->queue_id, pkts_burst, 1);
     2164
     2165        if (nb_rx > 0) {
     2166            /* Free the last packet buffer */
     2167            if (packet->buffer != NULL) {
     2168                /* Buffer is owned by DPDK */
     2169                if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     2170                    rte_pktmbuf_free(packet->buffer);
     2171                    packet->buffer = NULL;
     2172                } else
     2173                /* Buffer is owned by packet i.e. has been malloc'd */
     2174                if (packet->buf_control == TRACE_CTRL_PACKET) {
     2175                    free(packet->buffer);
     2176                    packet->buffer = NULL;
     2177                }
     2178            }
     2179
     2180            packet->buf_control = TRACE_CTRL_EXTERNAL;
     2181            packet->type = TRACE_RT_DATA_DPDK;
     2182            event.type = TRACE_EVENT_PACKET;
     2183            dpdk_ready_pkts(FORMAT(trace), &FORMAT(trace)->per_lcore[0], pkts_burst, 1, &packet);
     2184            event.size = 1; // TODO should be bytes read, which essentially useless anyway
     2185
     2186            /* XXX - Check this passes the filter trace_read_packet normally
     2187             * does this for us but this wont */
     2188            if (trace->filter) {
     2189                if (!trace_apply_filter(trace->filter, packet)) {
     2190                    /* Failed the filter so we loop for another packet */
     2191                    trace->filtered_packets ++;
     2192                    continue;
     2193                }
     2194            }
     2195            trace->accepted_packets ++;
     2196        } else {
     2197            /* We only want to sleep for a very short time - we are non-blocking */
     2198            event.type = TRACE_EVENT_SLEEP;
     2199            event.seconds = 0.0001;
     2200            event.size = 0;
     2201        }
     2202
     2203        /* If we get here we have our event */
     2204        break;
    14412205    } while (1);
    14422206
     
    14632227}
    14642228
    1465  static struct libtrace_format_t dpdk = {
     2229static struct libtrace_format_t dpdk = {
    14662230        "dpdk",
    14672231        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     
    14802244        dpdk_read_packet,           /* read_packet */
    14812245        dpdk_prepare_packet,    /* prepare_packet */
    1482         NULL,                               /* fin_packet */
     2246        dpdk_fin_packet,                                    /* fin_packet */
    14832247        dpdk_write_packet,          /* write_packet */
    14842248        dpdk_get_link_type,         /* get_link_type */
     
    15032267        dpdk_trace_event,               /* trace_event */
    15042268    dpdk_help,              /* help */
    1505         NULL
     2269    NULL,                   /* next pointer */
     2270    {true, 8},              /* Live, NICs typically have 8 threads */
     2271    dpdk_pstart_input, /* pstart_input */
     2272        dpdk_pread_packets, /* pread_packets */
     2273        dpdk_pause_input, /* ppause */
     2274        dpdk_fin_input, /* p_fin */
     2275        dpdk_pconfig_input, /* pconfig_input */
     2276    dpdk_pregister_thread, /* pregister_thread */
     2277    dpdk_punregister_thread /* unpregister_thread */
    15062278};
    15072279
  • lib/format_duck.c

    r9b097ea rb13b939  
    360360        NULL,                           /* trace_event */
    361361        duck_help,                      /* help */
    362         NULL                            /* next pointer */
     362        NULL,                            /* next pointer */
     363        NON_PARALLEL(false)
    363364};
    364365
  • lib/format_erf.c

    rc69aecb rc69aecb  
    837837        erf_event,                      /* trace_event */
    838838        erf_help,                       /* help */
    839         NULL                            /* next pointer */
     839        NULL,                           /* next pointer */
     840        NON_PARALLEL(false)
    840841};
    841842
     
    880881        erf_event,                      /* trace_event */
    881882        erf_help,                       /* help */
    882         NULL                            /* next pointer */
     883        NULL,                           /* next pointer */
     884        NON_PARALLEL(false)
    883885};
    884886
  • lib/format_legacy.c

    r1ca603b rb13b939  
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_linux.c

    r63af502 r0a1d2d0  
    7272#include <sys/mman.h>
    7373
     74#include <fcntl.h>
     75
    7476/* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel.
    7577 * max_order will be decreased by one if the ring buffer fails to allocate.
     
    147149#define PACKET_HDRLEN   11
    148150#define PACKET_TX_RING  13
     151#define PACKET_FANOUT   18
    149152#define TP_STATUS_USER  0x1
    150153#define TP_STATUS_SEND_REQUEST  0x1
     
    154157#define TPACKET_ALIGN(x)        (((x)+TPACKET_ALIGNMENT-1)&~(TPACKET_ALIGNMENT-1))
    155158#define TPACKET_HDRLEN         (TPACKET_ALIGN(sizeof(struct tpacket2_hdr)) + sizeof(struct sockaddr_ll))
     159
     160/* Since 3.1 kernel we have packet_fanout support */
     161// schedule to socket by skb's rxhash - the implementation is bi-directional
     162#define PACKET_FANOUT_HASH              0
     163// schedule round robin
     164#define PACKET_FANOUT_LB                1
     165// schedule to the same socket that received the packet
     166#define PACKET_FANOUT_CPU               2
     167// Something to do with fragmented packets and hashing problems !! TODO figure out if this needs to be on
     168#define PACKET_FANOUT_FLAG_DEFRAG       0x8000
     169/* Included but unused by libtrace since Linux 3.10 */
     170// if one socket if full roll over to the next
     171#define PACKET_FANOUT_ROLLOVER          3
     172// This flag makes any other system roll over
     173#define PACKET_FANOUT_FLAG_ROLLOVER     0x1000
     174/* Included but unused by libtrace since Linux 3.12 */
     175// schedule random
     176#define PACKET_FANOUT_RND               4
     177
    156178
    157179enum tpacket_versions {
     
    185207        unsigned int tp_frame_nr;    /* Total number of frames */
    186208};
     209
     210struct linux_per_thread_t {
     211        char *rx_ring;
     212        int rxring_offset;
     213        int fd;
     214        // The flag layout should be the same for all (I Hope)
     215        // max_order
     216} ALIGN_STRUCT(CACHE_LINE_SIZE);
    187217
    188218struct linux_format_data_t {
     
    212242        /* Used to determine buffer size for the ring buffer */
    213243        uint32_t max_order;
     244        /* Used for the parallel case, fanout is the mode */
     245        uint16_t fanout_flags;
     246        /* The group lets Linux know which sockets to group together
     247         * so we use a random here to try avoid collisions */
     248        uint16_t fanout_group;
     249        /* When running in parallel mode this is malloc'd with an array
     250         * file descriptors from packet fanout will use, here we assume/hope
     251         * that every ring can get setup the same */
     252        struct linux_per_thread_t *per_thread;
    214253};
    215254
     
    266305
    267306#define FORMAT(x) ((struct linux_format_data_t*)(x))
     307#define PERPKT_FORMAT(x) ((struct linux_per_thread_t*)(x->format_data))
    268308#define DATAOUT(x) ((struct linux_output_format_data_t*)((x)->format_data))
    269309
     
    367407        FORMAT(libtrace->format_data)->rxring_offset = 0;
    368408        FORMAT(libtrace->format_data)->max_order = MAX_ORDER;
     409        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; // This might be best or alternatively PACKET_FANOUT_LB
     410        // Some examples use pid for the group however that would limit a single
     411        // application to use only int/ring format, instead using rand
     412        FORMAT(libtrace->format_data)->fanout_group = (uint16_t) rand();
     413        FORMAT(libtrace->format_data)->per_thread = NULL;
    369414}
    370415static int linuxring_init_input(libtrace_t *libtrace)
    371416{       
    372417        init_input(libtrace);
    373         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_RING;
     418        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_RING;
    374419        return 0;
    375420}
     
    377422{
    378423        init_input(libtrace);
    379         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_NATIVE;
     424        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_NATIVE;
    380425        return 0;
    381426}
     
    508553                }
    509554               
    510                 if (setsockopt(FORMAT(libtrace->format_data)->fd,
     555                if (setsockopt(FORMAT(libtrace->format_data)->fd,
    511556                                        SOL_SOCKET,
    512557                                        SO_ATTACH_FILTER,
     
    588633        return 0;
    589634}
    590 static int linuxring_start_input(libtrace_t *libtrace){
    591 
    592         char error[2048];       
     635
     636/**
     637 * Converts a socket, either packet_mmap or standard raw socket into a
     638 * fanout socket.
     639 * NOTE: This means we can read from the socket with multiple queues,
     640 * each must be setup (identically) and then this called upon them
     641 *
     642 * @return 0 success, -1 error
     643 */
     644static inline int socket_to_packet_fanout(int fd,
     645                                        uint16_t fanout_flags,
     646                                        uint16_t fanout_group) {
     647        int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group;
     648        if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
     649                        &fanout_opt, sizeof(fanout_opt)) == -1) {
     650                return -1;
     651        }
     652        return 0;
     653}
     654
     655static int linuxnative_ppause_input(libtrace_t *libtrace)
     656{
     657        int i;
     658        int tot = libtrace->perpkt_thread_count;
     659        printf("CAlling native pause packet\n");
     660       
     661        for (i = 0; i < tot; i++) {
     662                close(FORMAT(libtrace->format_data)->per_thread[i].fd);
     663        }
     664       
     665        free(FORMAT(libtrace->format_data)->per_thread);
     666        FORMAT(libtrace->format_data)->per_thread = NULL;
     667        return 0;
     668}
     669
     670static int linuxring_start_input(libtrace_t *libtrace)
     671{
     672        char error[2048];
    593673
    594674        /* We set the socket up the same and then convert it to PACKET_MMAP */
     
    615695}
    616696
     697static int linuxnative_pstart_input(libtrace_t *libtrace) {
     698        int i = 0;
     699        int tot = libtrace->perpkt_thread_count;
     700        int iserror = 0;
     701        // We store this here otherwise it will be leaked if the memory doesn't know
     702        struct linux_per_thread_t *per_thread = NULL;
     703       
     704        if (!FORMAT(libtrace->format_data)->per_thread) {
     705                //per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
     706                posix_memalign((void **)&per_thread, CACHE_LINE_SIZE, tot*sizeof(struct linux_per_thread_t));
     707                FORMAT(libtrace->format_data)->per_thread = per_thread;
     708        } else {
     709                // Whats going on this might not work 100%
     710                // We assume all sockets have been closed ;)
     711                printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
     712        }
     713       
     714        printf("Calling native pstart packet\n");
     715        for (i = 0; i < tot; ++i)
     716        {
     717                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
     718                        if (linuxnative_start_input(libtrace) != 0) {
     719                                iserror = 1;
     720                                break;
     721                        }
     722                } else {
     723                        // This must be ring
     724                        if (linuxring_start_input(libtrace) != 0) {
     725                                iserror = 1;
     726                                break;
     727                        }
     728                }
     729                if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0)
     730                {
     731                        iserror = 1;
     732                        // Clean up here to keep consistent with every one else
     733                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed");
     734                        close(FORMAT(libtrace->format_data)->fd);
     735                        free(libtrace->format_data);
     736                        libtrace->format_data = NULL;
     737                        break;
     738                }
     739                per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
     740                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
     741                        per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
     742                        per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     743                }
     744        }
     745       
     746        // Roll back those that failed - by this point in time the format_data
     747        // has been freed
     748        if (iserror) {
     749                for (i = i - 1; i >= 0; i--) {
     750                        close(per_thread[i].fd);
     751                }
     752                free(per_thread);
     753                per_thread = NULL;
     754                return -1;
     755        }
     756       
     757        return 0;
     758}
     759
     760static int linux_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) {
     761        fprintf(stderr, "registering thread %d!!\n", t->perpkt_num);
     762    if (reading) {
     763        if(t->type == THREAD_PERPKT) {
     764            t->format_data = &FORMAT(libtrace->format_data)->per_thread[t->perpkt_num];
     765        } else {
     766            t->format_data = &FORMAT(libtrace->format_data)->per_thread[0];
     767        }
     768    }
     769    return 0;
     770}
     771
    617772static int linuxnative_start_output(libtrace_out_t *libtrace)
    618773{
     
    621776                free(DATAOUT(libtrace));
    622777                return -1;
    623         }       
     778        }
    624779
    625780        return 0;
     
    666821        return 0;
    667822}
     823
    668824static int linuxring_pause_input(libtrace_t *libtrace)
    669825{
     
    765921                         */
    766922                        f->flag = 1;
    767                 }
     923                }
    768924
    769925                pcap_close(pcap);
     
    813969#endif /* HAVE_NETPACKET_PACKET_H */
    814970
     971
     972static int linuxnative_pconfig_input(libtrace_t *libtrace,
     973                trace_parallel_option_t option,
     974                void *data)
     975{
     976        switch(option) {
     977                case TRACE_OPTION_SET_HASHER:
     978                        switch (*((enum hasher_types *)data)) {
     979                                case HASHER_BALANCE:
     980                                        // Do fanout
     981                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB;
     982                                        // Or we could balance to the CPU
     983                                        return 0;
     984                                case HASHER_BIDIRECTIONAL:
     985                                case HASHER_UNIDIRECTIONAL:
     986                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH;
     987                                        return 0;
     988                                case HASHER_CUSTOM:
     989                                case HASHER_HARDWARE:
     990                                        return -1;
     991                        }
     992                        break;
     993                /* Avoid default: so that future options will cause a warning
     994                 * here to remind us to implement it, or flag it as
     995                 * unimplementable
     996                 */
     997        }
     998       
     999        /* Don't set an error - trace_config will try to deal with the
     1000         * option and will set an error if it fails */
     1001        return -1;
     1002}
     1003
     1004
    8151005static int linuxnative_prepare_packet(libtrace_t *libtrace UNUSED,
    8161006                libtrace_packet_t *packet, void *buffer,
     
    8841074
    8851075#ifdef HAVE_NETPACKET_PACKET_H
    886 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1076libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ;
     1077inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue)
    8871078{
    8881079        struct libtrace_linuxnative_header *hdr;
     
    8921083        struct cmsghdr *cmsg;
    8931084        int snaplen;
     1085
    8941086        uint32_t flags = 0;
    8951087        fd_set readfds;
     
    9121104                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*hdr),
    9131105                        (int)FORMAT(libtrace->format_data)->snaplen);
    914 
    9151106        /* Prepare the msghdr and iovec for the kernel to write the
    9161107         * captured packet into. The msghdr will point to the part of our
     
    9301121        iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr));
    9311122        iovec.iov_len = snaplen;
    932 
     1123       
     1124        if (check_queue) {
     1125                // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
     1126                hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT | MSG_TRUNC);
     1127                if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     1128                        // Do message queue check or select
     1129                        int ret;
     1130                        fd_set rfds;
     1131                        FD_ZERO(&rfds);
     1132                        FD_SET(fd, &rfds);
     1133                        FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds);
     1134                        int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? fd : get_thread_table(libtrace)->messages.pipefd[0];
     1135                        do {
     1136                                ret = select(largestfd+1, &rfds, NULL, NULL, NULL);
     1137                                if (ret == -1 && errno != EINTR)
     1138                                        perror("Select() failed");
     1139                        }
     1140                        while (ret == -1);
     1141                       
     1142                        assert (ret == 1 || ret == 2); // No timeout 0 is not an option
     1143                       
     1144                        if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) {
     1145                                // Not an error but check the message queue we have something
     1146                                return -2;
     1147                        }
     1148                        // Otherwise we must have a packet
     1149                        hdr->wirelen = recvmsg(fd, &msghdr, MSG_TRUNC);
     1150                }
     1151        } else {
    9331152        /* Use select to allow us to time out occasionally to check if someone
    9341153         * has hit Ctrl-C or otherwise wants us to stop reading and return
     
    9611180                        return 0;
    9621181        }
    963 
    9641182        hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, MSG_TRUNC);
    965 
     1183        }
     1184
     1185       
    9661186        if (hdr->wirelen==~0U) {
    9671187                trace_set_err(libtrace,errno,"recvmsg");
     
    10111231        if (cmsg == NULL) {
    10121232                struct timeval tv;
    1013                 if (ioctl(FORMAT(libtrace->format_data)->fd,
    1014                                   SIOCGSTAMP,&tv)==0) {
     1233                if (ioctl(fd, SIOCGSTAMP,&tv)==0) {
    10151234                        hdr->tv.tv_sec = tv.tv_sec;
    10161235                        hdr->tv.tv_usec = tv.tv_usec;
     
    10331252}
    10341253
     1254static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1255{
     1256        int fd = FORMAT(libtrace->format_data)->fd;
     1257        return linuxnative_read_packet_fd(libtrace, packet, fd, 0);
     1258}
     1259
     1260static int linuxnative_pread_packets(libtrace_t *libtrace,
     1261                                     libtrace_thread_t *t,
     1262                                     libtrace_packet_t **packets,
     1263                                     UNUSED size_t nb_packets) {
     1264        /* For now just read one packet */
     1265        int fd = PERPKT_FORMAT(t)->fd;
     1266        packets[0]->error = linuxnative_read_packet_fd(libtrace, packets[0],
     1267                                                       fd, 1);
     1268        if (packets[0]->error >= 1)
     1269                return 1;
     1270        else
     1271                return packets[0]->error;
     1272}
     1273
    10351274#define LIBTRACE_BETWEEN(test,a,b) ((test) >= (a) && (test) < (b))
    10361275static int linuxring_get_capture_length(const libtrace_packet_t *packet);
     
    10391278/* Release a frame back to the kernel or free() if it's a malloc'd buffer
    10401279 */
    1041 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet ){
     1280inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet){
    10421281        /* Free the old packet */
    10431282        if(packet->buffer == NULL)
     
    10511290                struct linux_format_data_t *ftd = FORMAT(libtrace->format_data);
    10521291               
    1053                 /* Check it's within our buffer first */
    1054                 if(LIBTRACE_BETWEEN((char *) packet->buffer,
     1292                /* Check it's within our buffer first - consider the pause resume case it might have already been free'd lets hope we get another buffer */
     1293                // For now let any one free anything
     1294                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
    10551295                                (char *) ftd->rx_ring,
    10561296                                ftd->rx_ring
    1057                                 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){
     1297                                + ftd->req.tp_block_size * ftd->req.tp_block_nr)){*/
    10581298                        TO_TP_HDR(packet->buffer)->tp_status = 0;
    10591299                        packet->buffer = NULL;
    1060                 }
    1061         }
    1062 }
    1063 
    1064 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1300                /*}*/
     1301        }
     1302}
     1303
     1304/**
     1305 * Free any resources being kept for this packet, Note: libtrace
     1306 * will ensure all fields are zeroed correctly.
     1307 */
     1308static void linuxring_fin_packet(libtrace_packet_t *packet)
     1309{
     1310        if (packet->buffer == NULL)
     1311                return;
     1312        assert(packet->trace);
     1313       
     1314        /* If we own the packet (i.e. it's not a copy), we need to free it */
     1315        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
     1316                /* Started should always match the existence of the rx_ring */
     1317                assert(!!FORMAT(packet->trace->format_data)->rx_ring ==
     1318                       !!packet->trace->started);
     1319                /* If we don't have a ring its already been destroyed */
     1320                if (FORMAT(packet->trace->format_data)->rx_ring)
     1321                        ring_release_frame(packet->trace, packet);
     1322                else
     1323                        packet->buffer = NULL;
     1324        }
     1325}
     1326
     1327inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) {
    10651328
    10661329        struct tpacket2_hdr *header;
    1067         struct pollfd pollset; 
    10681330        int ret;
    10691331        unsigned int snaplen;
     
    10751337       
    10761338        /* Fetch the current frame */
    1077         header = GET_CURRENT_BUFFER(libtrace);
     1339        header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace);
    10781340        assert((((unsigned long) header) & (pagesize - 1)) == 0);
    10791341
    1080         while (1) {
    1081                 pollset.fd = FORMAT(libtrace->format_data)->fd;
    1082                 pollset.events = POLLIN;
    1083                 pollset.revents = 0;
    1084                 /* Wait for more data */
    1085                 ret = poll(&pollset, 1, 500);
    1086                 if (ret < 0) {
    1087                         if (errno != EINTR)
    1088                                 trace_set_err(libtrace,errno,"poll()");
    1089                         return -1;
    1090                 } else if (ret == 0) {
    1091                         /* Poll timed out - check if we should exit */
    1092                         if (libtrace_halt)
    1093                                 return 0;
    1094                         continue;
    1095                 }
    1096 
    1097                 /* TP_STATUS_USER means that we can use the frame.
    1098                  * When a slot does not have this flag set, the frame is not
    1099                  * ready for consumption.
    1100                  */
    1101                 if (header->tp_status & TP_STATUS_USER)
    1102                         break;
     1342        /* TP_STATUS_USER means that we can use the frame.
     1343         * When a slot does not have this flag set, the frame is not
     1344         * ready for consumption.
     1345         */
     1346        while (!(header->tp_status & TP_STATUS_USER)) {
     1347                if (message) {
     1348                        struct pollfd pollset[2];
     1349                        pollset[0].fd = fd;
     1350                        pollset[0].events = POLLIN;
     1351                        pollset[0].revents = 0;
     1352                        pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages);
     1353                        pollset[1].events = POLLIN;
     1354                        pollset[1].revents = 0;
     1355                        /* Wait for more data or a message*/
     1356                        ret = poll(pollset, 2, -1);
     1357                        if (ret < 0) {
     1358                                if (errno != EINTR)
     1359                                        trace_set_err(libtrace,errno,"poll()");
     1360                                return -1;
     1361                        }
     1362                        /* A message is ready */
     1363                        if (pollset[1].revents)
     1364                                return -2;
     1365                } else {
     1366                        struct pollfd pollset;
     1367                        pollset.fd = fd;
     1368                        pollset.events = POLLIN;
     1369                        pollset.revents = 0;
     1370
     1371                        /* Wait for more data or a message*/
     1372                        ret = poll(&pollset, 1, 500);
     1373                        if (ret < 0) {
     1374                                if (errno != EINTR)
     1375                                        trace_set_err(libtrace,errno,"poll()");
     1376                                return -1;
     1377                        } else if (ret == 0) {
     1378                                /* Poll timed out - check if we should exit */
     1379                                if (libtrace_halt)
     1380                                        return 0;
     1381                                continue;
     1382                        }
     1383                }
    11031384        }
    11041385
     
    11151396
    11161397        /* Move to next buffer */
    1117         FORMAT(libtrace->format_data)->rxring_offset++;
    1118         FORMAT(libtrace->format_data)->rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
     1398        (*rxring_offset)++;
     1399        *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
    11191400
    11201401        /* We just need to get prepare_packet to set all our packet pointers
     
    11261407                                linuxring_get_capture_length(packet);
    11271408
     1409}
     1410
     1411static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1412        int fd = FORMAT(libtrace->format_data)->fd;
     1413        int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset;
     1414        char *rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     1415        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0);
     1416}
     1417
     1418static int linuxring_pread_packets(libtrace_t *libtrace,
     1419                                   libtrace_thread_t *t,
     1420                                   libtrace_packet_t **packets,
     1421                                   UNUSED size_t nb_packets) {
     1422        //fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
     1423        int fd = PERPKT_FORMAT(t)->fd;
     1424        int *rxring_offset = &PERPKT_FORMAT(t)->rxring_offset;
     1425        char *rx_ring = PERPKT_FORMAT(t)->rx_ring;
     1426        packets[0]->error = linuxring_read_packet_fd(libtrace, packets[0], fd,
     1427                                                     rxring_offset, rx_ring, 1);
     1428        if (packets[0]->error >= 1)
     1429                return 1;
     1430        else
     1431                return packets[0]->error;
    11281432}
    11291433
     
    11831487
    11841488        return ret;
    1185 
    1186 }
     1489}
     1490
    11871491static int linuxring_write_packet(libtrace_out_t *trace,
    11881492                libtrace_packet_t *packet)
     
    14961800/* Number of packets that passed filtering */
    14971801static uint64_t linuxnative_get_captured_packets(libtrace_t *trace) {
     1802        struct tpacket_stats stats;
     1803
    14981804        if (trace->format_data == NULL)
    14991805                return UINT64_MAX;
     
    15041810        }
    15051811
    1506 #ifdef HAVE_NETPACKET_PACKET_H 
    1507         if ((FORMAT(trace->format_data)->stats_valid & 1)
    1508                         || FORMAT(trace->format_data)->stats_valid == 0) {
    1509                 socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    1510                 getsockopt(FORMAT(trace->format_data)->fd,
    1511                                 SOL_PACKET,
    1512                                 PACKET_STATISTICS,
    1513                                 &FORMAT(trace->format_data)->stats,
    1514                                 &len);
    1515                 FORMAT(trace->format_data)->stats_valid |= 1;
     1812#ifdef HAVE_NETPACKET_PACKET_H
     1813
     1814        if ((FORMAT(trace->format_data)->stats_valid & 1)
     1815                || FORMAT(trace->format_data)->stats_valid == 0) {
     1816                if (FORMAT(trace->format_data)->per_thread) {
     1817                        int i;
     1818                        FORMAT(trace->format_data)->stats.tp_drops = 0;
     1819                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     1820                        for (i = 0; i < trace->perpkt_thread_count; ++i) {
     1821                                socklen_t len = sizeof(stats);
     1822                                getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
     1823                                           SOL_PACKET,
     1824                                           PACKET_STATISTICS,
     1825                                           &stats,
     1826                                           &len);
     1827                                FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
     1828                                FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
     1829                        }
     1830                        FORMAT(trace->format_data)->stats_valid |= 1;
     1831                } else {
     1832                        socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
     1833                        getsockopt(FORMAT(trace->format_data)->fd,
     1834                                   SOL_PACKET,
     1835                                   PACKET_STATISTICS,
     1836                                   &FORMAT(trace->format_data)->stats,
     1837                                   &len);
     1838                        FORMAT(trace->format_data)->stats_valid |= 1;
     1839                }
    15161840        }
    15171841
     
    15261850 */
    15271851static uint64_t linuxnative_get_dropped_packets(libtrace_t *trace) {
     1852        struct tpacket_stats stats;
    15281853        if (trace->format_data == NULL)
    15291854                return UINT64_MAX;
     
    15331858                return UINT64_MAX;
    15341859        }
    1535        
     1860
    15361861#ifdef HAVE_NETPACKET_PACKET_H 
    15371862        if ((FORMAT(trace->format_data)->stats_valid & 2)
    1538                         || (FORMAT(trace->format_data)->stats_valid==0)) {
    1539                 socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    1540                 getsockopt(FORMAT(trace->format_data)->fd,
    1541                                 SOL_PACKET,
    1542                                 PACKET_STATISTICS,
    1543                                 &FORMAT(trace->format_data)->stats,
    1544                                 &len);
    1545                 FORMAT(trace->format_data)->stats_valid |= 2;
     1863                || (FORMAT(trace->format_data)->stats_valid==0)) {
     1864                if (FORMAT(trace->format_data)->per_thread) {
     1865                        int i;
     1866                        FORMAT(trace->format_data)->stats.tp_drops = 0;
     1867                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     1868                        for (i = 0; i < trace->perpkt_thread_count; ++i) {
     1869                                socklen_t len = sizeof(stats);
     1870                                getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
     1871                                           SOL_PACKET,
     1872                                           PACKET_STATISTICS,
     1873                                           &stats,
     1874                                           &len);
     1875                                FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
     1876                                FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
     1877                        }
     1878                        FORMAT(trace->format_data)->stats_valid |= 2;
     1879                } else {
     1880                        socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
     1881                        getsockopt(FORMAT(trace->format_data)->fd,
     1882                                   SOL_PACKET,
     1883                                   PACKET_STATISTICS,
     1884                                   &FORMAT(trace->format_data)->stats,
     1885                                   &len);
     1886                        FORMAT(trace->format_data)->stats_valid |= 2;
     1887                }
    15461888        }
    15471889
     
    16151957        trace_event_device,             /* trace_event */
    16161958        linuxnative_help,               /* help */
     1959        NULL,                                   /* next pointer */
     1960        {true, -1},              /* Live, no thread limit */
     1961        linuxnative_pstart_input,                       /* pstart_input */
     1962        linuxnative_pread_packets,                      /* pread_packets */
     1963        linuxnative_ppause_input,                       /* ppause */
     1964        linuxnative_fin_input,                          /* p_fin */
     1965        linuxnative_pconfig_input,                      /* pconfig input */
     1966        linux_pregister_thread,
    16171967        NULL
    16181968};
     
    16351985        linuxring_read_packet,  /* read_packet */
    16361986        linuxring_prepare_packet,       /* prepare_packet */
    1637         NULL,                           /* fin_packet */
     1987        linuxring_fin_packet,                           /* fin_packet */
    16381988        linuxring_write_packet, /* write_packet */
    16391989        linuxring_get_link_type,        /* get_link_type */
     
    16582008        linuxring_event,                /* trace_event */
    16592009        linuxring_help,         /* help */
     2010        NULL,                           /* next pointer */
     2011        {true, -1},              /* Live, no thread limit */
     2012        linuxnative_pstart_input,                       /* pstart_input */
     2013        linuxring_pread_packets,                        /* pread_packets */
     2014        linuxnative_ppause_input,                       /* ppause */
     2015        linuxnative_fin_input,                          /* p_fin */
     2016        linuxnative_pconfig_input,
     2017        linux_pregister_thread,
    16602018        NULL
    16612019};
     
    17102068        trace_event_device,             /* trace_event */
    17112069        linuxnative_help,               /* help */
    1712         NULL
     2070        NULL,                   /* next pointer */
     2071        NON_PARALLEL(true)
    17132072};
    17142073
     
    17532112        NULL,                           /* trace_event */
    17542113        linuxring_help,                 /* help */
    1755         NULL
     2114        NULL,                   /* next pointer */
     2115        NON_PARALLEL(true)
    17562116};
    17572117
  • lib/format_pcap.c

    r4649fea r4649fea  
    834834        trace_event_trace,              /* trace_event */
    835835        pcap_help,                      /* help */
    836         NULL                            /* next pointer */
     836        NULL,                   /* next pointer */
     837        NON_PARALLEL(false)
    837838};
    838839
     
    877878        trace_event_device,             /* trace_event */
    878879        pcapint_help,                   /* help */
    879         NULL                            /* next pointer */
     880        NULL,                   /* next pointer */
     881        NON_PARALLEL(true)
    880882};
    881883
  • lib/format_pcapfile.c

    rc69aecb rc69aecb  
    786786        pcapfile_event,         /* trace_event */
    787787        pcapfile_help,                  /* help */
    788         NULL                            /* next pointer */
     788        NULL,                   /* next pointer */
     789        NON_PARALLEL(false)
    789790};
    790791
  • lib/format_rt.c

    rc70f59f r4cc6e74  
    457457                                /* This may fail on a non-Linux machine */
    458458                                if (trace_is_err(RT_INFO->dummy_ring)) {
    459                                         trace_perror(RT_INFO->dummy_ring, "Creating dead int trace");
     459                                        trace_perror(RT_INFO->dummy_ring, "Creating dead ring trace");
    460460                                        return -1;
    461461                                }
     
    862862        trace_event_rt,             /* trace_event */
    863863        rt_help,                        /* help */
    864         NULL                            /* next pointer */
     864        NULL,                   /* next pointer */
     865        NON_PARALLEL(true) /* This is normally live */
    865866};
    866867
  • lib/format_tsh.c

    rc69aecb rc69aecb  
    274274        trace_event_trace,              /* trace_event */
    275275        tsh_help,                       /* help */
    276         NULL                            /* next pointer */
     276        NULL,                   /* next pointer */
     277        NON_PARALLEL(false)
    277278};
    278279
     
    322323        trace_event_trace,              /* trace_event */
    323324        tsh_help,                       /* help */
    324         NULL                            /* next pointer */
     325        NULL,                   /* next pointer */
     326        NON_PARALLEL(false)
    325327};
    326328
  • lib/libtrace.h.in

    ra2ce0a6 ra2ce0a6  
    117117/** DAG driver version installed on the current system */
    118118#define DAG_DRIVER_V "@DAG_VERSION_NUM@"
     119
     120/**
     121  * A version of assert that always runs the first argument even
     122  * when not debugging, however only asserts the condition if debugging
     123  * Intended for use mainly with pthread locks etc. which have error
     124  * returns but *should* never actually fail.
     125  */
     126#ifdef NDEBUG
     127#define ASSERT_RET(run, cond) run
     128#else
     129#define ASSERT_RET(run, cond) assert(run cond)
     130//#define ASSERT_RET(run, cond) run
     131#endif
    119132   
    120133#ifdef __cplusplus
     
    197210#endif
    198211
     212// Used to fight against false sharing
     213#define CACHE_LINE_SIZE 64
     214#define ALIGN_STRUCT(x) __attribute__((aligned(x)))
     215
    199216#ifdef _MSC_VER
    200217    #ifdef LT_BUILDING_DLL
     
    225242/** Opaque structure holding information about a bpf filter */
    226243typedef struct libtrace_filter_t libtrace_filter_t;
     244
     245typedef struct libtrace_thread_t libtrace_thread_t;
    227246
    228247/** If the packet has allocated its own memory the buffer_control should be
     
    511530        uint8_t transport_proto;        /**< Cached transport protocol */
    512531        uint32_t l4_remaining;          /**< Cached transport remaining */
     532        uint64_t order; /**< Notes the order of this packet in relation to the input */
     533        uint64_t hash; /**< A hash of the packet as supplied by the user */
     534        int error; /**< The error status of pread_packet */
    513535} libtrace_packet_t;
    514536
     
    31493171/*@}*/
    31503172
     3173/**
     3174 * A collection of types for convenience used in place of a
     3175 * simple void* to allow a any type of data to be stored.
     3176 *
     3177 * This is expected to be 8 bytes in length.
     3178 */
     3179typedef union {
     3180        /* Pointers */
     3181        void *ptr;
     3182        libtrace_packet_t *pkt;
     3183
     3184        /* C99 Integer types */
     3185        /* NOTE: Standard doesn't require 64-bit
     3186     * but x32 and x64 gcc does */
     3187        int64_t sint64;
     3188        uint64_t uint64;
     3189
     3190        uint32_t uint32s[2];
     3191        int32_t sint32s[2];
     3192        uint32_t uint32;
     3193        int32_t sint32;
     3194
     3195        uint16_t uint16s[4];
     3196        int16_t sint16s[4];
     3197        uint16_t uint16;
     3198        int16_t sint16;
     3199
     3200        uint8_t uint8s[8];
     3201        int8_t sint8s[8];
     3202        uint8_t uint8;
     3203        int8_t sint8;
     3204
     3205        size_t size;
     3206
     3207        /* C basic types - we cannot be certian of the size */
     3208        int sint;
     3209        unsigned int uint;
     3210
     3211        signed char schars[8];
     3212        unsigned char uchars[8];
     3213        signed char schar;
     3214        unsigned char uchar;
     3215
     3216        /* Real numbers */
     3217        float rfloat;
     3218        double rdouble;
     3219} libtrace_generic_types_t;
     3220
     3221typedef struct libtrace_message_t {
     3222        int code;
     3223        libtrace_generic_types_t additional;
     3224        libtrace_thread_t *sender;
     3225} libtrace_message_t;
     3226
     3227/** Structure holding information about a result */
     3228typedef struct libtrace_result_t {
     3229        uint64_t key;
     3230        libtrace_generic_types_t value;
     3231        int type;
     3232} libtrace_result_t;
     3233#define RESULT_NORMAL 0
     3234#define RESULT_PACKET 1
     3235#define RESULT_TICK   2
     3236
     3237
     3238typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     3239typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
     3240typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
     3241
     3242DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter);
     3243DLLEXPORT int trace_ppause(libtrace_t *libtrace);
     3244DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     3245DLLEXPORT void trace_join(libtrace_t * trace);
     3246DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
     3247
     3248DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
     3249DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
     3250DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value);
     3251DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result);
     3252DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value);
     3253DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
     3254
     3255// Ways to access Global and TLS storage that we provide the user
     3256DLLEXPORT void * trace_get_global(libtrace_t *trace);
     3257DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
     3258DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data);
     3259DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
     3260
     3261
     3262DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type);
     3263typedef struct libtrace_vector libtrace_vector_t;
     3264
     3265DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
     3266DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
     3267DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3268DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3269DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
     3270DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
     3271DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
     3272DLLEXPORT int trace_finished(libtrace_t * libtrace);
     3273DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     3274DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
     3275DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
     3276DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     3277DLLEXPORT uint64_t tv_to_usec(struct timeval *tv);
     3278
     3279DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
     3280
     3281DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
     3282DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
     3283
     3284typedef enum {
     3285        /**
     3286         * Sets the hasher function, if NULL(default) no hashing is used a
     3287         * cores will get packets on a first in first served basis
     3288         */
     3289        TRACE_OPTION_SET_HASHER,
     3290       
     3291        /**
     3292         * Libtrace set perpkt thread count
     3293         */
     3294        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
     3295       
     3296        /**
     3297         * Delays packets so they are played back in trace-time rather than as fast
     3298         * as possible.
     3299         */
     3300        TRACE_OPTION_TRACETIME,
     3301
     3302        /**
     3303         * Specifies the interval between tick packets in milliseconds, if 0
     3304         * or less this is ignored.
     3305         */
     3306        TRACE_OPTION_TICK_INTERVAL,
     3307        TRACE_OPTION_GET_CONFIG,
     3308        TRACE_OPTION_SET_CONFIG
     3309} trace_parallel_option_t;
     3310
     3311enum libtrace_messages {
     3312        MESSAGE_STARTING,
     3313        MESSAGE_RESUMING,
     3314        MESSAGE_STOPPING,
     3315        MESSAGE_PAUSING,
     3316        MESSAGE_DO_PAUSE,
     3317        MESSAGE_DO_STOP,
     3318        MESSAGE_FIRST_PACKET,
     3319        MESSAGE_PERPKT_ENDED,
     3320        MESSAGE_PERPKT_RESUMED,
     3321        MESSAGE_PERPKT_PAUSED,
     3322        MESSAGE_PERPKT_EOF,
     3323        MESSAGE_POST_REPORTER,
     3324        MESSAGE_POST_RANGE,
     3325        MESSAGE_TICK,
     3326        MESSAGE_USER
     3327};
     3328
     3329enum hasher_types {
     3330        /**
     3331         * Balance load across CPUs best as possible, this is basically to say do
     3332         * not care about hash. This might still might be implemented
     3333         * using a hash or round robin etc. under the hood depending on the format
     3334         */
     3335        HASHER_BALANCE,
     3336
     3337        /** Use a hash which is bi-directional for TCP flows, that is packets with
     3338         * the same hash are sent to the same thread. All non TCP packets will be
     3339         * sent to the same thread. UDP may or may not be sent to separate
     3340         * threads like TCP, this depends on the format support.
     3341         */
     3342        HASHER_BIDIRECTIONAL,
     3343       
     3344        /**
     3345         * Use a hash which is uni-directional across TCP flows, that means the
     3346         * opposite directions of the same 5 tuple might end up on separate cores.
     3347         * Otherwise is identical to HASHER_BIDIRECTIONAL
     3348         */
     3349        HASHER_UNIDIRECTIONAL,
     3350
     3351        /**
     3352         * Always use the user supplied hasher, this currently disables native
     3353         * support and is likely significantly slower.
     3354         */
     3355        HASHER_CUSTOM,
     3356
     3357        /**
     3358         * This is not a valid option, used internally only!!! TODO remove
     3359         * Set by the format if the hashing is going to be done in hardware
     3360         */
     3361        HASHER_HARDWARE
     3362};
     3363
     3364typedef struct libtrace_info_t {
     3365        /**
     3366         * True if a live format (i.e. packets have to be tracetime).
     3367         * Otherwise false, indicating packets can be read as fast
     3368         * as possible from the format.
     3369         */
     3370        bool live;
     3371
     3372        /**
     3373         * The maximum number of threads supported by a parallel trace. 1
     3374         * if parallel support is not native (in this case libtrace will simulate
     3375         * an unlimited number of threads), -1 means unlimited and 0 unknown.
     3376         */
     3377        int max_threads;
     3378
     3379        /* TODO hash fn supported list */
     3380
     3381        /* TODO consider time/clock details?? */
     3382} libtrace_info_t;
     3383
     3384DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
     3385DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3386DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3387DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
     3388DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
     3389
     3390/**
     3391 * Tuning the parallel sizes
     3392 */
     3393struct user_configuration {
     3394        // Packet memory cache settings (ocache_init) total
     3395        /**
     3396         * See diagrams, this sets the maximum size of freelist used to
     3397         * maintain packets and their memory buffers.
     3398         * NOTE setting this to less than recommend could cause deadlock a
     3399         * trace that manages its own packets.
     3400         * A unblockable error message will be printed.
     3401         */
     3402        size_t packet_cache_size;
     3403        /**
     3404         * Per thread local cache size for the packet freelist
     3405         */
     3406        size_t packet_thread_cache_size;
     3407        /**
     3408         * If true the total number of packets that can be created by a trace is limited
     3409         * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc
     3410         * and free will be used to create and free packets, this will be slower than
     3411         * using the freelist and could run a machine out of memory.
     3412         *
     3413         * However this does make it easier to ensure that deadlocks will not occur
     3414         * due to running out of packets
     3415         */
     3416        bool fixed_packet_count;
     3417        /**
     3418         * When reading from a single threaded input source to reduce
     3419         * lock contention a 'burst' of packets is read per pkt thread
     3420         * this determines the bursts size.
     3421         */
     3422        size_t burst_size;
     3423        // Each perpkt thread has a queue leading into the reporter
     3424        //size_t reporter_queue_size;
     3425
     3426        /**
     3427         * The tick interval - in milliseconds
     3428         * When a live trace is used messages are sent at the tick
     3429         * interval to ensure that all perpkt threads receive data
     3430         * this allows results to be printed in cases flows are
     3431         * not being directed to a certian thread, while still
     3432         * maintaining order.
     3433         */
     3434        size_t tick_interval;
     3435
     3436        /**
     3437         * Like the tick interval but used in the case of file format
     3438         * This specifies the number of packets before inserting a tick to
     3439         * every thread.
     3440         */
     3441        size_t tick_count;
     3442
     3443        /**
     3444         * The number of per packet threads requested, 0 means use default.
     3445         * Default typically be the number of processor threads detected less one or two.
     3446         */
     3447        size_t perpkt_threads;
     3448
     3449        /**
     3450         * See diagrams, this sets the maximum size of buffers used between
     3451         * the single hasher thread and the buffer.
     3452         * NOTE setting this to less than recommend could cause deadlock a
     3453         * trace that manages its own packets.
     3454         * A unblockable warning message will be printed to stderr in this case.
     3455         */
     3456        /** The number of packets that can queue per thread from hasher thread */
     3457        size_t hasher_queue_size;
     3458
     3459        /**
     3460         * If true use a polling hasher queue, that means that we will spin/or yeild
     3461         * when rather than blocking on a lock. This applies to both the hasher thread
     3462         * and perpkts reading the queues.
     3463         */
     3464        bool hasher_polling;
     3465
     3466        /**
     3467         * If true the reporter thread will continuously poll waiting for results
     3468         * if false they are only checked when a message is received, this message
     3469         * is controlled by reporter_thold.
     3470         */
     3471        bool reporter_polling;
     3472
     3473        /**
     3474         * Perpkt thread result queue size before triggering the reporter step to read results
     3475         */
     3476        size_t reporter_thold;
     3477
     3478        /**
     3479         * Prints a line to standard error for every state change
     3480         * for both the trace as a whole and for each thread.
     3481         */
     3482        bool debug_state;
     3483};
     3484#include <stdio.h>
     3485DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
     3486DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3487DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
     3488
     3489/**
     3490 * The methods we use to combine multiple outputs into a single output
     3491 * This is not considered a stable API however is public.
     3492 * Where possible use built in combiners
     3493 *
     3494 * NOTE this structure is duplicated per trace and as such can
     3495 * have functions rewritten, and in fact should if possible.
     3496 */
     3497typedef struct libtrace_combine libtrace_combine_t;
     3498struct libtrace_combine {
     3499
     3500        /**
     3501         * Called at the start of the trace to allow datastructures
     3502         * to be initilised and allow functions to be swapped if approriate.
     3503         *
     3504         * Also factors such as whether the trace is live or not can
     3505         * be used to determine the functions used.
     3506         * @return 0 if successful, -1 if an error occurs
     3507         */
     3508        int (*initialise)(libtrace_t *,libtrace_combine_t *);
     3509
     3510        /**
     3511         * Called when the trace ends, clean up any memory here
     3512         * from libtrace_t * init.
     3513         */
     3514        void (*destroy)(libtrace_t *, libtrace_combine_t *);
     3515
     3516        /**
     3517         * Publish a result against it's a threads queue.
     3518         * If null publish directly, expected to be used
     3519         * as a single threaded optimisation and can be
     3520         * set to NULL by init if this case is detected.
     3521         */
     3522        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
     3523
     3524        /**
     3525         * Read as many results as possible from the trace.
     3526         * Directy calls the users code to handle results from here.
     3527         *
     3528         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
     3529         * If publish is NULL, this probably should be NULL also otherwise
     3530         * it will not be called.
     3531         */
     3532        void (*read)(libtrace_t *, libtrace_combine_t *);
     3533
     3534        /**
     3535         * Called when the trace is finished to flush the final
     3536         * results to the reporter thread.
     3537         *
     3538         * There may be no results, in which case this should
     3539         * just return.
     3540         *
     3541         * Libtrace state:
     3542         * Called from reporter thread
     3543         * No perpkt threads will be running, i.e. publish will not be
     3544         * called again.
     3545         *
     3546         * If publish is NULL, this probably should be NULL also otherwise
     3547         * it will not be called.
     3548         */
     3549        void (*read_final)(libtrace_t *, libtrace_combine_t *);
     3550
     3551        /**
     3552         * Pause must make sure any results of the type packet are safe.
     3553         * That means trace_copy_packet() and destroy the original.
     3554         * This also should be NULL if publish is NULL.
     3555         */
     3556        void (*pause)(libtrace_t *, libtrace_combine_t *);
     3557
     3558        /**
     3559         * Data storage for all the combiner threads
     3560         */
     3561        void *queues;
     3562
     3563        /**
     3564         * Configuration options, what this does is upto the combiner
     3565         * chosen.
     3566         */
     3567        libtrace_generic_types_t configuration;
     3568};
     3569
     3570DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config);
     3571
     3572#define READ_EOF 0
     3573#define READ_ERROR -1
     3574#define READ_MESSAGE -2
     3575// Used for inband tick message
     3576#define READ_TICK -3
     3577
     3578#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
     3579
    31513580#ifdef __cplusplus
    31523581} /* extern "C" */
  • lib/libtrace_int.h

    r10f924c rd7fd648  
    148148#endif
    149149
     150#include "data-struct/ring_buffer.h"
     151#include "data-struct/object_cache.h"
     152#include "data-struct/vector.h"
     153#include "data-struct/message_queue.h"
     154#include "data-struct/deque.h"
     155#include "data-struct/sliding_window.h"
    150156
    151157//#define RP_BUFSIZE 65536U
     
    166172        bool waiting;
    167173};
     174
     175enum thread_types {
     176        THREAD_EMPTY,
     177        THREAD_HASHER,
     178        THREAD_PERPKT,
     179        THREAD_REPORTER,
     180        THREAD_KEEPALIVE
     181};
     182
     183enum thread_states {
     184        THREAD_RUNNING,
     185        THREAD_FINISHING,
     186        THREAD_FINISHED,
     187        THREAD_PAUSED,
     188        THREAD_STATE_MAX
     189};
     190
     191/**
     192 * Information of this thread
     193 */
     194struct libtrace_thread_t {
     195        int accepted_packets; // The number of packets accepted only used if pread
     196        // is retreving packets
     197        // Set to true once the first packet has been stored
     198        bool recorded_first;
     199        // For thread safety reason we actually must store this here
     200        int64_t tracetime_offset_usec;
     201        void* user_data; // TLS for the user to use
     202        void* format_data; // TLS for the format to use
     203        libtrace_message_queue_t messages; // Message handling
     204        libtrace_ringbuffer_t rbuffer; // Input
     205        libtrace_t * trace;
     206        void* ret;
     207        enum thread_types type;
     208        enum thread_states state;
     209        pthread_t tid;
     210        int perpkt_num; // A number from 0-X that represents this perpkt threads number
     211                                // in the table, intended to quickly identify this thread
     212                                // -1 represents NA (such as the case this is not a perpkt thread)
     213};
     214
     215/**
     216 * Storage to note time value against each.
     217 * Used both internally to do trace time playback
     218 * and can be used externally to assist applications which need
     219 * a trace starting time such as tracertstats.
     220 */
     221struct first_packets {
     222        pthread_spinlock_t lock;
     223        size_t count; // If == perpkt_thread_count threads we have all
     224        size_t first; // Valid if count != 0
     225        struct __packet_storage_magic_type {
     226                libtrace_packet_t * packet;
     227                struct timeval tv;
     228        } * packets;
     229};
     230
     231#define TRACE_STATES \
     232        X(STATE_NEW) \
     233        X(STATE_RUNNING) \
     234        X(STATE_PAUSING) \
     235        X(STATE_PAUSED) \
     236        X(STATE_FINSHED) \
     237        X(STATE_DESTROYED) \
     238        X(STATE_JOINED) \
     239        X(STATE_ERROR)
     240
     241#define X(a) a,
     242enum trace_state {
     243        TRACE_STATES
     244};
     245#undef X
     246
     247#define X(a) case a: return #a;
     248static inline char *get_trace_state_name(enum trace_state ts){
     249        switch(ts) {
     250                TRACE_STATES
     251                default:
     252                        return "UNKNOWN";
     253        }
     254}
     255#undef X
    168256
    169257/** A libtrace input trace
     
    188276        uint64_t filtered_packets;     
    189277        /** The filename from the uri for the trace */
    190         char *uridata;                 
     278        char *uridata;
    191279        /** The libtrace IO reader for this trace (if applicable) */
    192         io_t *io;                       
     280        io_t *io;
    193281        /** Error information for the trace */
    194         libtrace_err_t err;             
     282        libtrace_err_t err;
    195283        /** Boolean flag indicating whether the trace has been started */
    196         bool started;                   
     284        bool started;
     285        /** Synchronise writes/reads across this format object and attached threads etc */
     286        pthread_mutex_t libtrace_lock;
     287        /** State */
     288        enum trace_state state;
     289        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
     290        pthread_cond_t perpkt_cond;
     291        /* Keep track of counts of threads in any given state */
     292        int perpkt_thread_states[THREAD_STATE_MAX];
     293
     294        /** For the sliding window hasher implementation */
     295        pthread_rwlock_t window_lock;
     296        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
     297        bool perpkt_queue_full;
     298        /** Global storage for this trace, shared among all the threads  */
     299        void* global_blob;
     300        /** The actual freelist */
     301        libtrace_ocache_t packet_freelist;
     302        /** User defined per_pkt function called when a pkt is ready */
     303        fn_per_pkt per_pkt;
     304        /** User defined reporter function entry point XXX not hooked up */
     305        fn_reporter reporter;
     306        /** The hasher function */
     307        enum hasher_types hasher_type;
     308        /** The hasher function - NULL implies they don't care or balance */
     309        fn_hasher hasher; // If valid using a separate thread
     310        void *hasher_data;
     311       
     312        libtrace_thread_t hasher_thread;
     313        libtrace_thread_t reporter_thread;
     314        libtrace_thread_t keepalive_thread;
     315        int perpkt_thread_count;
     316        libtrace_thread_t * perpkt_threads; // All our perpkt threads
     317        libtrace_slidingwindow_t sliding_window;
     318        sem_t sem;
     319        // Used to keep track of the first packet seen on each thread
     320        struct first_packets first_packets;
     321        int tracetime;
     322
     323        /*
     324         * Caches statistic counters in the case that our trace is
     325         * paused or stopped before this counter is taken
     326         */
     327        uint64_t dropped_packets;
     328        uint64_t received_packets;
     329        struct user_configuration config;
     330        libtrace_combine_t combiner;
    197331};
     332
     333void trace_fin_packet(libtrace_packet_t *packet);
     334void libtrace_zero_thread(libtrace_thread_t * t);
     335void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
     336libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     337int get_thread_table_num(libtrace_t *libtrace);
     338
    198339
    199340/** A libtrace output trace
     
    202343struct libtrace_out_t {
    203344        /** The capture format for the output trace */
    204         struct libtrace_format_t *format;
     345        struct libtrace_format_t *format;
    205346        /** Pointer to the "global" data for the capture format module */
    206347        void *format_data;             
     
    210351        libtrace_err_t err;
    211352        /** Boolean flag indicating whether the trace has been started */
    212         bool started;                   
     353        bool started;
    213354};
    214355
     
    303444} PACKED libtrace_pflog_header_t;
    304445
    305 
    306 
    307446/** A libtrace capture format module */
    308447/* All functions should return -1, or NULL on failure */
     
    734873        /** Prints some useful help information to standard output. */
    735874        void (*help)(void);
    736 
     875       
    737876        /** Next pointer, should always be NULL - used by the format module
    738877         * manager. */
    739878        struct libtrace_format_t *next;
     879
     880        /** Holds information about the trace format */
     881        struct libtrace_info_t info;
     882
     883        /** Starts or unpauses an input trace in parallel mode - note that
     884         * this function is often the one that opens the file or device for
     885         * reading.
     886         *
     887         * @param libtrace      The input trace to be started or unpaused
     888         * @return If successful the number of threads started, 0 indicates
     889         *                 no threads started and this should be done automatically.
     890         *                 Otherwise in event of an error -1 is returned.
     891         *
     892         */
     893        int (*pstart_input)(libtrace_t *trace);
     894       
     895        /**
     896         * Read a batch of packets from the input stream related to thread.
     897         * At most read nb_packets, however should return with less if packets
     898         * are not waiting. However still must return at least 1, 0 still indicates
     899         * EOF.
     900         *
     901         * @param libtrace      The input trace
     902         * @param t     The thread
     903         * @param packets       An array of packets
     904         * @param nb_packets    The number of packets in the array (the maximum to read)
     905         * @return The number of packets read, or 0 in the case of EOF or -1 in error or -2 to represent
     906         * interrupted due to message waiting before packets had been read.
     907         */
     908        int (*pread_packets)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets);
     909       
     910        /** Pause a parallel trace
     911         *
     912         * @param libtrace      The input trace to be paused
     913         */
     914        int (*ppause_input)(libtrace_t *trace);
     915       
     916        /** Called after all threads have been paused, Finish (close) a parallel trace
     917         *
     918         * @param libtrace      The input trace to be stopped
     919         */
     920        int (*pfin_input)(libtrace_t *trace);
     921       
     922        /** Applies a configuration option to an input trace.
     923         *
     924         * @param libtrace      The input trace to apply the option to
     925         * @param option        The option that is being configured
     926         * @param value         A pointer to the value that the option is to be
     927         *                      set to
     928         * @return 0 if successful, -1 if the option is unsupported or an error
     929         * occurs
     930         */
     931        int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value);
     932
     933        /**
     934         * Register a thread for use with the format or using the packets produced
     935         * by it. This is NOT only used for threads reading packets infact all
     936         * threads use this.
     937         *
     938         * Some use cases include setting up any thread local storage required for
     939         * to read packets and free packets. For DPDK we require any thread that
     940         * may release or read a packet to have have an internal number associated
     941         * with it.
     942         *
     943         * The thread type can be used to see if this thread is going to be used
     944         * to read packets or otherwise.
     945         *
     946         * @return 0 if successful, -1 if the option is unsupported or an error
     947         * occurs (such as a maximum of threads being reached)
     948         */
     949        int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader);
     950
     951        /**
     952         * If needed any memory allocated with pregister_thread can be released
     953         * in this function. The thread will be destroyed directly after this
     954         * function is called.
     955         */
     956        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    740957};
     958
     959/** Macro to zero out a single thread format */
     960#define NON_PARALLEL(live) \
     961{live, 1},              /* trace info */ \
     962NULL,                   /* pstart_input */ \
     963NULL,                   /* pread_packet */ \
     964NULL,                   /* ppause_input */ \
     965NULL,                   /* pfin_input */ \
     966NULL,                   /* pconfig_input */ \
     967NULL,                   /* pregister_thread */ \
     968NULL                    /* punregister_thread */
    741969
    742970/** The list of registered capture formats */
     
    746974 * immediately
    747975 */
    748 extern int libtrace_halt;
     976extern volatile int libtrace_halt;
    749977
    750978/** Registers a new capture format module.
  • lib/trace.c

    r7fda5c5 rbdc8b36  
    9999#include "rt_protocol.h"
    100100
     101#include <pthread.h>
     102#include <signal.h>
     103
    101104#define MAXOPTS 1024
    102105
     
    106109
    107110int libtrace_halt = 0;
     111
     112/* Set once pstart is called used for backwards compatibility reasons */
     113int libtrace_parallel = 0;
    108114
    109115/* strncpy is not assured to copy the final \0, so we
     
    253259        libtrace->filtered_packets = 0;
    254260        libtrace->accepted_packets = 0;
     261       
     262        /* Parallel inits */
     263        // libtrace->libtrace_lock
     264        // libtrace->perpkt_cond;
     265        libtrace->state = STATE_NEW;
     266        libtrace->perpkt_queue_full = false;
     267        libtrace->global_blob = NULL;
     268        libtrace->per_pkt = NULL;
     269        libtrace->reporter = NULL;
     270        libtrace->hasher = NULL;
     271        libtrace_zero_ocache(&libtrace->packet_freelist);
     272        libtrace_zero_thread(&libtrace->hasher_thread);
     273        libtrace_zero_thread(&libtrace->reporter_thread);
     274        libtrace_zero_thread(&libtrace->keepalive_thread);
     275        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     276        libtrace->reporter_thread.type = THREAD_EMPTY;
     277        libtrace->perpkt_thread_count = 0;
     278        libtrace->perpkt_threads = NULL;
     279        libtrace->tracetime = 0;
     280        libtrace->first_packets.first = 0;
     281        libtrace->first_packets.count = 0;
     282        libtrace->first_packets.packets = NULL;
     283        libtrace->dropped_packets = UINT64_MAX;
     284        libtrace->received_packets = UINT64_MAX;
     285        ZERO_USER_CONFIG(libtrace->config);
    255286
    256287        /* Parse the URI to determine what sort of trace we are dealing with */
     
    348379        libtrace->io = NULL;
    349380        libtrace->filtered_packets = 0;
     381       
     382        /* Parallel inits */
     383        // libtrace->libtrace_lock
     384        // libtrace->perpkt_cond;
     385        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     386        libtrace->perpkt_queue_full = false;
     387        libtrace->global_blob = NULL;
     388        libtrace->per_pkt = NULL;
     389        libtrace->reporter = NULL;
     390        libtrace->hasher = NULL;
     391        libtrace_zero_ocache(&libtrace->packet_freelist);
     392        libtrace_zero_thread(&libtrace->hasher_thread);
     393        libtrace_zero_thread(&libtrace->reporter_thread);
     394        libtrace_zero_thread(&libtrace->keepalive_thread);
     395        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     396        libtrace->reporter_thread.type = THREAD_EMPTY;
     397        libtrace->perpkt_thread_count = 0;
     398        libtrace->perpkt_threads = NULL;
     399        libtrace->tracetime = 0;
     400        ZERO_USER_CONFIG(libtrace->config);
    350401       
    351402        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    583634 */
    584635DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
    585         assert(libtrace);
     636    int i;
     637        assert(libtrace);
     638
     639        /* destroy any packet that are still around */
     640        if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {
     641                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     642                        if(libtrace->first_packets.packets[i].packet) {
     643                                trace_destroy_packet(libtrace->first_packets.packets[i].packet);
     644                        }
     645                }
     646                free(libtrace->first_packets.packets);
     647                ASSERT_RET(pthread_spin_destroy(&libtrace->first_packets.lock), == 0);
     648        }
     649
    586650        if (libtrace->format) {
    587651                if (libtrace->started && libtrace->format->pause_input)
     
    590654                        libtrace->format->fin_input(libtrace);
    591655        }
    592         /* Need to free things! */
    593         if (libtrace->uridata)
     656        /* Need to free things! */
     657        if (libtrace->uridata)
    594658                free(libtrace->uridata);
     659       
     660        /* Empty any packet memory */
     661        if (libtrace->state != STATE_NEW) {
     662                // This has all of our packets
     663                libtrace_ocache_destroy(&libtrace->packet_freelist);
     664                if (libtrace->combiner.destroy)
     665                        libtrace->combiner.destroy(libtrace, &libtrace->combiner);
     666                free(libtrace->perpkt_threads);
     667                libtrace->perpkt_threads = NULL;
     668                libtrace->perpkt_thread_count = 0;
     669        }
     670       
    595671        if (libtrace->event.packet) {
    596672                /* Don't use trace_destroy_packet here - there is almost
     
    605681                 free(libtrace->event.packet);
    606682        }
    607         free(libtrace);
     683        free(libtrace);
    608684}
    609685
     
    633709}
    634710
    635 DLLEXPORT libtrace_packet_t *trace_create_packet(void) 
    636 {
    637         libtrace_packet_t *packet = 
     711DLLEXPORT libtrace_packet_t *trace_create_packet(void)
     712{
     713        libtrace_packet_t *packet =
    638714                (libtrace_packet_t*)calloc((size_t)1,sizeof(libtrace_packet_t));
    639715
     
    661737        dest->type=packet->type;
    662738        dest->buf_control=TRACE_CTRL_PACKET;
     739        dest->order = packet->order;
     740        dest->hash = packet->hash;
     741        dest->error = packet->error;
    663742        /* Reset the cache - better to recalculate than try to convert
    664743         * the values over to the new packet */
     
    675754 */
    676755DLLEXPORT void trace_destroy_packet(libtrace_packet_t *packet) {
     756        /* Free any resources possibly associated with the packet */
     757        if (libtrace_parallel && packet->trace && packet->trace->format->fin_packet) {
     758                packet->trace->format->fin_packet(packet);
     759        }
     760       
    677761        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
    678762                free(packet->buffer);
     
    683767                                 */
    684768        free(packet);
    685 }       
     769}
     770
     771/**
     772 * Removes any possible data stored againt the trace and releases any data.
     773 * This will not destroy a reusable good malloc'd buffer (TRACE_CTRL_PACKET)
     774 * use trace_destroy_packet() for those diabolical purposes.
     775 */
     776void trace_fin_packet(libtrace_packet_t *packet) {
     777        if (packet)
     778        {
     779                if (packet->trace && packet->trace->format->fin_packet) {
     780                        packet->trace->format->fin_packet(packet);
     781                        //gettimeofday(&tv, NULL);
     782                        //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     783                }
     784
     785                // No matter what we remove the header and link pointers
     786                packet->trace = NULL;
     787                packet->header = NULL;
     788                packet->payload = NULL;
     789
     790                if (packet->buf_control != TRACE_CTRL_PACKET)
     791                {
     792                        packet->buffer = NULL;
     793                }
     794
     795                trace_clear_cache(packet);
     796                packet->hash = 0;
     797                packet->order = 0;
     798        }
     799}
    686800
    687801/* Read one packet from the trace into buffer. Note that this function will
     
    707821        }
    708822        assert(packet);
    709      
    710         /* Store the trace we are reading from into the packet opaque
    711          * structure */
    712         packet->trace = libtrace;
    713 
    714         /* Finalise the packet, freeing any resources the format module
    715          * may have allocated it
    716          */
    717         if (libtrace->format->fin_packet) {
    718                 libtrace->format->fin_packet(packet);
    719         }
    720 
    721823
    722824        if (libtrace->format->read_packet) {
    723825                do {
    724826                        size_t ret;
    725                         int filtret;
    726                         /* Clear the packet cache */
    727                         trace_clear_cache(packet);
     827                        int filtret;
     828                        /* Finalise the packet, freeing any resources the format module
     829                         * may have allocated it and zeroing all data associated with it.
     830                         */
     831                        trace_fin_packet(packet);
     832                        /* Store the trace we are reading from into the packet opaque
     833                         * structure */
     834                        packet->trace = libtrace;
    728835                        ret=libtrace->format->read_packet(libtrace,packet);
    729836                        if (ret==(size_t)-1 || ret==0) {
     
    750857                                                libtrace->snaplen);
    751858                        }
     859                        trace_packet_set_order(packet, libtrace->accepted_packets);
    752860                        ++libtrace->accepted_packets;
    753861                        return ret;
     
    813921DLLEXPORT int trace_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    814922        assert(libtrace);
    815         assert(packet); 
     923        assert(packet);
    816924        /* Verify the packet is valid */
    817925        if (!libtrace->started) {
     
    9531061        }
    9541062
    955         return tv;
     1063    return tv;
    9561064}
    9571065
     
    11231231                 * function so don't increment them here.
    11241232                 */
    1125                 event=packet->trace->format->trace_event(trace,packet);
    1126         }
     1233                event=packet->trace->format->trace_event(trace,packet);
     1234                }
    11271235        return event;
    11281236
     
    12071315                libtrace_linktype_t linktype    ) {
    12081316#ifdef HAVE_BPF_FILTER
     1317        /* It just so happens that the underlying libs used by pthread arn't
     1318         * thread safe, namely lex/flex thingys, so single threaded compile
     1319         * multi threaded running should be safe.
     1320         */
     1321        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    12091322        assert(filter);
    12101323
     
    12281341                                        "Unknown pcap equivalent linktype");
    12291342                        return -1;
     1343                }
     1344                assert (pthread_mutex_lock(&mutex) == 0);
     1345                /* Make sure not one bet us to this */
     1346                if (filter->flag) {
     1347                        printf("Someone bet us to compile the filter\n");
     1348                        assert (pthread_mutex_unlock(&mutex) == 0);
     1349                        return 1;
    12301350                }
    12311351                pcap=(pcap_t *)pcap_open_dead(
     
    12411361                                        pcap_geterr(pcap));
    12421362                        pcap_close(pcap);
     1363                        assert (pthread_mutex_unlock(&mutex) == 0);
    12431364                        return -1;
    12441365                }
    12451366                pcap_close(pcap);
    12461367                filter->flag=1;
     1368                assert (pthread_mutex_unlock(&mutex) == 0);
    12471369        }
    12481370        return 0;
     
    12641386        libtrace_linktype_t linktype;
    12651387        libtrace_packet_t *packet_copy = (libtrace_packet_t*)packet;
     1388#ifdef HAVE_LLVM
     1389        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
     1390#endif
    12661391
    12671392        assert(filter);
     
    13141439         * what the link type was
    13151440         */
     1441        // Note internal mutex locking used here
    13161442        if (trace_bpf_compile(filter,packet_copy,linkptr,linktype)==-1) {
    13171443                if (free_packet_needed) {
     
    13241450#if HAVE_LLVM
    13251451        if (!filter->jitfilter) {
    1326                 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1452                ASSERT_RET(pthread_mutex_lock(&mutex), == 0);
     1453                /* Again double check here like the bpf filter */
     1454                if(filter->jitfilter)
     1455                        printf("Someone bet us to compile the JIT thingy\n");
     1456                else
     1457                /* Looking at compile_program source this appears to be thread safe
     1458                 * however if this gets called twice we will leak this memory :(
     1459                 * as such lock here anyways */
     1460                        filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1461                ASSERT_RET(pthread_mutex_unlock(&mutex), == 0);
    13271462        }
    13281463#endif
     
    18051940{
    18061941        assert(trace);
     1942        uint64_t ret;
     1943
    18071944        if (trace->format->get_received_packets) {
    1808                 return trace->format->get_received_packets(trace);
    1809         }
    1810         return (uint64_t)-1;
     1945                if ((ret = trace->format->get_received_packets(trace)) != UINT64_MAX)
     1946                        return ret;
     1947        }
     1948        // Read this cached value taken before the trace was closed
     1949        return trace->received_packets;
    18111950}
    18121951
     
    18241963{
    18251964        assert(trace);
     1965        uint64_t ret;
     1966
    18261967        if (trace->format->get_dropped_packets) {
    1827                 return trace->format->get_dropped_packets(trace);
    1828         }
    1829         return (uint64_t)-1;
     1968                if ((ret = trace->format->get_dropped_packets(trace)) != UINT64_MAX)
     1969                        return ret;
     1970        }
     1971        // Read this cached value taken before the trace was closed
     1972        return trace->dropped_packets;
    18301973}
    18311974
     
    18331976{
    18341977        assert(trace);
    1835         return trace->accepted_packets;
     1978        int i = 0;
     1979        uint64_t ret = trace->accepted_packets;
     1980        for (i = 0; i < trace->perpkt_thread_count; i++) {
     1981                ret += trace->perpkt_threads[i].accepted_packets;
     1982        }
     1983        return ret;
    18361984}
    18371985
  • test/Makefile

    r262a093 rf051c1b  
    55
    66INCLUDE = -I$(PREFIX)/lib -I$(PREFIX)/libpacketdump
    7 CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 \
     7CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 -std=gnu99 -pthread \
    88                -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE -D_LARGEFILE64_SOURCE
    99CFLAGS += $(INCLUDE)
     
    1111LDLIBS = -L$(PREFIX)/lib/.libs -L$(PREFIX)/libpacketdump/.libs -ltrace -lpacketdump
    1212
     13BINS_DATASTRUCT = test-datastruct-vector test-datastruct-deque \
     14        test-datastruct-ringbuffer
     15BINS_PARALLEL = test-format-parallel test-format-parallel-hasher \
     16        test-format-parallel-singlethreaded test-format-parallel-stressthreads \
     17        test-format-parallel-singlethreaded-hasher test-format-parallel-reporter
     18
    1319BINS = test-pcap-bpf test-event test-time test-dir test-wireless test-errors \
    14         test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen
     20        test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen \
     21        $(BINS_DATASTRUCT) $(BINS_PARALLEL)
    1522
    1623.PHONY: all clean distclean install depend test
     
    1926
    2027clean:
    21         $(RM) $(BINS) $(OBJS) test-format  test-decode test-convert \
     28        $(RM) $(BINS) $(OBJS) test-format test-decode test-convert \
    2229        test-decode2 test-write test-drops test-convert2
    2330
  • tools/traceanon/Makefile.am

    rc0a5a50 r29bbef0  
    1 bin_PROGRAMS = traceanon
     1bin_PROGRAMS = traceanon traceanon_parallel
    22
    33man_MANS = traceanon.1
     
    66include ../Makefile.tools
    77traceanon_SOURCES = traceanon.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
     8traceanon_parallel_SOURCES = traceanon_parallel.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
    89
    910# rijndael.c does lots of nasty casting that is going to be a nightmare to fix
     
    1112# messy and hopefully isn't actually an issue.
    1213traceanon_CFLAGS = $(AM_CFLAGS)
     14traceanon_parallel_CFLAGS = $(AM_CFLAGS)
  • tools/traceanon/panon.c

    ra3041a4 r29bbef0  
    88#include "panon.h"
    99
    10 static uint8_t m_key[16];
    11 static uint8_t m_pad[16];
     10static __thread uint8_t m_key[16];
     11static __thread  uint8_t m_pad[16];
    1212
    1313#define CACHEBITS 20
     
    1616//static uint32_t enc_cache[CACHESIZE];
    1717
    18 static uint32_t *enc_cache = 0;
    19 static uint32_t fullcache[2][2];
     18static __thread  uint32_t *enc_cache = 0; // Should be ok shared across multiple
     19static __thread  uint32_t fullcache[2][2]; // Needs to be against on thread
    2020
    2121
  • tools/tracertstats/Makefile.am

    r530bcf0 r29bbef0  
    1 bin_PROGRAMS = tracertstats
     1bin_PROGRAMS = tracertstats tracertstats_parallel
    22man_MANS = tracertstats.1
    33EXTRA_DIST = $(man_MANS)
     
    1616tracertstats_SOURCES = tracertstats.c output.h output.c $(OUTPUT_MODULES)
    1717tracertstats_LDADD = -ltrace $(OUTPUT_PNG_LD)
     18tracertstats_parallel_SOURCES = tracertstats_parallel.c output.h output.c $(OUTPUT_MODULES)
     19tracertstats_parallel_LDADD = -ltrace $(OUTPUT_PNG_LD)
  • tools/tracestats/Makefile.am

    r3b8a5ef r29bbef0  
    1 bin_PROGRAMS = tracestats
     1bin_PROGRAMS = tracestats tracestats_parallel
    22bin_SCRIPTS = tracesummary
    33