Changes in / [a6c77b0:17c5749]


Ignore:
Files:
27 added
23 edited

Legend:

Unmodified
Added
Removed
  • README

    r9ad7a35 r9ad7a35  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.20
    27
  • lib/Makefile.am

    rbd119b3 rdafe86a  
    22include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
     
    3333export RTE_SDK=@RTE_SDK@
    3434export RTE_TARGET=@RTE_TARGET@
     35export SAVED_CFLAGS:=$(CFLAGS)
     36export SAVED_CXXFLAGS:=$(CXXFLAGS)
    3537include $(RTE_SDK)/mk/rte.vars.mk
    3638# We need to add -Wl before the linker otherwise this breaks our build
     
    3840export DPDK_LIBTRACE_MK=dpdk_libtrace.mk
    3941include $(DPDK_LIBTRACE_MK)
     42export CFLAGS += $(SAVED_CFLAGS)
     43export CXXFLAGS += $(SAVED_CXXFLAGS)
    4044endif
    4145
    42 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4347                format_erf.c format_pcap.c format_legacy.c \
    4448                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5256                $(DAGSOURCE) format_erf.h \
    5357                $(BPFJITSOURCE) \
    54                 libtrace_arphrd.h
     58                libtrace_arphrd.h \
     59                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
     60                data-struct/deque.c data-struct/sliding_window.c \
     61                hash_toeplitz.c
    5562
    5663if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 rb13b939  
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r19b44c8 r10e183f  
    599599        trace_event_device,     /* trace_event */
    600600        bpf_help,               /* help */
    601         NULL
     601        NULL,                   /* next pointer */
     602        NON_PARALLEL(true)
    602603};
    603604#else   /* HAVE_DECL_BIOCSETIF */
     
    648649        NULL,                   /* trace_event */
    649650        bpf_help,               /* help */
    650         NULL
     651        NULL,                   /* next pointer */
     652        NON_PARALLEL(true)
    651653};
    652654#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc909fad rb13b939  
    557557        trace_event_dag,                /* trace_event */
    558558        dag_help,                       /* help */
    559         NULL                            /* next pointer */
     559        NULL,                            /* next pointer */
     560    NON_PARALLEL(true)
    560561};
    561562
  • lib/format_dag25.c

    rc909fad rb13b939  
    12101210        trace_event_dag,                /* trace_event */
    12111211        dag_help,                       /* help */
    1212         NULL                            /* next pointer */
     1212        NULL, /* pstart_input */
     1213        NULL, /* pread_packet */
     1214        NULL, /* ppause_input */
     1215        NULL, /* pfin_input */
     1216        NULL, /* pconfig_input */
     1217        NULL,                            /* next pointer */
     1218        NON_PARALLEL(true)
    12131219};
    12141220
  • lib/format_dpdk.c

    r2138553 r50ce607  
    4141 */
    4242
     43#define _GNU_SOURCE
     44
    4345#include "config.h"
    4446#include "libtrace.h"
     
    4648#include "format_helper.h"
    4749#include "libtrace_arphrd.h"
     50#include "hash_toeplitz.h"
    4851
    4952#ifdef HAVE_INTTYPES_H
     
    7275#include <rte_mempool.h>
    7376#include <rte_mbuf.h>
     77#include <rte_launch.h>
     78#include <rte_lcore.h>
     79#include <rte_per_lcore.h>
     80#include <pthread.h>
    7481
    7582/* The default size of memory buffers to use - This is the max size of standard
     
    107114#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    108115#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     116#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     117
    109118#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    110119                        (uint64_t) tv.tv_usec*1000ull)
     
    129138
    130139/* Print verbose messages to stdout */
    131 #define DEBUG 0
     140#define DEBUG 1
    132141
    133142/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     
    176185    DPDK_RUNNING,
    177186    DPDK_PAUSED,
     187};
     188
     189struct dpdk_per_lcore_t
     190{
     191        // TODO move time stamp stuff here
     192        uint16_t queue_id;
     193        uint8_t port;
    178194};
    179195
     
    194210    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
    195211    unsigned int nb_blacklist; /* Number of blacklist items in are valid */
     212    uint8_t rss_key[40]; // This is the RSS KEY
    196213#if HAS_HW_TIMESTAMPS_82580
    197214    /* Timestamping only relevent to RX */
     
    200217    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    201218#endif
     219        // DPDK normally seems to have a limit of 8 queues for a given card
     220        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    202221};
    203222
     
    336355    if (global_config != NULL) {
    337356        int i;
    338         printf("Intel DPDK setup\n"
     357        fprintf(stderr, "Intel DPDK setup\n"
    339358               "---Version      : %"PRIu32"\n"
    340359               "---Magic        : %"PRIu32"\n"
     
    345364       
    346365        for (i = 0 ; i < nb_cpu; i++) {
    347             printf("   ---Core %d : %s\n", i,
     366            fprintf(stderr, "   ---Core %d : %s\n", i,
    348367                   global_config->lcore_role[i] == ROLE_RTE ? "on" : "off");
    349368        }
     
    366385                proc_type = "something worse than invalid!!";
    367386        }
    368         printf("---Process Type : %s\n", proc_type);
    369     }
    370    
    371 }
    372 #endif
     387        fprintf(stderr, "---Process Type : %s\n", proc_type);
     388    }
     389   
     390}
     391#endif
     392
     393/**
     394 * Expects to be called from the master lcore and moves it to the given dpdk id
     395 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     396 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     397 *               and not already in use.
     398 * @return 0 is successful otherwise -1 on error.
     399 */
     400static inline int dpdk_move_master_lcore(size_t core) {
     401    struct rte_config *cfg = rte_eal_get_configuration();
     402    cpu_set_t cpuset;
     403    int i;
     404
     405    assert (core < RTE_MAX_LCORE);
     406    assert (rte_get_master_lcore() == rte_lcore_id());
     407
     408    if (core == rte_lcore_id())
     409        return 0;
     410
     411    // Make sure we are not overwriting someone else
     412    assert(!rte_lcore_is_enabled(core));
     413
     414    // Move the core
     415    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     416    cfg->lcore_role[core] = ROLE_RTE;
     417    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     418    rte_eal_get_configuration()->master_lcore = core;
     419    RTE_PER_LCORE(_lcore_id) = core;
     420
     421    // Now change the affinity
     422    CPU_ZERO(&cpuset);
     423
     424    if (lcore_config[core].detected) {
     425        CPU_SET(core, &cpuset);
     426    } else {
     427        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     428            if (lcore_config[i].detected)
     429                CPU_SET(i, &cpuset);
     430        }
     431    }
     432
     433    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     434    if (i != 0) {
     435        // TODO proper libtrace style error here!!
     436        fprintf(stderr, "pthread_setaffinity_np failed\n");
     437        return -1;
     438    }
     439    return 0;
     440}
     441
    373442
    374443static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
     
    377446    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
    378447    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
     448    char mem_map[20] = {0}; /* The memory name */
    379449    long nb_cpu; /* The number of CPUs in the system */
    380450    long my_cpu; /* The CPU number we want to bind to */
     451    int i;
     452    struct rte_config *cfg = rte_eal_get_configuration();
    381453   
    382454#if DEBUG
     
    386458#endif
    387459    /* Using proc-type auto allows this to be either primary or secondary
    388      * Secondary allows two intances of libtrace to be used on different
     460     * Secondary allows two instances of libtrace to be used on different
    389461     * ports. However current version of DPDK doesn't support this on the
    390      * same card (My understanding is this should work with two seperate
     462     * same card (My understanding is this should work with two separate
    391463     * cards).
     464     *
     465     * Using unique file prefixes mean separate memory is used, unlinking
     466     * the two processes. However be careful we still cannot access a
     467     * port that already in use.
    392468     */
    393     char* argv[] = {"libtrace", "-c", NULL, "-n", "1", "--proc-type", "auto", NULL};
     469    char* argv[] = {"libtrace", "-c", cpu_number, "-n", "1", "--proc-type", "auto",
     470                "--file-prefix", mem_map, "-m", "256", NULL};
    394471    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    395472   
    396     /* This initilises the Enviroment Abstraction Layer (EAL)
     473    /* This initialises the Environment Abstraction Layer (EAL)
    397474     * If we had slave workers these are put into WAITING state
    398475     *
    399476     * Basically binds this thread to a fixed core, which we choose as
    400477     * the last core on the machine (assuming fewer interrupts mapped here).
    401      * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so om
     478     * "-c" controls the cpu mask 0x1=1st core 0x2=2nd 0x4=3rd and so on
    402479     * "-n" the number of memory channels into the CPU (hardware specific)
    403480     *      - Most likely to be half the number of ram slots in your machine.
     
    436513    }
    437514
    438     /* Make our mask */
    439     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    440     argv[2] = cpu_number;
    441 
     515    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     516       info older versions */
     517    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     518    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     519
     520        /* Give this a name */
     521        snprintf(mem_map, sizeof(mem_map), "libtrace-%d", (int) getpid());
    442522    /* rte_eal_init it makes a call to getopt so we need to reset the
    443523     * global optind variable of getopt otherwise this fails */
     
    448528        return -1;
    449529    }
     530
     531    // These are still running but will never do anything with DPDK v1.7 we
     532    // should remove this XXX in the future
     533    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     534        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
     535            cfg->lcore_role[i] = ROLE_OFF;
     536            cfg->lcore_count--;
     537        }
     538    }
     539    // Only the master should be running
     540    assert(cfg->lcore_count == 1);
     541
     542    dpdk_move_master_lcore(my_cpu-1);
     543
    450544#if DEBUG
    451545    dump_configuration();
     
    478572        return -1;
    479573    }
     574   
     575    struct rte_eth_dev_info dev_info;
     576    rte_eth_dev_info_get(0, &dev_info);
     577    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     578                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    480579
    481580    return 0;
     
    504603    FORMAT(libtrace)->wrap_count = 0;
    505604#endif
    506 
     605       
    507606    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    508607        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     
    547646};
    548647
     648static int dpdk_pconfig_input (libtrace_t *libtrace,
     649                                trace_parallel_option_t option,
     650                                void *data) {
     651        switch (option) {
     652                case TRACE_OPTION_SET_HASHER:
     653                        switch (*((enum hasher_types *) data))
     654                        {
     655                                case HASHER_BALANCE:
     656                                case HASHER_UNIDIRECTIONAL:
     657                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     658                                        return 0;
     659                                case HASHER_BIDIRECTIONAL:
     660                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     661                                        return 0;
     662                                case HASHER_HARDWARE:
     663                                case HASHER_CUSTOM:
     664                                        // We don't support these
     665                                        return -1;
     666                        }
     667        break;
     668        }
     669        return -1;
     670}
    549671/**
    550672 * Note here snaplen excludes the MAC checksum. Packets over
     
    596718static struct rte_eth_conf port_conf = {
    597719        .rxmode = {
     720                .mq_mode = ETH_RSS,
    598721                .split_hdr_size = 0,
    599722                .header_split   = 0, /**< Header Split disabled */
     
    620743                .mq_mode = ETH_DCB_NONE,
    621744        },
     745        .rx_adv_conf = {
     746                .rss_conf = {
     747                        // .rss_key = &rss_key, // We set this per format
     748                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     749                },
     750        },
    622751};
    623752
     
    634763static const struct rte_eth_txconf tx_conf = {
    635764        .tx_thresh = {
    636                 .pthresh = 36,/* TX_PTHRESH prefetch */
    637                 .hthresh = 0,/* TX_HTHRESH host */
    638                 .wthresh = 4,/* TX_WTHRESH writeback */
     765        /**
     766         * TX_PTHRESH prefetch
     767         * Set on the NIC, if the number of unprocessed descriptors to queued on
     768         * the card fall below this try grab at least hthresh more unprocessed
     769         * descriptors.
     770         */
     771                .pthresh = 36,
     772
     773        /* TX_HTHRESH host
     774         * Set on the NIC, the batch size to prefetch unprocessed tx descriptors.
     775         */
     776                .hthresh = 0,
     777       
     778        /* TX_WTHRESH writeback
     779         * Set on the NIC, the number of sent descriptors before writing back
     780         * status to confirm the transmission. This is done more efficiently as
     781         * a bulk DMA-transfer rather than writing one at a time.
     782         * Similar to tx_free_thresh however this is applied to the NIC, where
     783         * as tx_free_thresh is when DPDK will check these. This is extended
     784         * upon by tx_rs_thresh (10Gbit cards) which doesn't write all
     785         * descriptors rather only every n'th item, reducing DMA memory bandwidth.
     786         */
     787                .wthresh = 4,
    639788        },
    640         .tx_free_thresh = 0, /* Use PMD default values */
    641         .tx_rs_thresh = 0, /* Use PMD default values */
     789
     790    /* Used internally by DPDK rather than passed to the NIC. The number of
     791     * packet descriptors to send before checking for any responses written
     792     * back (to confirm the transmission). Default = 32 if set to 0)
     793     */
     794        .tx_free_thresh = 0,
     795
     796    /* This is the Report Status threshold, used by 10Gbit cards,
     797     * This signals the card to only write back status (such as
     798     * transmission successful) after this minimum number of transmit
     799     * descriptors are seen. The default is 32 (if set to 0) however if set
     800     * to greater than 1 TX wthresh must be set to zero, because this is kindof
     801     * a replacement. See the dpdk programmers guide for more restrictions.
     802     */
     803        .tx_rs_thresh = 1,
    642804};
    643805
     
    683845         */
    684846#if DEBUG
    685     printf("Creating mempool named %s\n", format_data->mempool_name);
     847    fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name);
    686848#endif
    687849        format_data->pktmbuf_pool =
     
    710872     */
    711873   
     874   
     875    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     876   
    712877    /* This must be called first before another *eth* function
    713878     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     
    719884        return -1;
    720885    }
    721     /* Initilise the TX queue a minimum value if using this port for
     886    /* Initialise the TX queue a minimum value if using this port for
    722887     * receiving. Otherwise a larger size if writing packets.
    723888     */
     
    730895        return -1;
    731896    }
    732     /* Initilise the RX queue with some packets from memory */
     897    /* Initialise the RX queue with some packets from memory */
    733898    ret = rte_eth_rx_queue_setup(format_data->port, format_data->queue_id,
    734899                            format_data->nb_rx_buf, SOCKET_ID_ANY,
     
    761926    rte_eth_link_get(format_data->port, &link_info);
    762927#if DEBUG
    763     printf("Link status is %d %d %d\n", (int) link_info.link_status,
     928    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
    764929            (int) link_info.link_duplex, (int) link_info.link_speed);
    765930#endif
    766931
     932    /* We have now successfully started/unpaused */
     933    format_data->paused = DPDK_RUNNING;
     934   
     935    return 0;
     936}
     937
     938/* Attach memory to the port and start (or restart) the port/s.
     939 */
     940static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
     941    int ret, i; /* Check return values for errors */
     942    struct rte_eth_link link_info; /* Wait for link */
     943   
     944    /* Already started */
     945    if (format_data->paused == DPDK_RUNNING)
     946        return 0;
     947
     948    /* First time started we need to alloc our memory, doing this here
     949     * rather than in environment setup because we don't have snaplen then */
     950    if (format_data->paused == DPDK_NEVER_STARTED) {
     951        if (format_data->snaplen == 0) {
     952            format_data->snaplen = RX_MBUF_SIZE;
     953            port_conf.rxmode.jumbo_frame = 0;
     954            port_conf.rxmode.max_rx_pkt_len = 0;
     955        } else {
     956            /* Use jumbo frames */
     957            port_conf.rxmode.jumbo_frame = 1;
     958            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     959        }
     960
     961        /* This is additional overhead so make sure we allow space for this */
     962#if GET_MAC_CRC_CHECKSUM
     963        format_data->snaplen += ETHER_CRC_LEN;
     964#endif
     965#if HAS_HW_TIMESTAMPS_82580
     966        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     967#endif
     968
     969        /* Create the mbuf pool, which is the place our packets are allocated
     970         * from - TODO figure out if there is is a free function (I cannot see one)
     971         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     972         * allocate however that extra 1 packet is not used.
     973         * (I assume <= vs < error some where in DPDK code)
     974         * TX requires nb_tx_buffers + 1 in the case the queue is full
     975         * so that will fill the new buffer and wait until slots in the
     976         * ring become available.
     977         */
     978#if DEBUG
     979    printf("Creating mempool named %s\n", format_data->mempool_name);
     980#endif
     981        format_data->pktmbuf_pool =
     982            rte_mempool_create(format_data->mempool_name,
     983                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
     984                       format_data->snaplen + sizeof(struct rte_mbuf)
     985                                        + RTE_PKTMBUF_HEADROOM,
     986                       8, sizeof(struct rte_pktmbuf_pool_private),
     987                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     988                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
     989
     990        if (format_data->pktmbuf_pool == NULL) {
     991            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     992                        "pool failed: %s", strerror(rte_errno));
     993            return -1;
     994        }
     995    }
     996   
     997    /* ----------- Now do the setup for the port mapping ------------ */
     998    /* Order of calls must be
     999     * rte_eth_dev_configure()
     1000     * rte_eth_tx_queue_setup()
     1001     * rte_eth_rx_queue_setup()
     1002     * rte_eth_dev_start()
     1003     * other rte_eth calls
     1004     */
     1005   
     1006    /* This must be called first before another *eth* function
     1007     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     1008    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1009    if (ret < 0) {
     1010        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1011                            " %"PRIu8" : %s", format_data->port,
     1012                            strerror(-ret));
     1013        return -1;
     1014    }
     1015#if DEBUG
     1016    printf("Doing dev configure\n");
     1017#endif
     1018    /* Initialise the TX queue a minimum value if using this port for
     1019     * receiving. Otherwise a larger size if writing packets.
     1020     */
     1021    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
     1022                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1023    if (ret < 0) {
     1024        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1025                            " %"PRIu8" : %s", format_data->port,
     1026                            strerror(-ret));
     1027        return -1;
     1028    }
     1029   
     1030    for (i=0; i < rx_queues; i++) {
     1031#if DEBUG
     1032    printf("Doing queue configure\n");
     1033#endif 
     1034                /* Initialise the RX queue with some packets from memory */
     1035                ret = rte_eth_rx_queue_setup(format_data->port, i,
     1036                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
     1037                                                                &rx_conf, format_data->pktmbuf_pool);
     1038        /* Init per_thread data structures */
     1039        format_data->per_lcore[i].port = format_data->port;
     1040        format_data->per_lcore[i].queue_id = i;
     1041
     1042                if (ret < 0) {
     1043                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1044                                                " %"PRIu8" : %s", format_data->port,
     1045                                                strerror(-ret));
     1046                        return -1;
     1047                }
     1048        }
     1049   
     1050#if DEBUG
     1051    fprintf(stderr, "Doing start device\n");
     1052#endif 
     1053    /* Start device */
     1054    ret = rte_eth_dev_start(format_data->port);
     1055#if DEBUG
     1056    fprintf(stderr, "Done start device\n");
     1057#endif 
     1058    if (ret < 0) {
     1059        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1060                    strerror(-ret));
     1061        return -1;
     1062    }
     1063
     1064
     1065    /* Default promiscuous to on */
     1066    if (format_data->promisc == -1)
     1067        format_data->promisc = 1;
     1068   
     1069    if (format_data->promisc == 1)
     1070        rte_eth_promiscuous_enable(format_data->port);
     1071    else
     1072        rte_eth_promiscuous_disable(format_data->port);
     1073   
     1074   
    7671075    /* We have now successfully started/unpased */
    7681076    format_data->paused = DPDK_RUNNING;
    7691077   
     1078    // Can use remote launch for all
     1079    /*RTE_LCORE_FOREACH_SLAVE(i) {
     1080                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
     1081        }*/
     1082   
     1083    /* Wait for the link to come up */
     1084    rte_eth_link_get(format_data->port, &link_info);
     1085#if DEBUG
     1086    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1087            (int) link_info.link_duplex, (int) link_info.link_speed);
     1088#endif
     1089
    7701090    return 0;
    7711091}
     
    7841104}
    7851105
     1106static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1107    struct rte_eth_dev_info dev_info;
     1108    rte_eth_dev_info_get(port_id, &dev_info);
     1109    return dev_info.max_rx_queues;
     1110}
     1111
     1112static inline size_t dpdk_processor_count () {
     1113    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1114    if (nb_cpu <= 0)
     1115        return 1;
     1116    else
     1117        return (size_t) nb_cpu;
     1118}
     1119
     1120static int dpdk_pstart_input (libtrace_t *libtrace) {
     1121    char err[500];
     1122    int i=0, phys_cores=0;
     1123    int tot = libtrace->perpkt_thread_count;
     1124    err[0] = 0;
     1125
     1126    if (rte_lcore_id() != rte_get_master_lcore())
     1127        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1128
     1129    // If the master is not on the last thread we move it there
     1130    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1131        // Consider error handling here
     1132        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
     1133    }
     1134
     1135    // Don't exceed the number of cores in the system/detected by dpdk
     1136    // We don't have to force this but performance wont be good if we don't
     1137    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1138        if (lcore_config[i].detected) {
     1139            if (rte_lcore_is_enabled(i))
     1140                fprintf(stderr, "Found core %d already in use!\n", i);
     1141            else
     1142                phys_cores++;
     1143        }
     1144    }
     1145
     1146        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1147    tot = MIN(tot, phys_cores);
     1148
     1149        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
     1150       
     1151    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1152        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1153        free(libtrace->format_data);
     1154        libtrace->format_data = NULL;
     1155        return -1;
     1156    }
     1157
     1158    // Make sure we only start the number that we should
     1159    libtrace->perpkt_thread_count = tot;
     1160    return 0;
     1161}
     1162
     1163
     1164/**
     1165 * Register a thread with the DPDK system,
     1166 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1167 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1168 * gives it.
     1169 *
     1170 * We then allow a mapper thread to be started on every real core as DPDK would
     1171 * we also bind these to the corresponding CPU cores.
     1172 *
     1173 * @param libtrace A pointer to the trace
     1174 * @param reading True if the thread will be used to read packets, i.e. will
     1175 *                call pread_packet(), false if thread used to process packet
     1176 *                in any other manner including statistics functions.
     1177 */
     1178static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1179{
     1180    struct rte_config *cfg = rte_eal_get_configuration();
     1181    int i;
     1182    int new_id = -1;
     1183
     1184    // If 'reading packets' fill in cores from 0 up and bind affinity
     1185    // otherwise start from the MAX core (which is also the master) and work backwards
     1186    // in this case physical cores on the system will not exist so we don't bind
     1187    // these to any particular physical core
     1188    if (reading) {
     1189        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1190            if (!rte_lcore_is_enabled(i)) {
     1191                new_id = i;
     1192                if (!lcore_config[i].detected)
     1193                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1194                break;
     1195            }
     1196        }
     1197    } else {
     1198        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1199            if (!rte_lcore_is_enabled(i)) {
     1200                new_id = i;
     1201                break;
     1202            }
     1203        }
     1204    }
     1205
     1206    if (new_id == -1) {
     1207        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1208        // TODO proper libtrace style error here!!
     1209        fprintf(stderr, "Too many threads for DPDK!!\n");
     1210        return -1;
     1211    }
     1212
     1213    // Enable the core in global DPDK structs
     1214    cfg->lcore_role[new_id] = ROLE_RTE;
     1215    cfg->lcore_count++;
     1216    // Set TLS to reflect our new number
     1217    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1218    fprintf(stderr, "original id%d", rte_lcore_id());
     1219    RTE_PER_LCORE(_lcore_id) = new_id;
     1220    fprintf(stderr, " new id%d\n", rte_lcore_id());
     1221
     1222    if (reading) {
     1223        // Set affinity bind to corresponding core
     1224        cpu_set_t cpuset;
     1225        CPU_ZERO(&cpuset);
     1226        CPU_SET(rte_lcore_id(), &cpuset);
     1227        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1228        if (i != 0) {
     1229            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1230            return -1;
     1231        }
     1232    }
     1233
     1234    // Map our TLS to the thread data
     1235    if (reading) {
     1236        if(t->type == THREAD_PERPKT) {
     1237            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1238        } else {
     1239            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1240        }
     1241    }
     1242}
     1243
     1244
     1245/**
     1246 * Unregister a thread with the DPDK system.
     1247 *
     1248 * Only previously registered threads should be calling this just before
     1249 * they are destroyed.
     1250 */
     1251static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
     1252{
     1253    struct rte_config *cfg = rte_eal_get_configuration();
     1254
     1255    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
     1256
     1257    // Skip if master!!
     1258    if (rte_lcore_id() == rte_get_master_lcore()) {
     1259        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1260        return 0;
     1261    }
     1262
     1263    // Disable this core in global DPDK structs
     1264    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1265    cfg->lcore_count--;
     1266    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1267    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1268    return 0;
     1269}
     1270
    7861271static int dpdk_start_output(libtrace_out_t *libtrace)
    7871272{
     
    8021287    if (FORMAT(libtrace)->paused == DPDK_RUNNING) {
    8031288#if DEBUG     
    804         printf("Pausing port\n");
     1289        fprintf(stderr, "Pausing port\n");
    8051290#endif
    8061291        rte_eth_dev_stop(FORMAT(libtrace)->port);
     
    11161601}
    11171602
     1603
     1604static void dpdk_fin_packet(libtrace_packet_t *packet)
     1605{
     1606        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1607                rte_pktmbuf_free(packet->buffer);
     1608                packet->buffer = NULL;
     1609        }
     1610}
     1611
    11181612static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    11191613    int nb_rx; /* Number of rx packets we've recevied */
     
    11451639            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    11461640        }
     1641    }
     1642   
     1643    /* We'll never get here - but if we did it would be bad */
     1644    return -1;
     1645}
     1646
     1647static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
     1648    int nb_rx; /* Number of rx packets we've recevied */
     1649    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     1650
     1651    /* Free the last packet buffer */
     1652    if (packet->buffer != NULL) {
     1653        /* Buffer is owned by DPDK */
     1654        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
     1655            rte_pktmbuf_free(packet->buffer);
     1656            packet->buffer = NULL;
     1657        } else
     1658        /* Buffer is owned by packet i.e. has been malloc'd */
     1659        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1660            free(packet->buffer);
     1661            packet->buffer = NULL;
     1662        }
     1663    }
     1664   
     1665    packet->buf_control = TRACE_CTRL_EXTERNAL;
     1666    packet->type = TRACE_RT_DATA_DPDK;
     1667   
     1668    /* Wait for a packet */
     1669    while (1) {
     1670        /* Poll for a single packet */
     1671        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     1672                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
     1673        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     1674                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     1675            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
     1676        }
     1677        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
     1678        if (libtrace_message_queue_count(&t->messages) > 0) {
     1679                        printf("Extra message yay");
     1680                        return -2;
     1681                }
    11471682    }
    11481683   
     
    13081843}
    13091844
    1310  static struct libtrace_format_t dpdk = {
     1845static struct libtrace_format_t dpdk = {
    13111846        "dpdk",
    13121847        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     
    13251860        dpdk_read_packet,           /* read_packet */
    13261861        dpdk_prepare_packet,    /* prepare_packet */
    1327         NULL,                               /* fin_packet */
     1862        dpdk_fin_packet,                                    /* fin_packet */
    13281863        dpdk_write_packet,          /* write_packet */
    13291864        dpdk_get_link_type,         /* get_link_type */
     
    13481883        dpdk_trace_event,               /* trace_event */
    13491884    dpdk_help,              /* help */
    1350         NULL
     1885    NULL,                   /* next pointer */
     1886    {true, 8},              /* Live, NICs typically have 8 threads */
     1887    dpdk_pstart_input, /* pstart_input */
     1888        dpdk_pread_packet, /* pread_packet */
     1889        dpdk_pause_input, /* ppause */
     1890        dpdk_fin_input, /* p_fin */
     1891        dpdk_pconfig_input, /* pconfig_input */
     1892    dpdk_pregister_thread, /* pregister_thread */
     1893    dpdk_punregister_thread /* unpregister_thread */
    13511894};
    13521895
  • lib/format_duck.c

    r9b097ea rb13b939  
    360360        NULL,                           /* trace_event */
    361361        duck_help,                      /* help */
    362         NULL                            /* next pointer */
     362        NULL,                            /* next pointer */
     363        NON_PARALLEL(false)
    363364};
    364365
  • lib/format_erf.c

    rf7bcbfb rb13b939  
    828828        erf_event,                      /* trace_event */
    829829        erf_help,                       /* help */
    830         NULL                            /* next pointer */
     830        NULL,                           /* next pointer */
     831        NON_PARALLEL(false)
    831832};
    832833
     
    871872        erf_event,                      /* trace_event */
    872873        erf_help,                       /* help */
    873         NULL                            /* next pointer */
     874        NULL,                           /* next pointer */
     875        NON_PARALLEL(false)
    874876};
    875877
  • lib/format_legacy.c

    r1ca603b rb13b939  
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_linux.c

    r53eb1aa r53eb1aa  
    7272#include <sys/mman.h>
    7373
     74#include <fcntl.h>
     75
    7476/* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel.
    7577 * max_order will be decreased by one if the ring buffer fails to allocate.
     
    147149#define PACKET_HDRLEN   11
    148150#define PACKET_TX_RING  13
     151#define PACKET_FANOUT   18
    149152#define TP_STATUS_USER  0x1
    150153#define TP_STATUS_SEND_REQUEST  0x1
     
    154157#define TPACKET_ALIGN(x)        (((x)+TPACKET_ALIGNMENT-1)&~(TPACKET_ALIGNMENT-1))
    155158#define TPACKET_HDRLEN         (TPACKET_ALIGN(sizeof(struct tpacket2_hdr)) + sizeof(struct sockaddr_ll))
     159
     160/* Since 3.1 kernel we have packet_fanout support */
     161// schedule to socket by skb's rxhash - the implementation is bi-directional
     162#define PACKET_FANOUT_HASH              0
     163// schedule round robin
     164#define PACKET_FANOUT_LB                1
     165// schedule to the same socket that received the packet
     166#define PACKET_FANOUT_CPU               2
     167// Something to do with fragmented packets and hashing problems !! TODO figure out if this needs to be on
     168#define PACKET_FANOUT_FLAG_DEFRAG       0x8000
     169/* Included but unused by libtrace since 3.10 */
     170// if one socket if full roll over to the next
     171#define PACKET_FANOUT_ROLLOVER          3
     172// This flag makes any other system roll over
     173#define PACKET_FANOUT_FLAG_ROLLOVER     0x1000
     174/* Included but unused by libtrace since 3.12 */
     175// schedule random
     176#define PACKET_FANOUT_RND               4
     177
    156178
    157179enum tpacket_versions {
     
    185207        unsigned int tp_frame_nr;    /* Total number of frames */
    186208};
     209
     210struct linux_per_thread_t {
     211        char *rx_ring;
     212        int rxring_offset;
     213        int fd;
     214        // The flag layout should be the same for all (I Hope)
     215        // max_order
     216} ALIGN_STRUCT(CACHE_LINE_SIZE);
    187217
    188218struct linux_format_data_t {
     
    212242        /* Used to determine buffer size for the ring buffer */
    213243        uint32_t max_order;
     244        /* Used for the parallel case, fanout is the mode */
     245        uint16_t fanout_flags;
     246        /* The group lets Linux know which sockets to group together
     247         * so we use a random here to try avoid collisions */
     248        uint16_t fanout_group;
     249        /* When running in parallel mode this is malloc'd with an array
     250         * file descriptors from packet fanout will use, here we assume/hope
     251         * that every ring can get setup the same */
     252        struct linux_per_thread_t *per_thread;
    214253};
    215254
     
    266305
    267306#define FORMAT(x) ((struct linux_format_data_t*)(x))
     307#define PERPKT_FORMAT(x) ((struct linux_per_thread_t*)(x->format_data))
    268308#define DATAOUT(x) ((struct linux_output_format_data_t*)((x)->format_data))
    269309
     
    367407        FORMAT(libtrace->format_data)->rxring_offset = 0;
    368408        FORMAT(libtrace->format_data)->max_order = MAX_ORDER;
     409        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; // This might be best or alternatively PACKET_FANOUT_LB
     410        // Some examples use pid for the group however that would limit a single
     411        // application to use only int/ring format, instead using rand
     412        FORMAT(libtrace->format_data)->fanout_group = (uint16_t) rand();
     413        FORMAT(libtrace->format_data)->per_thread = NULL;
    369414}
    370415static int linuxring_init_input(libtrace_t *libtrace)
    371416{       
    372417        init_input(libtrace);
    373         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_RING;
     418        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_RING;
    374419        return 0;
    375420}
     
    377422{
    378423        init_input(libtrace);
    379         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_NATIVE;
     424        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_NATIVE;
    380425        return 0;
    381426}
     
    582627        return 0;
    583628}
    584 static int linuxring_start_input(libtrace_t *libtrace){
    585 
    586         char error[2048];       
     629
     630/**
     631 * Converts a socket, either packet_mmap or standard raw socket into a
     632 * fanout socket.
     633 * NOTE: This means we can read from the socket with multiple queues,
     634 * each must be setup (identically) and then this called upon them
     635 *
     636 * @return 0 success, -1 error
     637 */
     638static inline int socket_to_packet_fanout(int fd,
     639                                        uint16_t fanout_flags,
     640                                        uint16_t fanout_group) {
     641        int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group;
     642        if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
     643                        &fanout_opt, sizeof(fanout_opt)) == -1) {
     644                return -1;
     645        }
     646        return 0;
     647}
     648
     649static int linuxnative_ppause_input(libtrace_t *libtrace)
     650{
     651        int i;
     652        int tot = libtrace->perpkt_thread_count;
     653        printf("CAlling native pause packet\n");
     654       
     655        for (i = 0; i < tot; i++) {
     656                close(FORMAT(libtrace->format_data)->per_thread[i].fd);
     657        }
     658       
     659        free(FORMAT(libtrace->format_data)->per_thread);
     660        FORMAT(libtrace->format_data)->per_thread = NULL;
     661        return 0;
     662}
     663
     664static int linuxring_start_input(libtrace_t *libtrace)
     665{
     666        char error[2048];
    587667
    588668        /* We set the socket up the same and then convert it to PACKET_MMAP */
     
    609689}
    610690
     691static int linuxnative_pstart_input(libtrace_t *libtrace) {
     692        int i = 0;
     693        int tot = libtrace->perpkt_thread_count;
     694        int iserror = 0;
     695        // We store this here otherwise it will be leaked if the memory doesn't know
     696        struct linux_per_thread_t *per_thread = NULL;
     697       
     698        if (!FORMAT(libtrace->format_data)->per_thread) {
     699                //per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
     700                posix_memalign((void **)&per_thread, CACHE_LINE_SIZE, tot*sizeof(struct linux_per_thread_t));
     701                FORMAT(libtrace->format_data)->per_thread = per_thread;
     702        } else {
     703                // Whats going on this might not work 100%
     704                // We assume all sockets have been closed ;)
     705                printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
     706        }
     707       
     708        printf("Calling native pstart packet\n");
     709        for (i = 0; i < tot; ++i)
     710        {
     711                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
     712                        if (linuxnative_start_input(libtrace) != 0) {
     713                                iserror = 1;
     714                                break;
     715                        }
     716                } else {
     717                        // This must be ring
     718                        if (linuxring_start_input(libtrace) != 0) {
     719                                iserror = 1;
     720                                break;
     721                        }
     722                }
     723                if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0)
     724                {
     725                        iserror = 1;
     726                        // Clean up here to keep consistent with every one else
     727                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed");
     728                        close(FORMAT(libtrace->format_data)->fd);
     729                        free(libtrace->format_data);
     730                        libtrace->format_data = NULL;
     731                        break;
     732                }
     733                per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
     734                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
     735                        per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
     736                        per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     737                }
     738        }
     739       
     740        // Roll back those that failed - by this point in time the format_data
     741        // has been freed
     742        if (iserror) {
     743                for (i = i - 1; i >= 0; i--) {
     744                        close(per_thread[i].fd);
     745                }
     746                free(per_thread);
     747                per_thread = NULL;
     748                return -1;
     749        }
     750       
     751        return 0;
     752}
     753
     754static int linux_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) {
     755        fprintf(stderr, "registering thread %d!!\n", t->perpkt_num);
     756    if (reading) {
     757        if(t->type == THREAD_PERPKT) {
     758            t->format_data = &FORMAT(libtrace->format_data)->per_thread[t->perpkt_num];
     759        } else {
     760            t->format_data = &FORMAT(libtrace->format_data)->per_thread[0];
     761        }
     762    }
     763    return 0;
     764}
     765
    611766static int linuxnative_start_output(libtrace_out_t *libtrace)
    612767{
     
    615770                free(DATAOUT(libtrace));
    616771                return -1;
    617         }       
     772        }
    618773
    619774        return 0;
     
    660815        return 0;
    661816}
     817
    662818static int linuxring_pause_input(libtrace_t *libtrace)
    663819{
     
    800956#endif /* HAVE_NETPACKET_PACKET_H */
    801957
     958
     959static int linuxnative_pconfig_input(libtrace_t *libtrace,
     960                trace_parallel_option_t option,
     961                void *data)
     962{
     963        switch(option) {
     964                case TRACE_OPTION_SET_HASHER:
     965                        switch (*((enum hasher_types *)data)) {
     966                                case HASHER_BALANCE:
     967                                        // Do fanout
     968                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB;
     969                                        // Or we could balance to the CPU
     970                                        return 0;
     971                                case HASHER_BIDIRECTIONAL:
     972                                case HASHER_UNIDIRECTIONAL:
     973                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH;
     974                                        return 0;
     975                                case HASHER_CUSTOM:
     976                                case HASHER_HARDWARE:
     977                                        return -1;
     978                        }
     979                        break;
     980                /* Avoid default: so that future options will cause a warning
     981                 * here to remind us to implement it, or flag it as
     982                 * unimplementable
     983                 */
     984        }
     985       
     986        /* Don't set an error - trace_config will try to deal with the
     987         * option and will set an error if it fails */
     988        return -1;
     989}
     990
     991
    802992static int linuxnative_prepare_packet(libtrace_t *libtrace UNUSED,
    803993                libtrace_packet_t *packet, void *buffer,
     
    8711061
    8721062#ifdef HAVE_NETPACKET_PACKET_H
    873 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1063libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ;
     1064inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue)
    8741065{
    8751066        struct libtrace_linuxnative_header *hdr;
     
    8791070        struct cmsghdr *cmsg;
    8801071        int snaplen;
     1072
    8811073        uint32_t flags = 0;
    8821074       
     
    9141106        iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr));
    9151107        iovec.iov_len = snaplen;
    916 
    917         hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, MSG_TRUNC);
    918 
     1108       
     1109        if (check_queue) {
     1110                // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
     1111                hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT | MSG_TRUNC);
     1112                if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     1113                        // Do message queue check or select
     1114                        int ret;
     1115                        fd_set rfds;
     1116                        FD_ZERO(&rfds);
     1117                        FD_SET(fd, &rfds);
     1118                        FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds);
     1119                        int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? fd : get_thread_table(libtrace)->messages.pipefd[0];
     1120                       
     1121                        do {
     1122                                ret = select(largestfd+1, &rfds, NULL, NULL, NULL);
     1123                                if (ret == -1 && errno != EINTR)
     1124                                        perror("Select() failed");
     1125                        }
     1126                        while (ret == -1);
     1127                       
     1128                        assert (ret == 1 || ret == 2); // No timeout 0 is not an option
     1129                       
     1130                        if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) {
     1131                                // Not an error but check the message queue we have something
     1132                                return -2;
     1133                        }
     1134                        // Otherwise we must have a packet
     1135                        hdr->wirelen = recvmsg(fd, &msghdr, 0);
     1136                }
     1137        } else {
     1138                hdr->wirelen = recvmsg(fd, &msghdr, MSG_TRUNC);
     1139        }
     1140       
    9191141        if (hdr->wirelen==~0U) {
    9201142                trace_set_err(libtrace,errno,"recvmsg");
     
    9641186        if (cmsg == NULL) {
    9651187                struct timeval tv;
    966                 if (ioctl(FORMAT(libtrace->format_data)->fd,
    967                                   SIOCGSTAMP,&tv)==0) {
     1188                if (ioctl(fd, SIOCGSTAMP,&tv)==0) {
    9681189                        hdr->tv.tv_sec = tv.tv_sec;
    9691190                        hdr->tv.tv_usec = tv.tv_usec;
     
    9861207}
    9871208
     1209static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1210{
     1211        int fd = FORMAT(libtrace->format_data)->fd;
     1212        return linuxnative_read_packet_fd(libtrace, packet, fd, 0);
     1213}
     1214
     1215static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet)
     1216{
     1217        int fd = PERPKT_FORMAT(t)->fd;
     1218        //fprintf(stderr, "Thread number is #%d fd=%d\n", t->perpkt_num, PERPKT_FORMAT(t)->fd);
     1219        return linuxnative_read_packet_fd(libtrace, packet, fd, 1);
     1220}
     1221
    9881222#define LIBTRACE_BETWEEN(test,a,b) ((test) >= (a) && (test) < (b))
    9891223static int linuxring_get_capture_length(const libtrace_packet_t *packet);
     
    9921226/* Release a frame back to the kernel or free() if it's a malloc'd buffer
    9931227 */
    994 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet ){
     1228inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet){
    9951229        /* Free the old packet */
    9961230        if(packet->buffer == NULL)
     
    10041238                struct linux_format_data_t *ftd = FORMAT(libtrace->format_data);
    10051239               
    1006                 /* Check it's within our buffer first */
    1007                 if(LIBTRACE_BETWEEN((char *) packet->buffer,
     1240                /* Check it's within our buffer first - consider the pause resume case it might have already been free'd lets hope we get another buffer */
     1241                // For now let any one free anything
     1242                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
    10081243                                (char *) ftd->rx_ring,
    10091244                                ftd->rx_ring
    1010                                 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){
     1245                                + ftd->req.tp_block_size * ftd->req.tp_block_nr)){*/
    10111246                        TO_TP_HDR(packet->buffer)->tp_status = 0;
    10121247                        packet->buffer = NULL;
    1013                 }
    1014         }
    1015 }
    1016 
    1017 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1248                /*}*/
     1249        }
     1250}
     1251
     1252/**
     1253 * Free any resources being kept for this packet, Note: libtrace
     1254 * will ensure all fields are zeroed correctly.
     1255 */
     1256static void linuxring_fin_packet(libtrace_packet_t *packet)
     1257{
     1258
     1259        if (packet->buffer == NULL)
     1260                return;
     1261        assert(packet->trace);
     1262       
     1263        // Started should always match the existence of the rx_ring
     1264        assert(!!FORMAT(packet->trace->format_data)->rx_ring == !!packet->trace->started);
     1265       
     1266        // Our packets are always under our control
     1267        assert(packet->buf_control == TRACE_CTRL_EXTERNAL);
     1268       
     1269        if (FORMAT(packet->trace->format_data)->rx_ring) // If we don't have a ring its already been destroyed or paused
     1270                ring_release_frame(packet->trace, packet);
     1271        else
     1272                packet->buffer = NULL;
     1273}
     1274
     1275inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) {
    10181276
    10191277        struct tpacket2_hdr *header;
    1020         struct pollfd pollset; 
    10211278        int ret;
    10221279        unsigned int snaplen;
     
    10281285       
    10291286        /* Fetch the current frame */
    1030         header = GET_CURRENT_BUFFER(libtrace);
     1287        header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace);
    10311288        assert((((unsigned long) header) & (pagesize - 1)) == 0);
    10321289
     
    10361293         */
    10371294        while (!(header->tp_status & TP_STATUS_USER)) {
    1038                 pollset.fd = FORMAT(libtrace->format_data)->fd;
    1039                 pollset.events = POLLIN;
    1040                 pollset.revents = 0;
    1041                 /* Wait for more data */
    1042                 ret = poll(&pollset, 1, -1);
    1043                 if (ret < 0) {
    1044                         if (errno != EINTR)
    1045                                 trace_set_err(libtrace,errno,"poll()");
    1046                         return -1;
     1295                if (message) {
     1296                        struct pollfd pollset[2];
     1297                        pollset[0].fd = fd;
     1298                        pollset[0].events = POLLIN;
     1299                        pollset[0].revents = 0;
     1300                        pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages);
     1301                        pollset[1].events = POLLIN;
     1302                        pollset[1].revents = 0;
     1303                        /* Wait for more data or a message*/
     1304                        ret = poll(pollset, 2, -1);
     1305                        if (ret < 0) {
     1306                                if (errno != EINTR)
     1307                                        trace_set_err(libtrace,errno,"poll()");
     1308                                return -1;
     1309                        }
     1310                        // Check for a message otherwise loop
     1311                        if (pollset[1].revents)
     1312                                return -2;
     1313                } else {
     1314                        struct pollfd pollset;
     1315                        pollset.fd = fd;
     1316                        pollset.events = POLLIN;
     1317                        pollset.revents = 0;
     1318
     1319                        /* Wait for more data or a message*/
     1320                        ret = poll(&pollset, 1, -1);
     1321                        if (ret < 0) {
     1322                                if (errno != EINTR)
     1323                                        trace_set_err(libtrace,errno,"poll()");
     1324                                return -1;
     1325                        }
    10471326                }
    10481327        }
     
    10601339
    10611340        /* Move to next buffer */
    1062         FORMAT(libtrace->format_data)->rxring_offset++;
    1063         FORMAT(libtrace->format_data)->rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
     1341        (*rxring_offset)++;
     1342        *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
    10641343
    10651344        /* We just need to get prepare_packet to set all our packet pointers
     
    10711350                                linuxring_get_capture_length(packet);
    10721351
     1352}
     1353
     1354static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1355        int fd = FORMAT(libtrace->format_data)->fd;
     1356        int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset;
     1357        char *rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     1358        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0);
     1359}
     1360
     1361static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
     1362        fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
     1363        int fd = PERPKT_FORMAT(t)->fd;
     1364        int *rxring_offset = &PERPKT_FORMAT(t)->rxring_offset;
     1365        char *rx_ring = PERPKT_FORMAT(t)->rx_ring;
     1366        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 1);
    10731367}
    10741368
     
    11281422
    11291423        return ret;
    1130 
    1131 }
     1424}
     1425
    11321426static int linuxring_write_packet(libtrace_out_t *trace,
    11331427                libtrace_packet_t *packet)
     
    15601854        trace_event_device,             /* trace_event */
    15611855        linuxnative_help,               /* help */
     1856        NULL,                                   /* next pointer */
     1857        {true, -1},              /* Live, no thread limit */
     1858        linuxnative_pstart_input,                       /* pstart_input */
     1859        linuxnative_pread_packet,                       /* pread_packet */
     1860        linuxnative_ppause_input,                       /* ppause */
     1861        linuxnative_fin_input,                          /* p_fin */
     1862        linuxnative_pconfig_input,                      /* pconfig input */
     1863        linux_pregister_thread,
    15621864        NULL
    15631865};
     
    15801882        linuxring_read_packet,  /* read_packet */
    15811883        linuxring_prepare_packet,       /* prepare_packet */
    1582         NULL,                           /* fin_packet */
     1884        linuxring_fin_packet,                           /* fin_packet */
    15831885        linuxring_write_packet, /* write_packet */
    15841886        linuxring_get_link_type,        /* get_link_type */
     
    16031905        linuxring_event,                /* trace_event */
    16041906        linuxring_help,         /* help */
     1907        NULL,                           /* next pointer */
     1908        {true, -1},              /* Live, no thread limit */
     1909        linuxnative_pstart_input,                       /* pstart_input */
     1910        linuxring_pread_packet,                 /* pread_packet */
     1911        linuxnative_ppause_input,                       /* ppause */
     1912        linuxnative_fin_input,                          /* p_fin */
     1913        linuxnative_pconfig_input,
     1914        linux_pregister_thread,
    16051915        NULL
    16061916};
     
    16551965        trace_event_device,             /* trace_event */
    16561966        linuxnative_help,               /* help */
    1657         NULL
     1967        NULL,                   /* next pointer */
     1968        NON_PARALLEL(true)
    16581969};
    16591970
     
    16982009        NULL,                           /* trace_event */
    16992010        linuxring_help,                 /* help */
    1700         NULL
     2011        NULL,                   /* next pointer */
     2012        NON_PARALLEL(true)
    17012013};
    17022014
  • lib/format_pcap.c

    ra6c77b0 ra6c77b0  
    805805        trace_event_trace,              /* trace_event */
    806806        pcap_help,                      /* help */
    807         NULL                            /* next pointer */
     807        NULL,                   /* next pointer */
     808        NON_PARALLEL(false)
    808809};
    809810
     
    848849        trace_event_device,             /* trace_event */
    849850        pcapint_help,                   /* help */
    850         NULL                            /* next pointer */
     851        NULL,                   /* next pointer */
     852        NON_PARALLEL(true)
    851853};
    852854
  • lib/format_pcapfile.c

    rf7bcbfb rb13b939  
    774774        pcapfile_event,         /* trace_event */
    775775        pcapfile_help,                  /* help */
    776         NULL                            /* next pointer */
     776        NULL,                   /* next pointer */
     777        NON_PARALLEL(false)
    777778};
    778779
  • lib/format_rt.c

    r90e8d92 rb13b939  
    858858        trace_event_rt,             /* trace_event */
    859859        rt_help,                        /* help */
    860         NULL                            /* next pointer */
     860        NULL,                   /* next pointer */
     861        NON_PARALLEL(true) /* This is normally live */
    861862};
    862863
  • lib/format_tsh.c

    rc909fad rb13b939  
    269269        trace_event_trace,              /* trace_event */
    270270        tsh_help,                       /* help */
    271         NULL                            /* next pointer */
     271        NULL,                   /* next pointer */
     272        NON_PARALLEL(false)
    272273};
    273274
     
    317318        trace_event_trace,              /* trace_event */
    318319        tsh_help,                       /* help */
    319         NULL                            /* next pointer */
     320        NULL,                   /* next pointer */
     321        NON_PARALLEL(false)
    320322};
    321323
  • lib/libtrace.h.in

    r17f954f r50ce607  
    195195#endif
    196196
     197// Used to fight against false sharing
     198#define CACHE_LINE_SIZE 64
     199#define ALIGN_STRUCT(x) __attribute__((aligned(x)))
     200
    197201#ifdef _MSC_VER
    198202    #ifdef LT_BUILDING_DLL
     
    223227/** Opaque structure holding information about a bpf filter */
    224228typedef struct libtrace_filter_t libtrace_filter_t;
     229
     230/** Structure holding information about a result */
     231typedef struct libtrace_result_t {
     232        uint64_t key;
     233        void * value;
     234        int is_packet;
     235} libtrace_result_t;
     236
     237typedef struct libtrace_thread_t libtrace_thread_t;
    225238
    226239/** If the packet has allocated its own memory the buffer_control should be
     
    509522        uint8_t transport_proto;        /**< Cached transport protocol */
    510523        uint32_t l4_remaining;          /**< Cached transport remaining */
     524        uint64_t order; /**< Notes the order of this packet in relation to the input */
     525        uint64_t hash; /**< A hash of the packet as supplied by the user */
     526        int error; /**< The error status of pread_packet */
    511527} libtrace_packet_t;
    512528
     
    31473163/*@}*/
    31483164
     3165union libtrace_64byte_things {
     3166        void *ptr;
     3167        int64_t sint64;
     3168        uint64_t uint64;
     3169        uint32_t uint32s[2];
     3170        int32_t sint32s[2];
     3171        uint32_t uint32;
     3172        int32_t sint32;
     3173        int sint;
     3174        unsigned int uint;
     3175        char schars[8];
     3176        char uchars[8];
     3177};
     3178
     3179typedef struct libtrace_message_t {
     3180        int code;
     3181        union libtrace_64byte_things additional;
     3182        libtrace_thread_t *sender;
     3183} libtrace_message_t;
     3184
     3185typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     3186typedef void* (*fn_reducer)(libtrace_t* trace, void* global_blob);
     3187typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
     3188
     3189DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
     3190DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet);
     3191DLLEXPORT int trace_ppause(libtrace_t *libtrace);
     3192DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     3193DLLEXPORT void trace_join(libtrace_t * trace);
     3194DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
     3195
     3196DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
     3197DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
     3198DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, void * value);
     3199DLLEXPORT void* libtrace_result_get_value(libtrace_result_t * result);
     3200DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, void * value);
     3201DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
     3202
     3203// Ways to access Global and TLS storage that we provide the user
     3204DLLEXPORT void * trace_get_global(libtrace_t *trace);
     3205DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
     3206DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data);
     3207DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
     3208
     3209
     3210DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value);
     3211DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet);
     3212typedef struct libtrace_vector libtrace_vector_t;
     3213DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
     3214
     3215DLLEXPORT int trace_post_reduce(libtrace_t *libtrace);
     3216DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
     3217DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3218DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3219DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message);
     3220DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
     3221DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
     3222DLLEXPORT int trace_finished(libtrace_t * libtrace);
     3223DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     3224DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
     3225DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
     3226DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     3227DLLEXPORT uint64_t tv_to_usec(struct timeval *tv);
     3228
     3229DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
     3230
     3231typedef enum {
     3232        /**
     3233         * Sets the hasher function, if NULL(default) no hashing is used a
     3234         * cores will get packets on a first in first served basis
     3235         */
     3236        TRACE_OPTION_SET_HASHER,
     3237       
     3238        /**
     3239         * See diagrams, this sets the maximum size of freelist used to
     3240         * maintain packets and there memory buffers.
     3241         * NOTE setting this to less than recommend could cause deadlock a
     3242         * trace that manages its own packets.
     3243         * A unblockable error message will be printed.
     3244         */
     3245        TRACE_OPTION_SET_PACKET_FREELIST_SIZE,
     3246       
     3247        /**
     3248         * See diagrams, this sets the maximum size of buffers used between
     3249         * the single hasher thread and the buffer.
     3250         * NOTE setting this to less than recommend could cause deadlock a
     3251         * trace that manages its own packets.
     3252         * A unblockable warning message will be printed to stderr in this case.
     3253         */
     3254        TRACE_OPTION_SET_PERPKT_BUFFER_SIZE,
     3255       
     3256        /**
     3257         * Libtrace set perpkt thread count
     3258         */
     3259        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
     3260       
     3261        /**
     3262         * Libtrace should expect sequential keys from the output to count
     3263         * up starting numbered from 1, 2, 3 ...
     3264         * such as is the case with numbered packets.
     3265         *
     3266         * ALSO consider - TRACE_OPTIONS_ORDERED_RESULTS suitable for live formats
     3267         */
     3268         TRACE_OPTION_SEQUENTIAL,
     3269         
     3270         /**
     3271          * Libtrace ordered results, results in each queue are ordered by key
     3272          * however my not be sequential, a typically case is packet timestamps
     3273          * the reducer will receive packets in order - note threasholds
     3274          * will be used such that a empty queue wont break things
     3275          */
     3276         TRACE_OPTION_ORDERED,
     3277         
     3278         
     3279         /**
     3280          * When accepting ordered results if a threashold is meet before an
     3281          * older result is available from another queue drop that packet
     3282          */
     3283          TRACE_DROP_OUT_OF_ORDER,
     3284
     3285          /**
     3286           * Delays packets so they are played back in trace-time rather than as fast
     3287           * as possible.
     3288           */
     3289          TRACE_OPTION_TRACETIME,
     3290
     3291          /**
     3292           * Specifies the interval between tick packets in milliseconds, if 0
     3293           * or less this is ignored.
     3294           */
     3295          TRACE_OPTION_TICK_INTERVAL
     3296} trace_parallel_option_t;
     3297
     3298enum libtrace_messages {
     3299        MESSAGE_STARTED,
     3300        MESSAGE_DO_PAUSE,
     3301        MESSAGE_PAUSING,
     3302        MESSAGE_PAUSED,
     3303        MESSAGE_DO_STOP,
     3304        MESSAGE_STOPPED,
     3305        MESSAGE_FIRST_PACKET,
     3306        MESSAGE_PERPKT_ENDED,
     3307        MESSAGE_PERPKT_RESUMED,
     3308        MESSAGE_PERPKT_PAUSED,
     3309        MESSAGE_PERPKT_EOF,
     3310        MESSAGE_POST_REDUCE,
     3311        MESSAGE_POST_RANGE,
     3312        MESSAGE_TICK,
     3313        MESSAGE_USER
     3314};
     3315
     3316enum hasher_types {
     3317        /**
     3318         * Balance load across CPUs best as possible, this is basically to say do
     3319         * not care about hash. This might still might be implemented
     3320         * using a hash or round robin etc. under the hood depending on the format
     3321         */
     3322        HASHER_BALANCE,
     3323
     3324        /** Use a hash which is bi-directional for TCP flows, that is packets with
     3325         * the same hash are sent to the same thread. All non TCP packets will be
     3326         * sent to the same thread. UDP may or may not be sent to separate
     3327         * threads like TCP, this depends on the format support.
     3328         */
     3329        HASHER_BIDIRECTIONAL,
     3330       
     3331        /**
     3332         * Use a hash which is uni-directional across TCP flows, that means the
     3333         * opposite directions of the same 5 tuple might end up on separate cores.
     3334         * Otherwise is identical to HASHER_BIDIRECTIONAL
     3335         */
     3336        HASHER_UNIDIRECTIONAL,
     3337
     3338        /**
     3339         * Always use the user supplied hasher, this currently disables native
     3340         * support and is likely significantly slower.
     3341         */
     3342        HASHER_CUSTOM,
     3343
     3344        /**
     3345         * This is not a valid option, used internally only!!! TODO remove
     3346         * Set by the format if the hashing is going to be done in hardware
     3347         */
     3348        HASHER_HARDWARE
     3349};
     3350
     3351typedef struct libtrace_info_t {
     3352        /**
     3353         * True if a live format (i.e. packets have to be tracetime).
     3354         * Otherwise false, indicating packets can be read as fast
     3355         * as possible from the format.
     3356         */
     3357        bool live;
     3358
     3359        /**
     3360         * The maximum number of threads supported by a parallel trace. 1
     3361         * if parallel support is not native (in this case libtrace will simulate
     3362         * an unlimited number of threads), -1 means unlimited and 0 unknown.
     3363         */
     3364        int max_threads;
     3365
     3366        /* TODO hash fn supported list */
     3367
     3368        /* TODO consider time/clock details?? */
     3369} libtrace_info_t;
     3370
     3371DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
     3372DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3373DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3374DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
     3375DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
     3376
    31493377#ifdef __cplusplus
    31503378} /* extern "C" */
  • lib/libtrace_int.h

    r10f924c r50ce607  
    148148#endif
    149149
     150#include "data-struct/ring_buffer.h"
     151#include "data-struct/vector.h"
     152#include "data-struct/message_queue.h"
     153#include "data-struct/deque.h"
     154#include "data-struct/sliding_window.h"
    150155
    151156//#define RP_BUFSIZE 65536U
     
    166171        bool waiting;
    167172};
     173
     174enum thread_types {
     175        THREAD_EMPTY,
     176        THREAD_HASHER,
     177        THREAD_PERPKT,
     178        THREAD_REDUCER,
     179        THREAD_KEEPALIVE
     180};
     181
     182enum thread_states {
     183        THREAD_RUNNING,
     184        THREAD_FINISHING,
     185        THREAD_FINISHED,
     186        THREAD_PAUSED,
     187        THREAD_STATE_MAX
     188};
     189
     190// Reduce expects sequential data
     191#define REDUCE_SEQUENTIAL 0x1
     192// Reduce is working on ordered data
     193#define REDUCE_ORDERED 0x2
     194// Reduce should sort the data
     195#define REDUCE_SORT 0x4
     196// Drop out of order valid with
     197#define REDUCE_DROP_OOO 0x8
     198// Reduce reads all queues with same key
     199#define REDUCE_STEPPING 0x10
     200
     201/**
     202 * Information of this thread
     203 */
     204struct libtrace_thread_t {
     205        libtrace_t * trace;
     206        void* ret;
     207        enum thread_types type;
     208        enum thread_states state;
     209        void* user_data; // TLS for the user to use
     210        void* format_data; // TLS for the format to use
     211        pthread_t tid;
     212        int perpkt_num; // A number from 0-X that represents this perpkt threads number
     213                                // in the table, intended to quickly identify this thread
     214                                // -1 represents NA (such as the case this is not a perpkt thread)
     215        libtrace_ringbuffer_t rbuffer; // Input
     216        libtrace_vector_t vector; // Output
     217        libtrace_queue_t deque; // Real Output type makes more sense
     218        libtrace_message_queue_t messages; // Message handling
     219        // Temp storage for time sensitive results
     220        uint64_t tmp_key;
     221        void *tmp_data;
     222        pthread_spinlock_t tmp_spinlock;
     223        // Set to true once the first packet has been stored
     224        bool recorded_first;
     225        // For thread safety reason we actually must store this here
     226        int64_t tracetime_offset_usec;
     227};
     228
     229/**
     230 * Storage to note time value against each.
     231 * Used both internally to do trace time playback
     232 * and can be used externally to assist applications which need
     233 * a trace starting time such as tracertstats.
     234 */
     235struct first_packets {
     236        pthread_spinlock_t lock;
     237        size_t count; // If == perpkt_thread_count threads we have all
     238        size_t first; // Valid if count != 0
     239        struct __packet_storage_magic_type {
     240                libtrace_packet_t * packet;
     241                struct timeval tv;
     242        } * packets;
     243};
     244
     245#define TRACE_STATES \
     246        X(STATE_NEW) \
     247        X(STATE_RUNNING) \
     248        X(STATE_PAUSING) \
     249        X(STATE_PAUSED) \
     250        X(STATE_FINSHED) \
     251        X(STATE_DESTROYED) \
     252        X(STATE_JOINED) \
     253        X(STATE_ERROR)
     254
     255#define X(a) a,
     256enum trace_state {
     257        TRACE_STATES
     258};
     259#undef X
     260
     261#define X(a) case a: return #a;
     262static inline char *get_trace_state_name(enum trace_state ts){
     263        switch(ts) {
     264                TRACE_STATES
     265                default:
     266                        return "UNKNOWN";
     267        }
     268}
     269#undef X
    168270
    169271/** A libtrace input trace
     
    188290        uint64_t filtered_packets;     
    189291        /** The filename from the uri for the trace */
    190         char *uridata;                 
     292        char *uridata;
    191293        /** The libtrace IO reader for this trace (if applicable) */
    192         io_t *io;                       
     294        io_t *io;
    193295        /** Error information for the trace */
    194         libtrace_err_t err;             
     296        libtrace_err_t err;
    195297        /** Boolean flag indicating whether the trace has been started */
    196         bool started;                   
     298        bool started;
     299        /** Synchronise writes/reads across this format object and attached threads etc */
     300        pthread_mutex_t libtrace_lock;
     301        /** State */
     302        enum trace_state state;
     303        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
     304        pthread_cond_t perpkt_cond;
     305        /* Keep track of counts of threads in any given state */
     306        int perpkt_thread_states[THREAD_STATE_MAX];
     307
     308        /** For the sliding window hasher implementation */
     309        pthread_rwlock_t window_lock;
     310        /** Set once trace_join has been called */
     311        bool joined;
     312        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
     313        bool perpkt_queue_full;
     314        /** Global storage for this trace, shared among all the threads  */
     315        void* global_blob;
     316        /** Requested size of the pkt buffer (currently only used if using dedicated hasher thread) */
     317        int packet_freelist_size;
     318        /** The actual freelist */
     319        libtrace_ringbuffer_t packet_freelist;
     320        /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
     321        int perpkt_buffer_size;
     322        /** The reducer flags */
     323        int reducer_flags;
     324        /** The tick interval - in milliseconds (0 or -ve==disabled) */
     325        int tick_interval;
     326        /** Used to track the next expected key */
     327        uint64_t expected_key;
     328        /** User defined per_pkt function called when a pkt is ready */
     329        fn_per_pkt per_pkt;
     330        /** User defined reducer function entry point XXX not hooked up */
     331        fn_reducer reducer;
     332        /** The hasher function */
     333        enum hasher_types hasher_type;
     334        /** The hasher function - NULL implies they don't care or balance */
     335        fn_hasher hasher; // If valid using a separate thread
     336        void *hasher_data;
     337       
     338        libtrace_thread_t hasher_thread;
     339        libtrace_thread_t reducer_thread;
     340        libtrace_thread_t keepalive_thread;
     341        int perpkt_thread_count;
     342        libtrace_thread_t * perpkt_threads; // All our perpkt threads
     343        libtrace_slidingwindow_t sliding_window;
     344        sem_t sem;
     345        // Used to keep track of the first packet seen on each thread
     346        struct first_packets first_packets;
     347        int tracetime;
    197348};
     349
     350void trace_fin_packet(libtrace_packet_t *packet);
     351void libtrace_zero_thread(libtrace_thread_t * t);
     352void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
     353libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     354int get_thread_table_num(libtrace_t *libtrace);
     355
    198356
    199357/** A libtrace output trace
     
    202360struct libtrace_out_t {
    203361        /** The capture format for the output trace */
    204         struct libtrace_format_t *format;
     362        struct libtrace_format_t *format;
    205363        /** Pointer to the "global" data for the capture format module */
    206364        void *format_data;             
     
    210368        libtrace_err_t err;
    211369        /** Boolean flag indicating whether the trace has been started */
    212         bool started;                   
     370        bool started;
    213371};
    214372
     
    303461} PACKED libtrace_pflog_header_t;
    304462
    305 
    306 
    307463/** A libtrace capture format module */
    308464/* All functions should return -1, or NULL on failure */
     
    734890        /** Prints some useful help information to standard output. */
    735891        void (*help)(void);
    736 
     892       
    737893        /** Next pointer, should always be NULL - used by the format module
    738894         * manager. */
    739895        struct libtrace_format_t *next;
     896
     897        /** Holds information about the trace format */
     898        struct libtrace_info_t info;
     899
     900        /** Starts or unpauses an input trace in parallel mode - note that
     901         * this function is often the one that opens the file or device for
     902         * reading.
     903         *
     904         * @param libtrace      The input trace to be started or unpaused
     905         * @return If successful the number of threads started, 0 indicates
     906         *                 no threads started and this should be done automatically.
     907         *                 Otherwise in event of an error -1 is returned.
     908         *
     909         */
     910        int (*pstart_input)(libtrace_t *trace);
     911       
     912        /** Read a packet in the new parallel mode
     913         * @return same as read_packet, with the addition of return -2 to represent
     914         * interrupted due to message waiting. */
     915        int (*pread_packet)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t *packet);
     916       
     917        /** Pause a parallel trace
     918         *
     919         * @param libtrace      The input trace to be paused
     920         */
     921        int (*ppause_input)(libtrace_t *trace);
     922       
     923        /** Called after all threads have been paused, Finish (close) a parallel trace
     924     *
     925         * @param libtrace      The input trace to be stopped
     926         */
     927        int (*pfin_input)(libtrace_t *trace);
     928       
     929        /** Applies a configuration option to an input trace.
     930         *
     931         * @param libtrace      The input trace to apply the option to
     932         * @param option        The option that is being configured
     933         * @param value         A pointer to the value that the option is to be
     934         *                      set to
     935         * @return 0 if successful, -1 if the option is unsupported or an error
     936         * occurs
     937         */
     938        int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value);
     939
     940        /**
     941         * Register a thread for use with the format or using the packets produced
     942         * by it. This is NOT only used for threads reading packets infact all
     943         * threads use this.
     944         *
     945         * Some use cases include setting up any thread local storage required for
     946         * to read packets and free packets. For DPDK we require any thread that
     947         * may release or read a packet to have have an internal number associated
     948         * with it.
     949         *
     950         * The thread type can be used to see if this thread is going to be used
     951         * to read packets or otherwise.
     952         *
     953         * @return 0 if successful, -1 if the option is unsupported or an error
     954         * occurs (such as a maximum of threads being reached)
     955         */
     956        int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader);
     957
     958        /**
     959         * If needed any memory allocated with pregister_thread can be released
     960         * in this function. The thread will be destroyed directly after this
     961         * function is called.
     962         */
     963        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    740964};
     965
     966/** Macro to zero out a single thread format */
     967#define NON_PARALLEL(live) \
     968{live, 1},              /* trace info */ \
     969NULL,                   /* pstart_input */ \
     970NULL,                   /* pread_packet */ \
     971NULL,                   /* ppause_input */ \
     972NULL,                   /* pfin_input */ \
     973NULL,                   /* pconfig_input */ \
     974NULL,                   /* pregister_thread */ \
     975NULL                    /* punregister_thread */
    741976
    742977/** The list of registered capture formats */
  • lib/trace.c

    rf7bcbfb rc99b1e5  
    9999#include "rt_protocol.h"
    100100
     101#include <pthread.h>
     102#include <signal.h>
     103
    101104#define MAXOPTS 1024
    102105
     
    106109
    107110int libtrace_halt = 0;
     111
     112/* Set once pstart is called used for backwards compatibility reasons */
     113int libtrace_parallel = 0;
    108114
    109115/* strncpy is not assured to copy the final \0, so we
     
    253259        libtrace->filtered_packets = 0;
    254260        libtrace->accepted_packets = 0;
     261       
     262        /* Parallel inits */
     263        // libtrace->libtrace_lock
     264        // libtrace->perpkt_cond;
     265        libtrace->state = STATE_NEW;
     266        libtrace->perpkt_queue_full = false;
     267        libtrace->reducer_flags = 0;
     268        libtrace->global_blob = NULL;
     269        libtrace->per_pkt = NULL;
     270        libtrace->reducer = NULL;
     271        libtrace->hasher = NULL;
     272        libtrace->packet_freelist_size = 0;
     273        libtrace->perpkt_buffer_size = 0;
     274        libtrace->expected_key = 0;
     275        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     276        libtrace_zero_thread(&libtrace->hasher_thread);
     277        libtrace_zero_thread(&libtrace->reducer_thread);
     278        libtrace_zero_thread(&libtrace->keepalive_thread);
     279        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     280        libtrace->reducer_thread.type = THREAD_EMPTY;
     281        libtrace->perpkt_thread_count = 0;
     282        libtrace->perpkt_threads = NULL;
     283        libtrace->tracetime = 0;
     284        libtrace->tick_interval = 0;
     285        libtrace->first_packets.first = 0;
     286        libtrace->first_packets.count = 0;
     287        libtrace->first_packets.packets = NULL;
    255288
    256289        /* Parse the URI to determine what sort of trace we are dealing with */
     
    348381        libtrace->io = NULL;
    349382        libtrace->filtered_packets = 0;
     383       
     384        /* Parallel inits */
     385        // libtrace->libtrace_lock
     386        // libtrace->perpkt_cond;
     387        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     388        libtrace->perpkt_queue_full = false;
     389        libtrace->reducer_flags = 0;
     390        libtrace->global_blob = NULL;
     391        libtrace->per_pkt = NULL;
     392        libtrace->reducer = NULL;
     393        libtrace->hasher = NULL;
     394        libtrace->expected_key = 0;
     395        libtrace->packet_freelist_size = 0;
     396        libtrace->perpkt_buffer_size = 0;
     397        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     398        libtrace_zero_thread(&libtrace->hasher_thread);
     399        libtrace_zero_thread(&libtrace->reducer_thread);
     400        libtrace_zero_thread(&libtrace->keepalive_thread);
     401        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     402        libtrace->reducer_thread.type = THREAD_EMPTY;
     403        libtrace->perpkt_thread_count = 0;
     404        libtrace->perpkt_threads = NULL;
     405        libtrace->tracetime = 0;
     406        libtrace->tick_interval = 0;
    350407       
    351408        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    583640 */
    584641DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
    585         assert(libtrace);
     642    int i;
     643        assert(libtrace);
     644
     645        /* destroy any packet that are still around */
     646        if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {
     647                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     648                        if(libtrace->first_packets.packets[i].packet) {
     649                                trace_destroy_packet(libtrace->first_packets.packets[i].packet);
     650                        }
     651                }
     652                free(libtrace->first_packets.packets);
     653                assert(pthread_spin_destroy(&libtrace->first_packets.lock) == 0);
     654        }
     655
    586656        if (libtrace->format) {
    587657                if (libtrace->started && libtrace->format->pause_input)
     
    590660                        libtrace->format->fin_input(libtrace);
    591661        }
    592         /* Need to free things! */
    593         if (libtrace->uridata)
     662        /* Need to free things! */
     663        if (libtrace->uridata)
    594664                free(libtrace->uridata);
     665       
     666        /* Empty any packet memory */
     667        if (libtrace->state != STATE_NEW) {
     668                libtrace_packet_t * packet;
     669                while (libtrace_ringbuffer_try_read(&libtrace->packet_freelist,(void **) &packet))
     670                        trace_destroy_packet(packet);
     671               
     672                libtrace_ringbuffer_destroy(&libtrace->packet_freelist);
     673               
     674                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     675                        assert (libtrace_vector_get_size(&libtrace->perpkt_threads[i].vector) == 0);
     676                        libtrace_vector_destroy(&libtrace->perpkt_threads[i].vector);
     677                }
     678                free(libtrace->perpkt_threads);
     679                libtrace->perpkt_threads = NULL;
     680                libtrace->perpkt_thread_count = 0;
     681        }
     682       
    595683        if (libtrace->event.packet) {
    596684                /* Don't use trace_destroy_packet here - there is almost
     
    605693                 free(libtrace->event.packet);
    606694        }
    607         free(libtrace);
     695        free(libtrace);
    608696}
    609697
     
    661749        dest->type=packet->type;
    662750        dest->buf_control=TRACE_CTRL_PACKET;
     751        dest->order = packet->order;
    663752        /* Reset the cache - better to recalculate than try to convert
    664753         * the values over to the new packet */
     
    675764 */
    676765DLLEXPORT void trace_destroy_packet(libtrace_packet_t *packet) {
     766        /* Free any resources possibly associated with the packet */
     767        if (libtrace_parallel && packet->trace && packet->trace->format->fin_packet) {
     768                packet->trace->format->fin_packet(packet);
     769        }
     770       
    677771        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
    678772                free(packet->buffer);
     
    683777                                 */
    684778        free(packet);
    685 }       
     779}
     780
     781/**
     782 * Removes any possible data stored againt the trace and releases any data.
     783 * This will not destroy a reusable good malloc'd buffer (TRACE_CTRL_PACKET)
     784 * use trace_destroy_packet() for those diabolical purposes.
     785 */
     786void trace_fin_packet(libtrace_packet_t *packet) {
     787        if (packet)
     788        {
     789                if (packet->trace && packet->trace->format->fin_packet) {
     790                        packet->trace->format->fin_packet(packet);
     791                        //gettimeofday(&tv, NULL);
     792                        //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     793                }
     794
     795                // No matter what we remove the header and link pointers
     796                packet->trace = NULL;
     797                packet->header = NULL;
     798                packet->payload = NULL;
     799
     800                if (packet->buf_control != TRACE_CTRL_PACKET)
     801                {
     802                        packet->buffer = NULL;
     803                }
     804
     805                packet->trace = NULL;
     806                packet->hash = 0;
     807                packet->order = 0;
     808                trace_clear_cache(packet);
     809        }
     810}
    686811
    687812/* Read one packet from the trace into buffer. Note that this function will
     
    707832        }
    708833        assert(packet);
    709      
    710         /* Store the trace we are reading from into the packet opaque
    711          * structure */
    712         packet->trace = libtrace;
    713 
    714         /* Finalise the packet, freeing any resources the format module
    715          * may have allocated it
    716          */
    717         if (libtrace->format->fin_packet) {
    718                 libtrace->format->fin_packet(packet);
    719         }
    720 
    721834
    722835        if (libtrace->format->read_packet) {
    723836                do {
    724837                        size_t ret;
    725                         /* Clear the packet cache */
    726                         trace_clear_cache(packet);
     838                        /* Finalise the packet, freeing any resources the format module
     839                         * may have allocated it and zeroing all data associated with it.
     840                         */
     841                        trace_fin_packet(packet);
     842                        /* Store the trace we are reading from into the packet opaque
     843                         * structure */
     844                        packet->trace = libtrace;
    727845                        ret=libtrace->format->read_packet(libtrace,packet);
    728846                        if (ret==(size_t)-1 || ret==0) {
     
    743861                                                libtrace->snaplen);
    744862                        }
     863                        trace_packet_set_order(packet, libtrace->accepted_packets);
    745864                        ++libtrace->accepted_packets;
    746865                        return ret;
     
    9461065        }
    9471066
    948         return tv;
     1067    return tv;
    9491068}
    9501069
     
    11991318                libtrace_linktype_t linktype    ) {
    12001319#ifdef HAVE_BPF_FILTER
     1320        /* It just so happens that the underlying libs used by pthread arn't
     1321         * thread safe, namely lex/flex thingys, so single threaded compile
     1322         * multi threaded running should be safe.
     1323         */
     1324        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    12011325        assert(filter);
    12021326
     
    12201344                                        "Unknown pcap equivalent linktype");
    12211345                        return -1;
     1346                }
     1347                assert (pthread_mutex_lock(&mutex) == 0);
     1348                /* Make sure not one bet us to this */
     1349                if (filter->flag) {
     1350                        printf("Someone bet us to compile the filter\n");
     1351                        assert (pthread_mutex_unlock(&mutex) == 0);
     1352                        return 1;
    12221353                }
    12231354                pcap=(pcap_t *)pcap_open_dead(
     
    12331364                                        pcap_geterr(pcap));
    12341365                        pcap_close(pcap);
     1366                        assert (pthread_mutex_unlock(&mutex) == 0);
    12351367                        return -1;
    12361368                }
    12371369                pcap_close(pcap);
    12381370                filter->flag=1;
     1371                assert (pthread_mutex_unlock(&mutex) == 0);
    12391372        }
    12401373        return 0;
     
    12561389        libtrace_linktype_t linktype;
    12571390        libtrace_packet_t *packet_copy = (libtrace_packet_t*)packet;
     1391#ifdef HAVE_LLVM
     1392        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
     1393#endif
    12581394
    12591395        assert(filter);
     
    13061442         * what the link type was
    13071443         */
     1444        // Note internal mutex locking used here
    13081445        if (trace_bpf_compile(filter,packet_copy,linkptr,linktype)==-1) {
    13091446                if (free_packet_needed) {
     
    13161453#if HAVE_LLVM
    13171454        if (!filter->jitfilter) {
    1318                 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1455                assert(pthread_mutex_lock(&mutex) == 0);
     1456                /* Again double check here like the bpf filter */
     1457                if(filter->jitfilter)
     1458                        printf("Someone bet us to compile the JIT thingy\n");
     1459                else
     1460                /* Looking at compile_program source this appears to be thread safe
     1461                 * however if this gets called twice we will leak this memory :(
     1462                 * as such lock here anyways */
     1463                        filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1464                assert(pthread_mutex_unlock(&mutex) == 0);
    13191465        }
    13201466#endif
  • test/Makefile

    r262a093 r262a093  
    55
    66INCLUDE = -I$(PREFIX)/lib -I$(PREFIX)/libpacketdump
    7 CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 \
     7CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 -std=gnu99 -pthread \
    88                -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE -D_LARGEFILE64_SOURCE
    99CFLAGS += $(INCLUDE)
     
    1111LDLIBS = -L$(PREFIX)/lib/.libs -L$(PREFIX)/libpacketdump/.libs -ltrace -lpacketdump
    1212
     13BINS_DATASTRUCT = test-datastruct-vector test-datastruct-deque \
     14        test-datastruct-ringbuffer
     15BINS_PARALLEL = test-format-parallel test-format-parallel-hasher \
     16        test-format-parallel-singlethreaded test-format-parallel-stressthreads \
     17        test-format-parallel-singlethreaded-hasher
     18
    1319BINS = test-pcap-bpf test-event test-time test-dir test-wireless test-errors \
    14         test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen
     20        test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen \
     21        $(BINS_DATASTRUCT) $(BINS_PARALLEL)
    1522
    1623.PHONY: all clean distclean install depend test
     
    1926
    2027clean:
    21         $(RM) $(BINS) $(OBJS) test-format  test-decode test-convert \
     28        $(RM) $(BINS) $(OBJS) test-format test-decode test-convert \
    2229        test-decode2 test-write test-drops test-convert2
    2330
  • tools/traceanon/Makefile.am

    rc0a5a50 r29bbef0  
    1 bin_PROGRAMS = traceanon
     1bin_PROGRAMS = traceanon traceanon_parallel
    22
    33man_MANS = traceanon.1
     
    66include ../Makefile.tools
    77traceanon_SOURCES = traceanon.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
     8traceanon_parallel_SOURCES = traceanon_parallel.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
    89
    910# rijndael.c does lots of nasty casting that is going to be a nightmare to fix
     
    1112# messy and hopefully isn't actually an issue.
    1213traceanon_CFLAGS = $(AM_CFLAGS)
     14traceanon_parallel_CFLAGS = $(AM_CFLAGS)
  • tools/traceanon/panon.c

    ra3041a4 r29bbef0  
    88#include "panon.h"
    99
    10 static uint8_t m_key[16];
    11 static uint8_t m_pad[16];
     10static __thread uint8_t m_key[16];
     11static __thread  uint8_t m_pad[16];
    1212
    1313#define CACHEBITS 20
     
    1616//static uint32_t enc_cache[CACHESIZE];
    1717
    18 static uint32_t *enc_cache = 0;
    19 static uint32_t fullcache[2][2];
     18static __thread  uint32_t *enc_cache = 0; // Should be ok shared across multiple
     19static __thread  uint32_t fullcache[2][2]; // Needs to be against on thread
    2020
    2121
  • tools/tracertstats/Makefile.am

    r530bcf0 r29bbef0  
    1 bin_PROGRAMS = tracertstats
     1bin_PROGRAMS = tracertstats tracertstats_parallel
    22man_MANS = tracertstats.1
    33EXTRA_DIST = $(man_MANS)
     
    1616tracertstats_SOURCES = tracertstats.c output.h output.c $(OUTPUT_MODULES)
    1717tracertstats_LDADD = -ltrace $(OUTPUT_PNG_LD)
     18tracertstats_parallel_SOURCES = tracertstats_parallel.c output.h output.c $(OUTPUT_MODULES)
     19tracertstats_parallel_LDADD = -ltrace $(OUTPUT_PNG_LD)
  • tools/tracestats/Makefile.am

    r3b8a5ef r29bbef0  
    1 bin_PROGRAMS = tracestats
     1bin_PROGRAMS = tracestats tracestats_parallel
    22bin_SCRIPTS = tracesummary
    33
     
    77include ../Makefile.tools
    88tracestats_SOURCES = tracestats.c
     9tracestats_parallel_SOURCES = tracestats_parallel.c
Note: See TracChangeset for help on using the changeset viewer.