Changeset 6463f32


Ignore:
Timestamp:
03/01/17 11:48:37 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
14b7489
Parents:
d0a067f (diff), 204d65b (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'develop'

Files:
18 added
24 edited

Legend:

Unmodified
Added
Removed
  • .gitignore

    r864bd71 rcaf7841  
    8181test/test-wireless
    8282test/test-write
     83test/DPDK_source/
     84test/DPDK_builds/
    8385tools/traceanon/traceanon
    8486tools/traceanon/traceanon_parallel
  • AUTHORS

    r67b6d9e r6450950  
    1515
    1616 * Matt Brown for getting us into Debian and keeping us up to date.
     17 * Brad Cowie for packaging libtrace4 for Debian and Ubuntu
    1718 * Alistair King for reporting and suggesting fixes for numerous bugs,
    1819   particularly in libwandio
     
    2728 * Robert Edmonds for tidying up the libpacketdump plugin install
    2829 * Martin Bligh for patching in support for nanosecond pcap traces
     30 * Teemu Rytilahti for adding support for SIT
     31 * "EaseTheWorld" for their work with improving the packet statistics API, as
     32   well as reporting several bugs
     33 * Richard Cziva for contributing to the DPDK support
    2934 * Jamie Curtis for fixing a couple of bugs many many years ago
    3035 * Brendon Jones for creating the original Windows DLLs and writing bits of
  • README

    r864bd71 rd759ee1  
    1 libtrace 4.0.0
     1libtrace 4.0.1
    22
    33---------------------------------------------------------------------------
    4 Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
     4Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
    55All rights reserved.
    66
  • configure.in

    r32a3ec5 rd759ee1  
    44# and in the README
    55
    6 AC_INIT([libtrace],[4.0.0],[contact@wand.net.nz],[libtrace])
     6AC_INIT([libtrace],[4.0.1],[contact@wand.net.nz],[libtrace])
    77
    88LIBTRACE_MAJOR=4
    99LIBTRACE_MID=0
    10 LIBTRACE_MINOR=0
     10LIBTRACE_MINOR=1
    1111
    1212# OpenSolaris hides libraries like libncurses in /usr/gnu/lib, which is not
     
    387387# Check for DPDK
    388388AC_ARG_WITH(dpdk,
    389             AS_HELP_STRING(--with-dpdk,include DPDK live capture support (Must set RTE_SDK/_TARGET environment variable)),
     389            AS_HELP_STRING(--with-dpdk,include DPDK live capture support (From either the RTE_SDK/_TARGET environment variable or the dpdk-dev package)),
    390390[
    391391        if test "$withval" = no
     
    402402libtrace_dpdk=false
    403403if test "$want_dpdk" != no; then
    404     saved_ldflags="$LDFLAGS"
    405     # Add to our libpath location
    406     LDFLAGS="$LDFLAGS -L$RTE_SDK/$RTE_TARGET/lib/"
    407     # Test that the library exists, we cannot be sure of what extra library flags need to be added
    408     # this could change in the next version which would make the test fail
    409     #AC_CHECK_LIB(intel_dpdk, rte_eal_init, dpdk_found=1, dpdk_found=0, -lpthread -lm -lrt)
    410 
    411     # So instead simply check for existence
    412     AC_CHECK_FILE("$RTE_SDK/$RTE_TARGET/lib/libintel_dpdk.a", dpdk_found=1, dpdk_found=0)
    413     # Revert back
    414     LDFLAGS="$saved_ldflags"
    415     if test "$dpdk_found" = 1; then
    416         # Save these now so that they can be re-exported later
     404        # So instead simply check for existence
     405        if test "$RTE_SDK" != ""; then
     406                AC_CHECK_FILE("$RTE_SDK/$RTE_TARGET/lib/libintel_dpdk.a", dpdk_found="libintel_dpdk.a", dpdk_found=0)
     407        fi
     408        # DPDK 2.1.0+ renames this to libdpdk from libintel_dpdk
     409        if test "$dpdk_found" = 0 -a "$RTE_SDK" != ""; then
     410                AC_CHECK_FILE("$RTE_SDK/$RTE_TARGET/lib/libdpdk.a", dpdk_found="libdpdk.a", dpdk_found=0)
     411        fi
     412        if test "$dpdk_found" != 0 -a "$RTE_SDK" != ""; then
     413                # Save these now so that they can be re-exported later
    417414                AC_SUBST([RTE_TARGET])
    418         AC_SUBST([RTE_SDK])
    419         # Force dpdk library to be statically linked to allow compiler optimisations
    420         LIBTRACE_LIBS="$LIBTRACE_LIBS -Wl,--whole-archive -Wl,-l:libintel_dpdk.a -Wl,--no-whole-archive -Wl,-lm"
     415                AC_SUBST([RTE_SDK])
     416                # Force dpdk library to be statically linked to allow compiler optimisations
     417                LIBTRACE_LIBS="$LIBTRACE_LIBS -Wl,--whole-archive -Wl,-l:$dpdk_found -Wl,--no-whole-archive -Wl,-lm"
    421418                AC_DEFINE(HAVE_DPDK,1,[conditional for building with DPDK live capture support])
    422419                libtrace_dpdk=true
     420        fi
     421        if test "$RTE_SDK" = ""; then
     422                AC_MSG_NOTICE([No RTE_SDK given, checking for system dpdk-dev package])
     423                # Search the system, maybe it is installed? Ethdev is one of the main libraries
     424                AC_CHECK_LIB(ethdev, rte_eth_dev_configure, dpdk_found="system", dpdk_found=0)
     425                # We also need to check that rte.vars.mk is installed from dpdk-dev (as well as libdpdk-dev)
     426                if test "$dpdk_found" != 0 -a -e /usr/share/dpdk/mk/rte.vars.mk ; then
     427                        RTE_TARGET="x86_64-default-linuxapp-gcc"
     428                        RTE_SDK="/usr/share/dpdk/"
     429                        RTE_INCLUDE="/usr/include/dpdk"
     430                        AC_SUBST([RTE_TARGET])
     431                        AC_SUBST([RTE_SDK])
     432                        AC_SUBST([RTE_INCLUDE])
     433                        # include libethdev this is part of DPDK
     434                        LIBTRACE_LIBS="$LIBTRACE_LIBS -lethdev"
     435                        # Include all rte libs note we need to escape '[' and ']'
     436                        LIBTRACE_LIBS="$LIBTRACE_LIBS $(ldconfig -p | sed 's/.*lib\([[^.]]*\).*/-l\1/' | grep rte_ | tr '\n' ' ')"
     437                        AC_MSG_NOTICE([Building against system DPDK])
     438
     439                        AC_DEFINE(HAVE_DPDK,1,[conditional for building with DPDK live capture support])
     440                        libtrace_dpdk=true
     441                fi
    423442        fi
    424443fi
     
    471490if test "$ac_cv_search_dlopen" != "none required"; then
    472491        LIBPKTDUMP_LIBS="$LIBPKTDUMP_LIBS $ac_cv_search_dlopen"
    473         if test "$dpdk_found" = 1; then
     492        if test "$dpdk_found" != 0; then
    474493                LIBTRACE_LIBS="$LIBTRACE_LIBS -Wl,$ac_cv_search_dlopen"
    475494        fi
  • lib/Makefile.am

    re63d80d r4db5b98  
    11lib_LTLIBRARIES = libtrace.la
    2 include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h \
    3         rt_protocol.h erftypes.h libtrace_parallel.h \
     2include_HEADERS = libtrace.h libtrace_parallel.h
     3pkginclude_HEADERS = dagformat.h lt_inttypes.h daglegacy.h \
     4        rt_protocol.h erftypes.h \
    45        data-struct/ring_buffer.h data-struct/object_cache.h \
    5         data-struct/vector.h data-struct/message_queue.h \
     6        data-struct/vector.h \
    67        data-struct/deque.h data-struct/linked_list.h \
    7         data-struct/sliding_window.h hash_toeplitz.h \
    8         data-struct/buckets.h
     8        data-struct/buckets.h data-struct/sliding_window.h \
     9        data-struct/message_queue.h hash_toeplitz.h
    910
    1011AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
  • lib/format_bpf.c

    ree6e802 r5e3f16c  
    497497
    498498                /* Timed out -- check if we should halt */
    499                 if (libtrace_halt)
    500                         return 0;
     499                if ((ret=is_halted(libtrace)) != -1)
     500                        return ret;
    501501        }
    502502       
  • lib/format_dag25.c

    ree6e802 rdb84bb2  
    13041304                                return -2;
    13051305
    1306                         if (libtrace_halt)
    1307                                 return 0;
     1306                        if ((numbytes=is_halted(libtrace)) != -1)
     1307                                return numbytes;
    13081308                        /* Block until we see a packet */
    13091309                        continue;
     
    13131313
    13141314        packet->trace = libtrace;
     1315
    13151316        /* Prepare the libtrace packet */
    13161317        if (dag_prepare_packet_stream(libtrace, stream_data, packet, erfptr,
     
    13241325        }
    13251326
     1327        packet->order = erf_get_erf_timestamp(packet);
    13261328        packet->error = packet->payload ? htons(erfptr->rlen) :
    13271329                                          erf_get_framing_length(packet);
  • lib/format_dpdk.c

    ree6e802 rdb84bb2  
    2323 *
    2424 *
    25  */
     25 *
    2626 * Kit capture format.
    2727 *
     
    6969 * code (that we still attempt to support).
    7070 *
    71  * DPDK v1.7.1 is recommended.
    72  * However 1.5 to 1.8 are likely supported.
     71 * DPDK 16.04 or newer is recommended.
     72 * However 1.6 and newer are still likely supported.
    7373 */
    7474#include <rte_eal.h>
     
    145145#else
    146146#       define DPDK_USE_NULL_QUEUE_CONFIG 0
     147#endif
     148
     149/* 2.0.0-rc1
     150 * Unifies RSS hash between cards
     151 */
     152#if RTE_VERSION >= RTE_VERSION_NUM(2, 0, 0, 1)
     153#       define RX_RSS_FLAGS (ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP | \
     154                             ETH_RSS_SCTP)
     155#else
     156#       define RX_RSS_FLAGS (ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | \
     157                             ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP |\
     158                             ETH_RSS_IPV6_UDP)
     159#endif
     160
     161/* v16.07-rc1 - deprecated
     162 * rte_mempool_avail_count to replace rte_mempool_count
     163 * rte_mempool_in_use_count to replace rte_mempool_free_count
     164 */
     165#if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
     166#define rte_mempool_avail_count rte_mempool_count
     167#define rte_mempool_in_use_count rte_mempool_free_count
    147168#endif
    148169
     
    170191#endif
    171192
     193/* 16.04-rc3 ETH_LINK_SPEED_X are replaced with ETH_SPEED_NUM_X.
     194 * ETH_LINK_SPEED_ are reused as flags, ugly.
     195 * We use the new way in this code.
     196 */
     197#ifndef ETH_SPEED_NUM_1G
     198        #define ETH_SPEED_NUM_1G ETH_LINK_SPEED_1000
     199        #define ETH_SPEED_NUM_10G ETH_LINK_SPEED_10G
     200        #define ETH_SPEED_NUM_20G ETH_LINK_SPEED_20G
     201        #define ETH_SPEED_NUM_40G ETH_LINK_SPEED_40G
     202#endif
    172203
    173204/* The default size of memory buffers to use - This is the max size of standard
     
    175206#define RX_MBUF_SIZE 1514
    176207
    177 /* The minimum number of memory buffers per queue tx or rx. Search for
    178  * _MIN_RING_DESC in DPDK. The largest minimum is 64 for 10GBit cards.
    179  */
    180 #define MIN_NB_BUF 64
     208/* The minimum number of memory buffers per queue tx or rx. Based on
     209 * the requirement of the memory pool with 128 per thread buffers, needing
     210 * at least 128*1.5 = 192 buffers. Our code allocates 128*2 to be safe.
     211 */
     212#define MIN_NB_BUF 128
    181213
    182214/* Number of receive memory buffers to use
     
    185217 * This can be increased in the driver and here.
    186218 * Should be at least MIN_NB_BUF.
    187  */
    188 #define NB_RX_MBUF 4096
     219 * We choose 2K rather than 4K because it enables the usage of sse vector
     220 * drivers which are significantly faster than using the larger buffer.
     221 */
     222#define NB_RX_MBUF (4096/2)
    189223
    190224/* Number of send memory buffers to use.
     
    203237/* For single threaded libtrace we read packets as a batch/burst
    204238 * this is the maximum size of said burst */
    205 #define BURST_SIZE 50
     239#define BURST_SIZE 32
    206240
    207241#define MBUF(x) ((struct rte_mbuf *) x)
     
    229263 *
    230264 * Make sure you understand what these are doing before enabling them.
    231  * They might make traces incompatable with other builds etc.
     265 * They might make traces incompatible with other builds etc.
    232266 *
    233267 * These are also included to show how to do somethings which aren't
     
    240274/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
    241275 * only turn on if you know clock_gettime is a vsyscall on your system
    242  * overwise could be a large overhead. Again gettimeofday() should be
     276 * otherwise could be a large overhead. Again gettimeofday() should be
    243277 * vsyscall also if it's not you should seriously consider updating your
    244278 * kernel.
     
    297331        int lcore;
    298332#if HAS_HW_TIMESTAMPS_82580
    299         /* Timestamping only relevent to RX */
     333        /* Timestamping only relevant to RX */
    300334        uint64_t ts_first_sys; /* Sytem timestamp of the first packet in nanoseconds */
    301335        uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
     
    330364#endif
    331365        char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    332         uint8_t rss_key[40]; // This is the RSS KEY
     366        enum hasher_types hasher_type;
     367        uint8_t *rss_key;
    333368        /* To improve single-threaded performance we always batch reading
    334369         * packets, in a burst, otherwise the parallel library does this for us */
     
    822857        FORMAT(libtrace)->burst_size = 0;
    823858        FORMAT(libtrace)->burst_offset = 0;
     859        FORMAT(libtrace)->hasher_type = HASHER_BALANCE;
     860        FORMAT(libtrace)->rss_key = NULL;
    824861
    825862        /* Make our first stream */
     
    899936                case HASHER_BALANCE:
    900937                case HASHER_UNIDIRECTIONAL:
    901                         toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
    902                         return 0;
    903938                case HASHER_BIDIRECTIONAL:
    904                         toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     939                        FORMAT(libtrace)->hasher_type = *(enum hasher_types*)data;
     940                        if (FORMAT(libtrace)->rss_key)
     941                                free(FORMAT(libtrace)->rss_key);
     942                        FORMAT(libtrace)->rss_key = NULL;
    905943                        return 0;
    906944                case HASHER_CUSTOM:
    907                         // We don't support these
     945                        // Let libtrace do this
    908946                        return -1;
    909947                }
     
    958996        .rx_adv_conf = {
    959997                .rss_conf = {
    960                         // .rss_key = &rss_key, // We set this per format
    961                         .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     998                        .rss_hf = RX_RSS_FLAGS,
    962999                },
    9631000        },
     
    10871124        int i;
    10881125        struct rte_config *cfg = rte_eal_get_configuration();
     1126        (void) socket;
    10891127
    10901128        pthread_mutex_lock(&dpdk_lock);
     
    12461284                fprintf(stderr, "DPDK memory pool not empty %d of %d, please "
    12471285                        "free all packets before finishing a trace\n",
    1248                         rte_mempool_count(mempool), mempool->size);
     1286                        rte_mempool_avail_count(mempool), mempool->size);
    12491287        }
    12501288
     
    13151353                        return -1;
    13161354                }
     1355        }
     1356
     1357        /* Generate the hash key, based on the device */
     1358        uint8_t rss_size = 52; // 52 for i40e, 40 for others, use the largest by default
     1359        // In new versions DPDK we can query the size
     1360#if RTE_VERSION >= RTE_VERSION_NUM(2, 1, 0, 0)
     1361        struct rte_eth_dev_info dev_info;
     1362        rte_eth_dev_info_get(format_data->port, &dev_info);
     1363        rss_size = dev_info.hash_key_size;
     1364#endif
     1365        if (rss_size != 0) {
     1366                format_data->rss_key = malloc(rss_size);
     1367                if (format_data->hasher_type == HASHER_BIDIRECTIONAL) {
     1368                        toeplitz_ncreate_bikey(format_data->rss_key, rss_size);
     1369                } else {
     1370                        toeplitz_ncreate_unikey(format_data->rss_key, rss_size);
     1371                }
     1372                port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     1373#if RTE_VERSION >= RTE_VERSION_NUM(1, 7, 0, 1)
     1374                port_conf.rx_adv_conf.rss_conf.rss_key_len = rss_size;
     1375#endif
     1376        } else {
     1377                fprintf(stderr, "DPDK couldn't configure RSS hashing!");
    13171378        }
    13181379
     
    17201781                libtrace_list_deinit(FORMAT(libtrace)->per_stream);
    17211782                /* filter here if we used it */
     1783                if (FORMAT(libtrace)->rss_key)
     1784                        free(FORMAT(libtrace)->rss_key);
    17221785                free(libtrace->format_data);
    17231786        }
     
    18421905retry_calc_wiretime:
    18431906        switch (format_data->link_speed) {
    1844         case ETH_LINK_SPEED_40G:
    1845                 wire_time /=  ETH_LINK_SPEED_40G;
     1907        case ETH_SPEED_NUM_40G:
     1908                wire_time /=  ETH_SPEED_NUM_40G;
    18461909                break;
    1847         case ETH_LINK_SPEED_20G:
    1848                 wire_time /= ETH_LINK_SPEED_20G;
     1910        case ETH_SPEED_NUM_20G:
     1911                wire_time /= ETH_SPEED_NUM_20G;
    18491912                break;
    1850         case ETH_LINK_SPEED_10G:
    1851                 wire_time /= ETH_LINK_SPEED_10G;
     1913        case ETH_SPEED_NUM_10G:
     1914                wire_time /= ETH_SPEED_NUM_10G;
    18521915                break;
    1853         case ETH_LINK_SPEED_1000:
    1854                 wire_time /= ETH_LINK_SPEED_1000;
     1916        case ETH_SPEED_NUM_1G:
     1917                wire_time /= ETH_SPEED_NUM_1G;
    18551918                break;
    18561919        case 0:
     
    20722135                if (mesg && libtrace_message_queue_count(mesg) > 0)
    20732136                        return READ_MESSAGE;
    2074                 if (libtrace_halt)
    2075                         return READ_EOF;
     2137                if ((nb_rx=is_halted(libtrace)) != (size_t) -1)
     2138                        return nb_rx;
    20762139                /* Wait a while, polling on memory degrades performance
    20772140                 * This relieves the pressure on memory allowing the NIC to DMA */
     
    20912154        int i;
    20922155        dpdk_per_stream_t *stream = t->format_data;
     2156        struct dpdk_addt_hdr * hdr;
    20932157
    20942158        nb_rx = dpdk_read_packet_stream (libtrace, stream, &t->messages,
     
    21072171                        packets[i]->trace = libtrace;
    21082172                        packets[i]->error = 1;
     2173                        hdr = (struct dpdk_addt_hdr *)
     2174                                        ((struct rte_mbuf*) pkts_burst[i] + 1);
     2175                        packets[i]->order = hdr->timestamp;
    21092176                        dpdk_prepare_packet(libtrace, packets[i], packets[i]->buffer, packets[i]->type, 0);
    21102177                }
     
    21942261        stats->captured = dev_stats.ipackets;
    21952262
    2196         /* Not that we support adding filters but if we did this
    2197          * would work */
    2198         stats->filtered += dev_stats.fdirmiss;
    2199 
    22002263        stats->dropped_valid = true;
    22012264        stats->dropped = dev_stats.imissed;
    22022265
     2266#if RTE_VERSION >= RTE_VERSION_NUM(16, 4, 0, 2)
     2267        /* DPDK commit 86057c fixes ensures missed does not get counted as
     2268         * errors */
     2269        stats->errors_valid = true;
     2270        stats->errors = dev_stats.ierrors;
     2271#else
    22032272        /* DPDK errors includes drops */
    22042273        stats->errors_valid = true;
    22052274        stats->errors = dev_stats.ierrors - dev_stats.imissed;
    2206 
     2275#endif
    22072276        stats->received_valid = true;
    22082277        stats->received = dev_stats.ipackets + dev_stats.imissed;
     
    22172286                                            libtrace_packet_t *packet) {
    22182287        libtrace_eventobj_t event = {0,0,0.0,0};
    2219         int nb_rx; /* Number of receive packets we've read */
    2220         struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) to rx buffers */
     2288        size_t nb_rx; /* Number of received packets we've read */
    22212289
    22222290        do {
    22232291
    2224                 /* See if we already have a packet waiting */
    2225                 nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
    2226                                          FORMAT_DATA_FIRST(trace)->queue_id,
    2227                                          pkts_burst, 1);
    2228 
    2229                 if (nb_rx > 0) {
     2292                /* No packets waiting in our buffer? Try and read some more */
     2293                if (FORMAT(trace)->burst_size == FORMAT(trace)->burst_offset) {
     2294                        nb_rx = rte_eth_rx_burst(FORMAT(trace)->port,
     2295                                                 FORMAT_DATA_FIRST(trace)->queue_id,
     2296                                                 FORMAT(trace)->burst_pkts, BURST_SIZE);
     2297                        if (nb_rx > 0) {
     2298                                dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace),
     2299                                                FORMAT(trace)->burst_pkts, nb_rx);
     2300                                FORMAT(trace)->burst_size = nb_rx;
     2301                                FORMAT(trace)->burst_offset = 0;
     2302                        }
     2303                }
     2304
     2305                /* Now do we have packets waiting? */
     2306                if (FORMAT(trace)->burst_size != FORMAT(trace)->burst_offset) {
    22302307                        /* Free the last packet buffer */
    22312308                        if (packet->buffer != NULL) {
     
    22392316                        packet->type = TRACE_RT_DATA_DPDK;
    22402317                        event.type = TRACE_EVENT_PACKET;
    2241                         dpdk_ready_pkts(trace, FORMAT_DATA_FIRST(trace), pkts_burst, 1);
    2242                         packet->buffer = FORMAT(trace)->burst_pkts[0];
     2318                        packet->buffer = FORMAT(trace)->burst_pkts[
     2319                                             FORMAT(trace)->burst_offset++];
    22432320                        dpdk_prepare_packet(trace, packet, packet->buffer, packet->type, 0);
    22442321                        event.size = 1; // TODO should be bytes read, which essentially useless anyway
     
    22692346
    22702347static void dpdk_help(void) {
    2271         printf("dpdk format module: $Revision: 1752 $\n");
     2348        printf("dpdk format module: %s (%d) \n", rte_version(), RTE_VERSION);
    22722349        printf("Supported input URIs:\n");
    22732350        printf("\tdpdk:<domain:bus:devid.func>-<coreid>\n");
  • lib/format_linux_common.c

    r1d780e4 rdb84bb2  
    315315        libtrace_filter_t *filter = FORMAT_DATA->filter;
    316316
     317        stream->last_timestamp = 0;
     318
    317319        /* Create a raw socket for reading packets on */
    318320        stream->fd = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
  • lib/format_linux_common.h

    ree6e802 rdb84bb2  
    257257         * that every ring can get setup the same */
    258258        libtrace_list_t *per_stream;
     259
    259260};
    260261
     
    287288        /* The ring buffer layout */
    288289        struct tpacket_req req;
     290        uint64_t last_timestamp;
    289291} ALIGN_STRUCT(CACHE_LINE_SIZE);
    290292
    291 #define ZERO_LINUX_STREAM {-1, MAP_FAILED, 0, {0,0,0,0}}
     293#define ZERO_LINUX_STREAM {-1, MAP_FAILED, 0, {0,0,0,0}, 0}
    292294
    293295
  • lib/format_linux_int.c

    ree6e802 rdb84bb2  
    215215                                return -1;
    216216                        } else {
    217                                 if (libtrace_halt)
    218                                         return READ_EOF;
     217                                if ((ret=is_halted(libtrace)) != -1)
     218                                        return ret;
    219219                        }
    220220                }
     
    286286        }
    287287
     288
    288289        /* Buffer contains all of our packet (including our custom header) so
    289290         * we just need to get prepare_packet to set all our packet pointers
     
    294295                return -1;
    295296       
     297        if (hdr->timestamptype == TS_TIMEVAL) {
     298                packet->order = (((uint64_t)hdr->tv.tv_sec) << 32)
     299                            + ((((uint64_t)hdr->tv.tv_usec) << 32) /1000000);
     300        } else if (hdr->timestamptype == TS_TIMESPEC) {
     301                packet->order = (((uint64_t)hdr->ts.tv_sec) << 32)
     302                            + ((((uint64_t)hdr->ts.tv_nsec) << 32) /1000000000);
     303        } else {
     304                packet->order = 0;
     305        }
     306
     307        if (packet->order <= stream->last_timestamp) {
     308                packet->order = stream->last_timestamp + 1;
     309        }
     310
     311        stream->last_timestamp = packet->order;
     312
    296313        return hdr->wirelen+sizeof(*hdr);
    297314}
  • lib/format_linux_ring.c

    ree6e802 rdb84bb2  
    447447#ifdef HAVE_NETPACKET_PACKET_H
    448448#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
     449/* We use TP_STATUS_LIBTRACE to ensure we don't loop back on ourself
     450 * and read the same packet twice if an old packet has not yet been freed */
     451#define TP_STATUS_LIBTRACE 0xFFFFFFFF
     452
    449453inline static int linuxring_read_stream(libtrace_t *libtrace,
    450454                                        libtrace_packet_t *packet,
     
    470474         * ready for consumption.
    471475         */
    472         while (!(header->tp_status & TP_STATUS_USER)) {
     476        while (!(header->tp_status & TP_STATUS_USER) ||
     477               header->tp_status == TP_STATUS_LIBTRACE) {
     478                if ((ret=is_halted(libtrace)) != -1)
     479                        return ret;
    473480                pollset[0].fd = stream->fd;
    474481                pollset[0].events = POLLIN;
     
    509516                        }
    510517                } else {
    511                         /* Poll timed out - check if we should exit */
    512                         if (libtrace_halt)
    513                                 return 0;
     518                        /* Poll timed out - check if we should exit on next loop */
    514519                        continue;
    515520                }
    516521        }
    517 
    518522        packet->buffer = header;
    519523        packet->trace = libtrace;
     524       
     525        header->tp_status = TP_STATUS_LIBTRACE;
    520526
    521527        /* If a snaplen was configured, automatically truncate the packet to
     
    531537        stream->rxring_offset++;
    532538        stream->rxring_offset %= stream->req.tp_frame_nr;
     539
     540        packet->order = (((uint64_t)TO_TP_HDR2(packet->buffer)->tp_sec) << 32)
     541                        + ((((uint64_t)TO_TP_HDR2(packet->buffer)->tp_nsec)
     542                        << 32) / 1000000000);
     543
     544        if (packet->order <= stream->last_timestamp) {
     545                packet->order = stream->last_timestamp + 1;
     546        }
     547
     548        stream->last_timestamp = packet->order;
     549               
    533550
    534551        /* We just need to get prepare_packet to set all our packet pointers
     
    574591        /* Fetch the current frame */
    575592        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
    576         if (header->tp_status & TP_STATUS_USER) {
     593        if (header->tp_status & TP_STATUS_USER &&
     594            header->tp_status != TP_STATUS_LIBTRACE) {
    577595                /* We have a frame waiting */
    578596                event.size = trace_read_packet(libtrace, packet);
  • lib/format_pcap.c

    r631fdbc r5e3f16c  
    447447                        case 1: break; /* no error */
    448448                        case 0:
    449                                 if (libtrace_halt)
    450                                         return 0;
     449                                if ((ret=is_halted(libtrace)) != -1)
     450                                        return ret;
    451451                                continue; /* timeout expired */
    452452                        case -1:
  • lib/hash_toeplitz.c

    ree6e802 r6f7cd4b  
    7171 * Creates a random unidirectional RSS key - a ip or ip+port combination in
    7272 * the opposite directions will most likely get different hashes.
     73 * @param key An array of bytes to retrieve the RSS key
     74 * @param num The number of bytes in key
     75 */
     76void toeplitz_ncreate_unikey(uint8_t *key, size_t num) {
     77        size_t i;
     78        unsigned int seed = time(NULL);
     79        for (i = 0; i < num; i++) {
     80                key[i] = (uint8_t) rand_r(&seed);
     81        }
     82}
     83
     84/**
     85 * Creates a random 40 byte unidirectional RSS key - a ip or ip+port combination
     86 * in the opposite directions will most likely get different hashes.
    7387 * @param key must have 40 bytes of space to retrieve random the key
    74  */ 
     88 */
    7589void toeplitz_create_unikey(uint8_t *key) {
    76         int i;
    77         unsigned int seed = time(NULL);
    78         for (i = 0; i < 40; i++) {
    79                 key[i] = (uint8_t) rand_r(&seed);
    80         }
     90        toeplitz_ncreate_unikey(key, 40);
    8191}
    8292
     
    8595 * in opposite directions will receive the same hash
    8696 * @param key must have 40 bytes of space to retrieve random the key
    87  */
    88 void toeplitz_create_bikey(uint8_t *key) {
     97 * @param num The number of bytes in the key, must be a multiple of 2
     98 */
     99void toeplitz_ncreate_bikey(uint8_t *key, size_t num) {
    89100        unsigned int seed = time(NULL);
    90         int i;
     101        size_t i;
     102        if (num % 2 != 0) {
     103                perror("Can not create a bidirectional key for an odd length key");
     104        }
    91105        // Every thing is 16bit (port=16, ipv4=32, ipv6=128
    92106        // aligned so this will make the hash bidirectional
    93107        uint16_t bi_key = (uint16_t) rand_r(&seed);
    94108        uint16_t *bi_rep = (uint16_t *) key;
    95         for (i = 0; i < 20; i++) {
     109        for (i = 0; i < num/2; i++) {
    96110                bi_rep[i] = bi_key;
    97111        }
     112}
     113
     114/**
     115 * Create a 40 byte bidirectional RSS key, i.e. ip and ip+port configurations
     116 * in opposite directions will receive the same hash
     117 * @param key An array of bytes to retrieve the RSS key
     118 */
     119void toeplitz_create_bikey(uint8_t *key) {
     120        toeplitz_ncreate_bikey(key, 40);
    98121}
    99122
     
    106129        }
    107130        toeplitz_hash_expand_key(conf);
     131        conf->hash_ipv4 = 1;
     132        conf->hash_ipv6 = 1;
     133        conf->hash_tcp_ipv4 = 1;
     134        conf->x_hash_udp_ipv4 = 1;
     135        conf->hash_tcp_ipv6 = 1;
     136        conf->x_hash_udp_ipv6 = 1;
    108137}
    109138
  • lib/hash_toeplitz.h

    ree6e802 r6f7cd4b  
    5858void toeplitz_init_config(toeplitz_conf_t *conf, bool bidirectional);
    5959uint64_t toeplitz_hash_packet(const libtrace_packet_t * pkt, const toeplitz_conf_t *cnf);
     60void toeplitz_ncreate_bikey(uint8_t *key, size_t num);
    6061void toeplitz_create_bikey(uint8_t *key);
     62void toeplitz_ncreate_unikey(uint8_t *key, size_t num);
    6163void toeplitz_create_unikey(uint8_t *key);
    6264
  • lib/libtrace.h.in

    r0cdd231 r317e903  
    260260/** If the packet has allocated its own memory the buffer_control should be
    261261 * set to TRACE_CTRL_PACKET, so that the memory will be freed when the packet
    262  * is destroyed. If the packet has been zerocopied out of memory owned by
     262 * is destroyed. If the packet has been zero-copied out of memory owned by
    263263 * something else, e.g. a DAG card, it should be TRACE_CTRL_EXTERNAL.
    264264 *
     
    282282/** Enumeration of error codes */
    283283enum {
    284         /** No Error has occured.... yet. */
     284        /** No Error has occurred.... yet. */
    285285        TRACE_ERR_NOERROR       = 0,
    286286        /** The URI passed to trace_create() is unsupported, or badly formed */
     
    336336#endif
    337337        TRACE_DLT_PPP_SERIAL = 50,
    338         TRACE_DLT_LINKTYPE_RAW = 101, /**< See TRACE_DLT_RAW for explainations of pain. */
     338        TRACE_DLT_LINKTYPE_RAW = 101, /**< See TRACE_DLT_RAW for explanations of pain. */
    339339        TRACE_DLT_C_HDLC = 104,
    340340        TRACE_DLT_IEEE802_11 = 105,
     
    548548        uint64_t hash; /**< A hash of the packet as supplied by the user */
    549549        int error; /**< The error status of pread_packet */
    550         uint64_t internalid;            /** Internal indentifier for the pkt */
     550        uint64_t internalid;            /** Internal identifier for the pkt */
    551551        void *srcbucket;
    552552} libtrace_packet_t;
     
    576576    TRACE_RADIOTAP_LOCK_QUALITY = 7, /**< Barker Code lock quality (uint16) */
    577577    TRACE_RADIOTAP_TX_ATTENUATION = 8, /**< TX attenuation as unitless distance from max power (uint16) */
    578     TRACE_RADIOTAP_DB_TX_ATTENUATION = 9, /**< TX attenutation as dB from max power (uint16) */
     578    TRACE_RADIOTAP_DB_TX_ATTENUATION = 9, /**< TX attenuation as dB from max power (uint16) */
    579579    TRACE_RADIOTAP_DBM_TX_POWER = 10, /**< TX Power in dBm (int8) */
    580580    TRACE_RADIOTAP_ANTENNA = 11, /**< Antenna frame was rx'd or tx'd on (uint8) */
     
    869869#define LIBTRACE_GRE_FLAG_VERMASK  0x0007
    870870
     871
     872/* PPTP GRE (RFC2637) */
     873#define LIBTRACE_GRE_FLAG_ACK      0x0080
     874#define LIBTRACE_GRE_PPTP_VERSION  0x0001
     875
    871876/** Libtrace local definition of VXLAN Header
    872877 * (draft-mahalingam-dutt-dcops-vxlan)
     
    11821187 * @param [out] format  A pointer that will be updated to point to an allocated
    11831188 *                      string holding the format component of the URI
    1184  * @return NULL if an error occured, otherwise return a pointer to the uridata
     1189 * @return NULL if an error occurred, otherwise return a pointer to the uridata
    11851190 * component
    11861191 *
     
    12071212 *  - rt:hostname:port
    12081213 *
    1209  *  If an error occured when attempting to open the trace file, a
     1214 *  If an error occurred when attempting to open the trace file, a
    12101215 *  trace is still returned so trace_is_err() should be called to find out
    1211  *  if an error occured. The trace is created in the configuration state, you
     1216 *  if an error occurred. The trace is created in the configuration state, you
    12121217 *  must call trace_start before attempting to read packets from the trace.
    12131218 */
     
    12381243 *  - pcap:/path/to/pcap/file
    12391244 *
    1240  *  If an error occured when attempting to open the output trace, a trace is
     1245 *  If an error occurred when attempting to open the output trace, a trace is
    12411246 *  still returned but trace_errno will be set. Use trace_is_err_out() and
    12421247 *  trace_perror_output() to get more information.
     
    13771382} trace_option_output_t;
    13781383
    1379 /* To add a new stat field update this list, and the relevent places in
     1384/* To add a new stat field update this list, and the relevant places in
    13801385 * libtrace_stat_t structure.
    13811386 */
     
    14301435        uint64_t filtered;
    14311436
    1432         /** The total number of good packets which have been recevied. Including
     1437        /** The total number of good packets which have been received. Including
    14331438         * those which are dropped and filtered. This does not include errors.
    14341439         *
     
    16101615 * Returns statistic counters for a trace, for a parallel trace this is a
    16111616 * combined total.
    1612  * Where possible these are retrived atomically, however this behaviour depends
     1617 * Where possible these are retrieved atomically, however this behaviour depends
    16131618 * on the underlying trace format.
    16141619 *
     
    16311636/**
    16321637 * Returns statistic counters for a single thread of a trace.
    1633  * Where possible these are retrived atomically, however this behaviour depends
     1638 * Where possible these are retrieved atomically, however this behaviour depends
    16341639 * on the underlying trace format.
    16351640 *
    16361641 * @param trace The input trace to examine.
    1637  * @param t An optional thread to received stats for or NULL to retrive stats
     1642 * @param t An optional thread to received stats for or NULL to retrieve stats
    16381643 *          for the current thread
    16391644 * @param stats Filled upon return with statistics about the trace, check the
     
    17341739 * function should be avoided where possible.
    17351740 *
    1736  * @par The reason you would want to use this function is that a zerocopied
     1741 * @par The reason you would want to use this function is that a zero-copied
    17371742 * packet from a device will be stored using memory owned by the device which
    17381743 * may be a limited resource. Copying the packet will ensure that the packet
     
    26932698 *
    26942699 * @note This function only works for OSPF version 2 packets.
    2695  * @note trace_get_next_ospf_lsa_v2() should be subequently used to process the LSAs
     2700 * @note trace_get_next_ospf_lsa_v2() should be subsequently used to process the LSAs
    26962701 */
    26972702DLLEXPORT SIMPLE_FUNCTION
     
    27162721 *
    27172722 * @note This function only works for OSPF version 2 packets.
    2718  * @note trace_get_next_ospf_lsa_header_v2() should be subequently used to process the LSA headers
     2723 * @note trace_get_next_ospf_lsa_header_v2() should be subsequently used to process the LSA headers
    27192724 */
    27202725DLLEXPORT SIMPLE_FUNCTION
     
    27392744 *
    27402745 * @note This function only works for OSPF version 2 packets.
    2741  * @note trace_get_next_ospf_link_v2() should be subequently used to process
     2746 * @note trace_get_next_ospf_link_v2() should be subsequently used to process
    27422747 * the links
    27432748 */
  • lib/libtrace_int.h

    ree6e802 r5e3f16c  
    327327        /** Synchronise writes/reads across this format object and attached threads etc */
    328328        pthread_mutex_t libtrace_lock;
     329        /** Packet read lock, seperate from libtrace_lock as to not block while reading a burst */
     330        pthread_mutex_t read_packet_lock;
    329331        /** State */
    330332        enum trace_state state;
     
    606608         * @return The size of the packet read (in bytes) including the capture
    607609         * framing header, or -1 if an error occurs. 0 is returned in the
    608          * event of an EOF.
     610         * event of an EOF or -2 in the case of interrupting the parallel API.
    609611         *
    610612         * If no packets are available for reading, this function should block
     
    10321034 */
    10331035extern volatile int libtrace_halt;
     1036
     1037/**
     1038 * Used by a format to check if trace_interrupt or if a trace_pause/stop has
     1039 * been called. Provides backwards compatibility with traditional read
     1040 * functions when trace_read_packet() is used by the parallel API.
     1041 *
     1042 * Returns -1 if not halting otherwise returns the code that the read
     1043 * operation should pass on.
     1044 */
     1045static inline int is_halted(libtrace_t *trace) {
     1046        if (!(libtrace_halt || trace->state == STATE_PAUSING)) {
     1047                return -1;
     1048        } else if (libtrace_halt) {
     1049                return READ_EOF;
     1050        } else {
     1051                return READ_MESSAGE;
     1052        }
     1053}
    10341054
    10351055/** Registers a new capture format module.
  • lib/protocols_l2.c

    ree6e802 r99351e3  
    313313                switch(ntohs(ppp->protocol)) {
    314314                        case 0x0021: *type = TRACE_ETHERTYPE_IP; break;
     315                        case 0x0057: *type = TRACE_ETHERTYPE_IPV6; break;                               
    315316                        /* If it isn't IP, then it is probably PPP control and
    316317                         * I can't imagine anyone caring about that too much
     
    395396                        case 0x0021: /* IP */
    396397                                *type = TRACE_ETHERTYPE_IP;
     398                                break;
     399                        case 0x0057: /* IPV6 */
     400                                *type = TRACE_ETHERTYPE_IPV6;
    397401                                break;
    398402                        case 0xc021: /* Link Control Protocol */
  • lib/protocols_transport.c

    ree6e802 r317e903  
    549549        uint32_t *remaining)
    550550{
     551    uint8_t flags = ntohs(gre->flags);
    551552    uint32_t size = 4; /* GRE is 4 bytes long by default */
    552553    if (remaining && *remaining < size) {
     
    555556    }
    556557
    557     if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_CHECKSUM) != 0) {
    558         size += 4;  /* An extra 4 bytes. */
    559     }
    560 
    561     if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_KEY) != 0) {
    562         size += 4;  /* An extra 4 bytes. */
    563     }
    564 
    565     if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_SEQ) != 0) {
    566         size += 4;  /* An extra 4 bytes. */
     558    if((flags & LIBTRACE_GRE_FLAG_VERMASK) == LIBTRACE_GRE_PPTP_VERSION) {
     559        size += 4;
     560
     561        if ((flags & LIBTRACE_GRE_FLAG_SEQ) != 0) {
     562            size += 4;
     563        }
     564        if ((flags & LIBTRACE_GRE_FLAG_ACK) != 0) {
     565            size += 4;
     566        }
     567    } else {
     568
     569        if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_CHECKSUM) != 0) {
     570            size += 4;  /* An extra 4 bytes. */
     571        }
     572
     573        if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_KEY) != 0) {
     574           size += 4;  /* An extra 4 bytes. */
     575        }
     576
     577        if ((ntohs(gre->flags) & LIBTRACE_GRE_FLAG_SEQ) != 0) {
     578            size += 4;  /* An extra 4 bytes. */
     579        }
    567580    }
    568581
  • lib/trace.c

    r0cdd231 radb2c4c  
    261261        /* Parallel inits */
    262262        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     263        ASSERT_RET(pthread_mutex_init(&libtrace->read_packet_lock, NULL), == 0);
    263264        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    264265        libtrace->state = STATE_NEW;
     
    385386        /* Parallel inits */
    386387        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     388        ASSERT_RET(pthread_mutex_init(&libtrace->read_packet_lock, NULL), == 0);
    387389        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    388390        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     
    548550
    549551        /* Finish the last packet we read - for backwards compatibility */
    550         if (libtrace->last_packet)
     552        if (!libtrace_parallel && libtrace->last_packet)
    551553                trace_fin_packet(libtrace->last_packet);
    552554        assert(libtrace->last_packet == NULL);
     
    680682
    681683        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     684        ASSERT_RET(pthread_mutex_destroy(&libtrace->read_packet_lock), == 0);
    682685        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
    683686
     
    694697
    695698        /* Finish any the last packet we read - for backwards compatibility */
    696         if (libtrace->last_packet) {
     699        if (!libtrace_parallel && libtrace->last_packet) {
    697700                trace_fin_packet(libtrace->last_packet);
    698701        }
     
    758761
    759762        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     763        ASSERT_RET(pthread_mutex_destroy(&libtrace->read_packet_lock), == 0);
    760764        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
    761765
     
    833837                packet->trace->format->fin_packet(packet);
    834838        }
    835         if (packet->trace && packet->trace->last_packet == packet)
     839        if (!libtrace_parallel && packet->trace &&
     840             packet->trace->last_packet == packet) {
    836841                packet->trace->last_packet = NULL;
     842        }
    837843       
    838844        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
     
    864870
    865871                if (packet->trace) {
    866                         pthread_mutex_lock(&packet->trace->libtrace_lock);
    867                         if (packet->trace->last_packet == packet)
    868                                 packet->trace->last_packet = NULL;
    869                         pthread_mutex_unlock(&packet->trace->libtrace_lock);
     872                        if (!libtrace_parallel && packet->trace->last_packet == packet)
     873                                packet->trace->last_packet = NULL;
    870874                }
    871875
     
    921925                        size_t ret;
    922926                        int filtret;
     927                        if ((ret=is_halted(libtrace)) != (size_t)-1)
     928                                return ret;
    923929                        /* Store the trace we are reading from into the packet opaque
    924930                         * structure */
    925931                        packet->trace = libtrace;
    926932                        ret=libtrace->format->read_packet(libtrace,packet);
    927                         if (ret==(size_t)-1 || ret==0) {
     933                        if (ret==(size_t)READ_MESSAGE ||
     934                            ret==(size_t)-1 || ret==0) {
    928935                                packet->trace = NULL;
    929936                                return ret;
     
    953960                        ++libtrace->accepted_packets;
    954961                        ++libtrace->sequence_number;
    955                         if (packet->trace == libtrace)
     962                        if (!libtrace_parallel && packet->trace == libtrace)
    956963                                libtrace->last_packet = packet;
    957964
     
    9961003
    9971004        packet->trace = trace;
    998         pthread_mutex_lock(&trace->libtrace_lock);
    999         trace->last_packet = packet;
    1000         pthread_mutex_unlock(&trace->libtrace_lock);
     1005        if (!libtrace_parallel)
     1006                trace->last_packet = packet;
    10011007        /* Clear packet cache */
    10021008        trace_clear_cache(packet);
  • lib/trace_parallel.c

    ree6e802 rdb84bb2  
    677677                        if (!trace->pread) {
    678678                                assert(packets[0]);
     679                                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    679680                                nb_packets = trace_read_packet(trace, packets[0]);
     681                                ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    680682                                packets[0]->error = nb_packets;
    681683                                if (nb_packets > 0)
     
    787789                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
    788790                assert(packet);
    789 
    790                 if (libtrace_halt) {
    791                         packet->error = 0;
    792                         break;
    793                 }
    794791
    795792                // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only)
     
    821818
    822819                if ((packet->error = trace_read_packet(trace, packet)) <1) {
    823                         break; /* We are EOF or error'd either way we stop  */
     820                        if (packet->error == READ_MESSAGE) {
     821                                pkt_skipped = 1;
     822                                continue;
     823                        } else {
     824                                break; /* We are EOF or error'd either way we stop  */
     825                        }
    824826                }
    825827
     
    889891        //bool tick_hit = false;
    890892
    891         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     893        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
    892894        /* Read nb_packets */
    893895        for (i = 0; i < nb_packets; ++i) {
    894                 if (libtrace_halt) {
    895                         break;
     896                if (libtrace_message_queue_count(&t->messages) > 0) {
     897                        if ( i==0 ) {
     898                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
     899                                return READ_MESSAGE;
     900                        } else {
     901                                break;
     902                        }
    896903                }
    897904                packets[i]->error = trace_read_packet(libtrace, packets[i]);
     
    900907                        /* We'll catch this next time if we have already got packets */
    901908                        if ( i==0 ) {
    902                                 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     909                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
    903910                                return packets[i]->error;
    904911                        } else {
     
    916923                store_first_packet(libtrace, packets[0], t);
    917924        }
    918         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     925        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
    919926        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
    920927        if (tick_hit) {
     
    10071014                        /* Check if we are newer than the previous 'first' packet */
    10081015                        size_t first = libtrace->first_packets.first;
    1009                         if (trace_get_seconds(dup) <
    1010                                 trace_get_seconds(libtrace->first_packets.packets[first].packet))
     1016                        struct timeval cur_ts = trace_get_timeval(dup);
     1017                        struct timeval first_ts = trace_get_timeval(libtrace->first_packets.packets[first].packet);
     1018                        if (timercmp(&cur_ts, &first_ts, <))
    10111019                                libtrace->first_packets.first = t->perpkt_num;
    10121020                }
     
    13361344                                        trace_set_capture_length(packets[i],
    13371345                                                        libtrace->snaplen);
    1338                                 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i]));
    13391346                        }
    13401347                } while(ret == 0);
     
    14711478                libtrace->config.reporter_thold = 100;
    14721479        if (libtrace->config.burst_size <= 0)
    1473                 libtrace->config.burst_size = 10;
     1480                libtrace->config.burst_size = 32;
    14741481        if (libtrace->config.thread_cache_size <= 0)
    1475                 libtrace->config.thread_cache_size = 20;
     1482                libtrace->config.thread_cache_size = 64;
    14761483        if (libtrace->config.cache_size <= 0)
    14771484                libtrace->config.cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
     
    17071714        libtrace->perpkt_threads = NULL;
    17081715        /* Set a global which says we are using a parallel trace. This is
    1709          * for backwards compatability due to changes when destroying packets */
     1716         * for backwards compatibility due to changes when destroying packets */
    17101717        libtrace_parallel = 1;
    17111718
     
    17281735                if (libtrace->perpkt_thread_count > 1)
    17291736                        libtrace->pread = trace_pread_packet_first_in_first_served;
     1737                        /* Don't wait for a burst of packets if the format is
     1738                         * live as this could block ring based formats and
     1739                         * introduces delay. */
     1740                        if (libtrace->format->info.live) {
     1741                                libtrace->config.burst_size = 1;
     1742                        }
    17301743                else
    17311744                        /* Use standard read_packet */
  • test/test-live.c

    r8decff7 rcac1d92  
    171171        if (!stat->received_valid) {
    172172                printf("\tInfo: trace does not support received counter\n");
    173         } else if (stat->received != 100) {
    174                 ERROR("Trace received %zu/100 packets\n", stat->received);
     173        } else if (stat->received != (uint32_t) test_size) {
     174                ERROR("Trace received %zu/%u packets\n", stat->received,
     175                                (uint32_t)test_size);
    175176        }
    176177
  • test/test-tracetime-parallel.c

    r8decff7 rb148e3b  
    232232
    233233        trace_set_reporter_thold(trace, 1);
     234        trace_set_burst_size(trace, 10);
    234235
    235236        // Start it
  • tools/tracertstats/tracertstats.c

    ree6e802 r0b4b388  
    258258        }
    259259        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    260         trace_set_tracetime(trace, true);
    261260        trace_set_perpkt_threads(trace, threadcount);
    262261        trace_set_burst_size(trace, burstsize);
     
    264263        if (trace_get_information(trace)->live) {
    265264                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
     265        } else {
     266                trace_set_tracetime(trace, true);
    266267        }
    267268
Note: See TracChangeset for help on using the changeset viewer.