Changes in / [ea602cd:9e429e8]


Ignore:
Files:
34 added
23 edited

Legend:

Unmodified
Added
Removed
  • README

    ra7c8f4a ra7c8f4a  
     1This fork of Libtrace aims to support parallel packet processing.
     2
     3This is still work in progress and is full of bugs, some of the original
     4Libtrace functions might not function correctly breaking the supplied tools.
     5
    16libtrace 3.0.21
    27
  • lib/Makefile.am

    re5dedd5 r2498008  
    22include_HEADERS = libtrace.h dagformat.h lt_inttypes.h daglegacy.h rt_protocol.h erftypes.h 
    33
    4 AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@
    5 AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@
     4AM_CFLAGS=@LIBCFLAGS@ @CFLAG_VISIBILITY@ -pthread
     5AM_CXXFLAGS=@LIBCXXFLAGS@ @CFLAG_VISIBILITY@ -pthread
    66
    77extra_DIST = format_template.c
     
    4444endif
    4545
    46 libtrace_la_SOURCES = trace.c common.h \
     46libtrace_la_SOURCES = trace.c trace_parallel.c common.h \
    4747                format_erf.c format_pcap.c format_legacy.c \
    4848                format_rt.c format_helper.c format_helper.h format_pcapfile.c \
     
    5656                $(DAGSOURCE) format_erf.h \
    5757                $(BPFJITSOURCE) \
    58                 libtrace_arphrd.h
     58                libtrace_arphrd.h \
     59                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
     60                data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
     61                hash_toeplitz.c combiner_ordered.c combiner_sorted.c combiner_unordered.c
    5962
    6063if DAG2_4
  • lib/format_atmhdr.c

    r5952ff0 rb13b939  
    231231        trace_event_trace,              /* trace_event */
    232232        NULL,                           /* help */
    233         NULL                            /* next pointer */
     233        NULL,                            /* next pointer */
     234        NON_PARALLEL(false)
    234235};
    235236       
  • lib/format_bpf.c

    r08f5060 r08f5060  
    614614        trace_event_device,     /* trace_event */
    615615        bpf_help,               /* help */
    616         NULL
     616        NULL,                   /* next pointer */
     617        NON_PARALLEL(true)
    617618};
    618619#else   /* HAVE_DECL_BIOCSETIF */
     
    663664        NULL,                   /* trace_event */
    664665        bpf_help,               /* help */
    665         NULL
     666        NULL,                   /* next pointer */
     667        NON_PARALLEL(true)
    666668};
    667669#endif  /* HAVE_DECL_BIOCSETIF */
  • lib/format_dag24.c

    rc70f59f rc70f59f  
    558558        trace_event_dag,                /* trace_event */
    559559        dag_help,                       /* help */
    560         NULL                            /* next pointer */
     560        NULL,                            /* next pointer */
     561    NON_PARALLEL(true)
    561562};
    562563
  • lib/format_dag25.c

    r51d1f64 r51d1f64  
    12571257        trace_event_dag,                /* trace_event */
    12581258        dag_help,                       /* help */
    1259         NULL                            /* next pointer */
     1259        NULL, /* pstart_input */
     1260        NULL, /* pread_packet */
     1261        NULL, /* ppause_input */
     1262        NULL, /* pfin_input */
     1263        NULL, /* pconfig_input */
     1264        NULL,                            /* next pointer */
     1265        NON_PARALLEL(true)
    12601266};
    12611267
  • lib/format_dpdk.c

    r9f43919 r9f43919  
    4141 */
    4242
     43#define _GNU_SOURCE
     44
    4345#include "config.h"
    4446#include "libtrace.h"
     
    4648#include "format_helper.h"
    4749#include "libtrace_arphrd.h"
     50#include "hash_toeplitz.h"
    4851
    4952#ifdef HAVE_INTTYPES_H
     
    124127#include <rte_mempool.h>
    125128#include <rte_mbuf.h>
     129#include <rte_launch.h>
     130#include <rte_lcore.h>
     131#include <rte_per_lcore.h>
     132#include <pthread.h>
    126133
    127134/* The default size of memory buffers to use - This is the max size of standard
     
    159166#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    160167#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     168#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     169
    161170#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    162171                        (uint64_t) tv.tv_usec*1000ull)
     
    181190
    182191/* Print verbose messages to stdout */
    183 #define DEBUG 0
     192#define DEBUG 1
    184193
    185194/* Use clock_gettime() for nanosecond resolution rather than gettimeofday()
     
    228237    DPDK_RUNNING,
    229238    DPDK_PAUSED,
     239};
     240
     241struct dpdk_per_lcore_t
     242{
     243        // TODO move time stamp stuff here
     244        uint16_t queue_id;
     245        uint8_t port;
    230246};
    231247
     
    248264#endif
    249265    char mempool_name[MEMPOOL_NAME_LEN]; /* The name of the mempool that we are using */
     266    uint8_t rss_key[40]; // This is the RSS KEY
    250267#if HAS_HW_TIMESTAMPS_82580
    251268    /* Timestamping only relevent to RX */
     
    254271    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    255272#endif
     273        // DPDK normally seems to have a limit of 8 queues for a given card
     274        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    256275};
    257276
     
    447466
    448467/**
     468 * Expects to be called from the master lcore and moves it to the given dpdk id
     469 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     470 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     471 *               and not already in use.
     472 * @return 0 is successful otherwise -1 on error.
     473 */
     474static inline int dpdk_move_master_lcore(size_t core) {
     475    struct rte_config *cfg = rte_eal_get_configuration();
     476    cpu_set_t cpuset;
     477    int i;
     478
     479    assert (core < RTE_MAX_LCORE);
     480    assert (rte_get_master_lcore() == rte_lcore_id());
     481
     482    if (core == rte_lcore_id())
     483        return 0;
     484
     485    // Make sure we are not overwriting someone else
     486    assert(!rte_lcore_is_enabled(core));
     487
     488    // Move the core
     489    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     490    cfg->lcore_role[core] = ROLE_RTE;
     491    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     492    rte_eal_get_configuration()->master_lcore = core;
     493    RTE_PER_LCORE(_lcore_id) = core;
     494
     495    // Now change the affinity
     496    CPU_ZERO(&cpuset);
     497
     498    if (lcore_config[core].detected) {
     499        CPU_SET(core, &cpuset);
     500    } else {
     501        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     502            if (lcore_config[i].detected)
     503                CPU_SET(i, &cpuset);
     504        }
     505    }
     506
     507    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     508    if (i != 0) {
     509        // TODO proper libtrace style error here!!
     510        fprintf(stderr, "pthread_setaffinity_np failed\n");
     511        return -1;
     512    }
     513    return 0;
     514}
     515
     516
     517/**
    449518 * XXX This is very bad XXX
    450519 * But we have to do something to allow getopts nesting
     
    477546                                        char * err, int errlen) {
    478547    int ret; /* Returned error codes */
    479     struct rte_pci_addr use_addr; /* The only address that we don't blacklist */
     548    struct rte_pci_addr use_addr; /* The only address that we don't blacklist */   
    480549    char cpu_number[10] = {0}; /* The CPU mask we want to bind to */
    481550    char mem_map[20] = {0}; /* The memory name */
    482551    long nb_cpu; /* The number of CPUs in the system */
    483552    long my_cpu; /* The CPU number we want to bind to */
     553    int i;
     554    struct rte_config *cfg = rte_eal_get_configuration();
    484555        struct saved_getopts save_opts;
    485556   
     
    490561#endif
    491562    /*
     563     * Using unique file prefixes mean separate memory is used, unlinking
     564     * the two processes. However be careful we still cannot access a
     565     * port that already in use.
     566     *
    492567     * Using unique file prefixes mean separate memory is used, unlinking
    493568     * the two processes. However be careful we still cannot access a
     
    497572                "--file-prefix", mem_map, "-m", "256", NULL};
    498573    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    499 
     574   
    500575    /* This initialises the Environment Abstraction Layer (EAL)
    501576     * If we had slave workers these are put into WAITING state
     
    540615    }
    541616
    542     /* Make our mask */
    543     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
     617    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     618       info older versions */
     619    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     620    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    544621
    545622
     
    556633    }
    557634        restore_getopts(&save_opts);
     635    // These are still running but will never do anything with DPDK v1.7 we
     636    // should remove this XXX in the future
     637    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     638        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
     639            cfg->lcore_role[i] = ROLE_OFF;
     640            cfg->lcore_count--;
     641        }
     642    }
     643    // Only the master should be running
     644    assert(cfg->lcore_count == 1);
     645
     646    dpdk_move_master_lcore(my_cpu-1);
    558647
    559648#if DEBUG
     
    572661#endif
    573662
    574     /* Blacklist all ports besides the one that we want to use */
     663    /* Black list all ports besides the one that we want to use */
    575664        if ((ret = blacklist_devices(format_data, &use_addr)) < 0) {
    576665                snprintf(err, errlen, "Intel DPDK - Whitelisting PCI device failed,"
     
    594683        return -1;
    595684    }
     685   
     686    struct rte_eth_dev_info dev_info;
     687    rte_eth_dev_info_get(0, &dev_info);
     688    printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",
     689                (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues);
    596690
    597691    return 0;
     
    622716    FORMAT(libtrace)->wrap_count = 0;
    623717#endif
    624 
     718       
    625719    if (dpdk_init_environment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
    626720        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     
    667761};
    668762
     763static int dpdk_pconfig_input (libtrace_t *libtrace,
     764                                trace_parallel_option_t option,
     765                                void *data) {
     766        switch (option) {
     767                case TRACE_OPTION_SET_HASHER:
     768                        switch (*((enum hasher_types *) data))
     769                        {
     770                                case HASHER_BALANCE:
     771                                case HASHER_UNIDIRECTIONAL:
     772                                        toeplitz_create_unikey(FORMAT(libtrace)->rss_key);
     773                                        return 0;
     774                                case HASHER_BIDIRECTIONAL:
     775                                        toeplitz_create_bikey(FORMAT(libtrace)->rss_key);
     776                                        return 0;
     777                                case HASHER_HARDWARE:
     778                                case HASHER_CUSTOM:
     779                                        // We don't support these
     780                                        return -1;
     781                        }
     782        break;
     783        }
     784        return -1;
     785}
    669786/**
    670787 * Note here snaplen excludes the MAC checksum. Packets over
     
    716833static struct rte_eth_conf port_conf = {
    717834        .rxmode = {
     835                .mq_mode = ETH_RSS,
    718836                .split_hdr_size = 0,
    719837                .header_split   = 0, /**< Header Split disabled */
     
    739857        .txmode = {
    740858                .mq_mode = ETH_DCB_NONE,
     859        },
     860        .rx_adv_conf = {
     861                .rss_conf = {
     862                        // .rss_key = &rss_key, // We set this per format
     863                        .rss_hf = ETH_RSS_IPV4_UDP | ETH_RSS_IPV6 | ETH_RSS_IPV4 | ETH_RSS_IPV4_TCP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
     864                },
    741865        },
    742866};
     
    863987     */
    864988   
     989   
     990    port_conf.rx_adv_conf.rss_conf.rss_key = format_data->rss_key;
     991   
    865992    /* This must be called first before another *eth* function
    866993     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     
    9241051}
    9251052
     1053/* Attach memory to the port and start (or restart) the port/s.
     1054 */
     1055static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
     1056    int ret, i; /* Check return values for errors */
     1057    struct rte_eth_link link_info; /* Wait for link */
     1058   
     1059    /* Already started */
     1060    if (format_data->paused == DPDK_RUNNING)
     1061        return 0;
     1062
     1063    /* First time started we need to alloc our memory, doing this here
     1064     * rather than in environment setup because we don't have snaplen then */
     1065    if (format_data->paused == DPDK_NEVER_STARTED) {
     1066        if (format_data->snaplen == 0) {
     1067            format_data->snaplen = RX_MBUF_SIZE;
     1068            port_conf.rxmode.jumbo_frame = 0;
     1069            port_conf.rxmode.max_rx_pkt_len = 0;
     1070        } else {
     1071            /* Use jumbo frames */
     1072            port_conf.rxmode.jumbo_frame = 1;
     1073            port_conf.rxmode.max_rx_pkt_len = format_data->snaplen;
     1074        }
     1075
     1076        /* This is additional overhead so make sure we allow space for this */
     1077#if GET_MAC_CRC_CHECKSUM
     1078        format_data->snaplen += ETHER_CRC_LEN;
     1079#endif
     1080#if HAS_HW_TIMESTAMPS_82580
     1081        format_data->snaplen += sizeof(struct hw_timestamp_82580);
     1082#endif
     1083
     1084        /* Create the mbuf pool, which is the place our packets are allocated
     1085         * from - TODO figure out if there is is a free function (I cannot see one)
     1086         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
     1087         * allocate however that extra 1 packet is not used.
     1088         * (I assume <= vs < error some where in DPDK code)
     1089         * TX requires nb_tx_buffers + 1 in the case the queue is full
     1090         * so that will fill the new buffer and wait until slots in the
     1091         * ring become available.
     1092         */
     1093#if DEBUG
     1094    printf("Creating mempool named %s\n", format_data->mempool_name);
     1095#endif
     1096        format_data->pktmbuf_pool =
     1097            rte_mempool_create(format_data->mempool_name,
     1098                       format_data->nb_rx_buf*rx_queues + format_data->nb_tx_buf + 1,
     1099                       format_data->snaplen + sizeof(struct rte_mbuf)
     1100                                        + RTE_PKTMBUF_HEADROOM,
     1101                       8, sizeof(struct rte_pktmbuf_pool_private),
     1102                       rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL,
     1103                       0, MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET);
     1104
     1105        if (format_data->pktmbuf_pool == NULL) {
     1106            snprintf(err, errlen, "Intel DPDK - Initialisation of mbuf "
     1107                        "pool failed: %s", strerror(rte_errno));
     1108            return -1;
     1109        }
     1110    }
     1111   
     1112    /* ----------- Now do the setup for the port mapping ------------ */
     1113    /* Order of calls must be
     1114     * rte_eth_dev_configure()
     1115     * rte_eth_tx_queue_setup()
     1116     * rte_eth_rx_queue_setup()
     1117     * rte_eth_dev_start()
     1118     * other rte_eth calls
     1119     */
     1120   
     1121    /* This must be called first before another *eth* function
     1122     * 1 rx, 1 tx queue, port_conf sets checksum stripping etc */
     1123    ret = rte_eth_dev_configure(format_data->port, rx_queues, 1, &port_conf);
     1124    if (ret < 0) {
     1125        snprintf(err, errlen, "Intel DPDK - Cannot configure device port"
     1126                            " %"PRIu8" : %s", format_data->port,
     1127                            strerror(-ret));
     1128        return -1;
     1129    }
     1130#if DEBUG
     1131    printf("Doing dev configure\n");
     1132#endif
     1133    /* Initialise the TX queue a minimum value if using this port for
     1134     * receiving. Otherwise a larger size if writing packets.
     1135     */
     1136    ret = rte_eth_tx_queue_setup(format_data->port, format_data->queue_id,
     1137                        format_data->nb_tx_buf, SOCKET_ID_ANY, &tx_conf);
     1138    if (ret < 0) {
     1139        snprintf(err, errlen, "Intel DPDK - Cannot configure TX queue on port"
     1140                            " %"PRIu8" : %s", format_data->port,
     1141                            strerror(-ret));
     1142        return -1;
     1143    }
     1144   
     1145    for (i=0; i < rx_queues; i++) {
     1146#if DEBUG
     1147    printf("Doing queue configure\n");
     1148#endif 
     1149                /* Initialise the RX queue with some packets from memory */
     1150                ret = rte_eth_rx_queue_setup(format_data->port, i,
     1151                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
     1152                                                                &rx_conf, format_data->pktmbuf_pool);
     1153        /* Init per_thread data structures */
     1154        format_data->per_lcore[i].port = format_data->port;
     1155        format_data->per_lcore[i].queue_id = i;
     1156
     1157                if (ret < 0) {
     1158                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     1159                                                " %"PRIu8" : %s", format_data->port,
     1160                                                strerror(-ret));
     1161                        return -1;
     1162                }
     1163        }
     1164   
     1165#if DEBUG
     1166    fprintf(stderr, "Doing start device\n");
     1167#endif 
     1168    /* Start device */
     1169    ret = rte_eth_dev_start(format_data->port);
     1170#if DEBUG
     1171    fprintf(stderr, "Done start device\n");
     1172#endif 
     1173    if (ret < 0) {
     1174        snprintf(err, errlen, "Intel DPDK - rte_eth_dev_start failed : %s",
     1175                    strerror(-ret));
     1176        return -1;
     1177    }
     1178
     1179
     1180    /* Default promiscuous to on */
     1181    if (format_data->promisc == -1)
     1182        format_data->promisc = 1;
     1183   
     1184    if (format_data->promisc == 1)
     1185        rte_eth_promiscuous_enable(format_data->port);
     1186    else
     1187        rte_eth_promiscuous_disable(format_data->port);
     1188   
     1189   
     1190    /* We have now successfully started/unpased */
     1191    format_data->paused = DPDK_RUNNING;
     1192   
     1193    // Can use remote launch for all
     1194    /*RTE_LCORE_FOREACH_SLAVE(i) {
     1195                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
     1196        }*/
     1197   
     1198    /* Wait for the link to come up */
     1199    rte_eth_link_get(format_data->port, &link_info);
     1200#if DEBUG
     1201    fprintf(stderr, "Link status is %d %d %d\n", (int) link_info.link_status,
     1202            (int) link_info.link_duplex, (int) link_info.link_speed);
     1203#endif
     1204
     1205    return 0;
     1206}
     1207
    9261208static int dpdk_start_input (libtrace_t *libtrace) {
    9271209    char err[500];
     
    9341216        return -1;
    9351217    }
     1218    return 0;
     1219}
     1220
     1221static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
     1222    struct rte_eth_dev_info dev_info;
     1223    rte_eth_dev_info_get(port_id, &dev_info);
     1224    return dev_info.max_rx_queues;
     1225}
     1226
     1227static inline size_t dpdk_processor_count () {
     1228    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1229    if (nb_cpu <= 0)
     1230        return 1;
     1231    else
     1232        return (size_t) nb_cpu;
     1233}
     1234
     1235static int dpdk_pstart_input (libtrace_t *libtrace) {
     1236    char err[500];
     1237    int i=0, phys_cores=0;
     1238    int tot = libtrace->perpkt_thread_count;
     1239    err[0] = 0;
     1240
     1241    if (rte_lcore_id() != rte_get_master_lcore())
     1242        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1243
     1244    // If the master is not on the last thread we move it there
     1245    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1246        // Consider error handling here
     1247        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
     1248    }
     1249
     1250    // Don't exceed the number of cores in the system/detected by dpdk
     1251    // We don't have to force this but performance wont be good if we don't
     1252    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1253        if (lcore_config[i].detected) {
     1254            if (rte_lcore_is_enabled(i))
     1255                fprintf(stderr, "Found core %d already in use!\n", i);
     1256            else
     1257                phys_cores++;
     1258        }
     1259    }
     1260
     1261        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1262    tot = MIN(tot, phys_cores);
     1263
     1264        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
     1265       
     1266    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     1267        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "%s", err);
     1268        free(libtrace->format_data);
     1269        libtrace->format_data = NULL;
     1270        return -1;
     1271    }
     1272
     1273    // Make sure we only start the number that we should
     1274    libtrace->perpkt_thread_count = tot;
     1275    return 0;
     1276}
     1277
     1278
     1279/**
     1280 * Register a thread with the DPDK system,
     1281 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1282 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1283 * gives it.
     1284 *
     1285 * We then allow a mapper thread to be started on every real core as DPDK would
     1286 * we also bind these to the corresponding CPU cores.
     1287 *
     1288 * @param libtrace A pointer to the trace
     1289 * @param reading True if the thread will be used to read packets, i.e. will
     1290 *                call pread_packet(), false if thread used to process packet
     1291 *                in any other manner including statistics functions.
     1292 */
     1293static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1294{
     1295    struct rte_config *cfg = rte_eal_get_configuration();
     1296    int i;
     1297    int new_id = -1;
     1298
     1299    // If 'reading packets' fill in cores from 0 up and bind affinity
     1300    // otherwise start from the MAX core (which is also the master) and work backwards
     1301    // in this case physical cores on the system will not exist so we don't bind
     1302    // these to any particular physical core
     1303    if (reading) {
     1304        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1305            if (!rte_lcore_is_enabled(i)) {
     1306                new_id = i;
     1307                if (!lcore_config[i].detected)
     1308                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1309                break;
     1310            }
     1311        }
     1312    } else {
     1313        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1314            if (!rte_lcore_is_enabled(i)) {
     1315                new_id = i;
     1316                break;
     1317            }
     1318        }
     1319    }
     1320
     1321    if (new_id == -1) {
     1322        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1323        // TODO proper libtrace style error here!!
     1324        fprintf(stderr, "Too many threads for DPDK!!\n");
     1325        return -1;
     1326    }
     1327
     1328    // Enable the core in global DPDK structs
     1329    cfg->lcore_role[new_id] = ROLE_RTE;
     1330    cfg->lcore_count++;
     1331    // Set TLS to reflect our new number
     1332    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1333    fprintf(stderr, "original id%d", rte_lcore_id());
     1334    RTE_PER_LCORE(_lcore_id) = new_id;
     1335    fprintf(stderr, " new id%d\n", rte_lcore_id());
     1336
     1337    if (reading) {
     1338        // Set affinity bind to corresponding core
     1339        cpu_set_t cpuset;
     1340        CPU_ZERO(&cpuset);
     1341        CPU_SET(rte_lcore_id(), &cpuset);
     1342        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1343        if (i != 0) {
     1344            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1345            return -1;
     1346        }
     1347    }
     1348
     1349    // Map our TLS to the thread data
     1350    if (reading) {
     1351        if(t->type == THREAD_PERPKT) {
     1352            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1353        } else {
     1354            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1355        }
     1356    }
     1357}
     1358
     1359
     1360/**
     1361 * Unregister a thread with the DPDK system.
     1362 *
     1363 * Only previously registered threads should be calling this just before
     1364 * they are destroyed.
     1365 */
     1366static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
     1367{
     1368    struct rte_config *cfg = rte_eal_get_configuration();
     1369
     1370    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
     1371
     1372    // Skip if master!!
     1373    if (rte_lcore_id() == rte_get_master_lcore()) {
     1374        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1375        return 0;
     1376    }
     1377
     1378    // Disable this core in global DPDK structs
     1379    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1380    cfg->lcore_count--;
     1381    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1382    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
    9361383    return 0;
    9371384}
     
    12691716}
    12701717
     1718
     1719static void dpdk_fin_packet(libtrace_packet_t *packet)
     1720{
     1721        if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1722                rte_pktmbuf_free(packet->buffer);
     1723                packet->buffer = NULL;
     1724        }
     1725}
     1726
    12711727static int dpdk_read_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
    12721728    int nb_rx; /* Number of rx packets we've recevied */
     
    12981754            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
    12991755        }
     1756    }
     1757   
     1758    /* We'll never get here - but if we did it would be bad */
     1759    return -1;
     1760}
     1761
     1762static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
     1763    int nb_rx; /* Number of rx packets we've recevied */
     1764    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     1765
     1766    /* Free the last packet buffer */
     1767    if (packet->buffer != NULL) {
     1768        /* Buffer is owned by DPDK */
     1769        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
     1770            rte_pktmbuf_free(packet->buffer);
     1771            packet->buffer = NULL;
     1772        } else
     1773        /* Buffer is owned by packet i.e. has been malloc'd */
     1774        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1775            free(packet->buffer);
     1776            packet->buffer = NULL;
     1777        }
     1778    }
     1779   
     1780    packet->buf_control = TRACE_CTRL_EXTERNAL;
     1781    packet->type = TRACE_RT_DATA_DPDK;
     1782   
     1783    /* Wait for a packet */
     1784    while (1) {
     1785        /* Poll for a single packet */
     1786        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     1787                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
     1788        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
     1789                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     1790            return dpdk_ready_pkt(libtrace, packet, pkts_burst[0]);
     1791        }
     1792        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
     1793        if (libtrace_message_queue_count(&t->messages) > 0) {
     1794                        printf("Extra message yay");
     1795                        return -2;
     1796                }
    13001797    }
    13011798   
     
    14631960}
    14641961
    1465  static struct libtrace_format_t dpdk = {
     1962static struct libtrace_format_t dpdk = {
    14661963        "dpdk",
    14671964        "$Id: format_dpdk.c 1805 2013-03-08 02:01:35Z salcock $",
     
    14801977        dpdk_read_packet,           /* read_packet */
    14811978        dpdk_prepare_packet,    /* prepare_packet */
    1482         NULL,                               /* fin_packet */
     1979        dpdk_fin_packet,                                    /* fin_packet */
    14831980        dpdk_write_packet,          /* write_packet */
    14841981        dpdk_get_link_type,         /* get_link_type */
     
    15032000        dpdk_trace_event,               /* trace_event */
    15042001    dpdk_help,              /* help */
    1505         NULL
     2002    NULL,                   /* next pointer */
     2003    {true, 8},              /* Live, NICs typically have 8 threads */
     2004    dpdk_pstart_input, /* pstart_input */
     2005        dpdk_pread_packet, /* pread_packet */
     2006        dpdk_pause_input, /* ppause */
     2007        dpdk_fin_input, /* p_fin */
     2008        dpdk_pconfig_input, /* pconfig_input */
     2009    dpdk_pregister_thread, /* pregister_thread */
     2010    dpdk_punregister_thread /* unpregister_thread */
    15062011};
    15072012
  • lib/format_duck.c

    r9b097ea rb13b939  
    360360        NULL,                           /* trace_event */
    361361        duck_help,                      /* help */
    362         NULL                            /* next pointer */
     362        NULL,                            /* next pointer */
     363        NON_PARALLEL(false)
    363364};
    364365
  • lib/format_erf.c

    rc70f59f rc70f59f  
    828828        erf_event,                      /* trace_event */
    829829        erf_help,                       /* help */
    830         NULL                            /* next pointer */
     830        NULL,                           /* next pointer */
     831        NON_PARALLEL(false)
    831832};
    832833
     
    871872        erf_event,                      /* trace_event */
    872873        erf_help,                       /* help */
    873         NULL                            /* next pointer */
     874        NULL,                           /* next pointer */
     875        NON_PARALLEL(false)
    874876};
    875877
  • lib/format_legacy.c

    r1ca603b rb13b939  
    552552        trace_event_trace,              /* trace_event */
    553553        legacyatm_help,                 /* help */
    554         NULL                            /* next pointer */
     554        NULL,                           /* next pointer */
     555        NON_PARALLEL(false)
    555556};
    556557
     
    595596        trace_event_trace,              /* trace_event */
    596597        legacyeth_help,                 /* help */
    597         NULL                            /* next pointer */
     598        NULL,                           /* next pointer */
     599        NON_PARALLEL(false)
    598600};
    599601
     
    639641        legacypos_help,                 /* help */
    640642        NULL,                           /* next pointer */
     643        NON_PARALLEL(false)
    641644};
    642645
     
    682685        legacynzix_help,                /* help */
    683686        NULL,                           /* next pointer */
     687        NON_PARALLEL(false)
    684688};
    685689       
  • lib/format_linux.c

    r63af502 r2498008  
    7272#include <sys/mman.h>
    7373
     74#include <fcntl.h>
     75
    7476/* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel.
    7577 * max_order will be decreased by one if the ring buffer fails to allocate.
     
    147149#define PACKET_HDRLEN   11
    148150#define PACKET_TX_RING  13
     151#define PACKET_FANOUT   18
    149152#define TP_STATUS_USER  0x1
    150153#define TP_STATUS_SEND_REQUEST  0x1
     
    154157#define TPACKET_ALIGN(x)        (((x)+TPACKET_ALIGNMENT-1)&~(TPACKET_ALIGNMENT-1))
    155158#define TPACKET_HDRLEN         (TPACKET_ALIGN(sizeof(struct tpacket2_hdr)) + sizeof(struct sockaddr_ll))
     159
     160/* Since 3.1 kernel we have packet_fanout support */
     161// schedule to socket by skb's rxhash - the implementation is bi-directional
     162#define PACKET_FANOUT_HASH              0
     163// schedule round robin
     164#define PACKET_FANOUT_LB                1
     165// schedule to the same socket that received the packet
     166#define PACKET_FANOUT_CPU               2
     167// Something to do with fragmented packets and hashing problems !! TODO figure out if this needs to be on
     168#define PACKET_FANOUT_FLAG_DEFRAG       0x8000
     169/* Included but unused by libtrace since Linux 3.10 */
     170// if one socket if full roll over to the next
     171#define PACKET_FANOUT_ROLLOVER          3
     172// This flag makes any other system roll over
     173#define PACKET_FANOUT_FLAG_ROLLOVER     0x1000
     174/* Included but unused by libtrace since Linux 3.12 */
     175// schedule random
     176#define PACKET_FANOUT_RND               4
     177
    156178
    157179enum tpacket_versions {
     
    185207        unsigned int tp_frame_nr;    /* Total number of frames */
    186208};
     209
     210struct linux_per_thread_t {
     211        char *rx_ring;
     212        int rxring_offset;
     213        int fd;
     214        // The flag layout should be the same for all (I Hope)
     215        // max_order
     216} ALIGN_STRUCT(CACHE_LINE_SIZE);
    187217
    188218struct linux_format_data_t {
     
    212242        /* Used to determine buffer size for the ring buffer */
    213243        uint32_t max_order;
     244        /* Used for the parallel case, fanout is the mode */
     245        uint16_t fanout_flags;
     246        /* The group lets Linux know which sockets to group together
     247         * so we use a random here to try avoid collisions */
     248        uint16_t fanout_group;
     249        /* When running in parallel mode this is malloc'd with an array
     250         * file descriptors from packet fanout will use, here we assume/hope
     251         * that every ring can get setup the same */
     252        struct linux_per_thread_t *per_thread;
    214253};
    215254
     
    266305
    267306#define FORMAT(x) ((struct linux_format_data_t*)(x))
     307#define PERPKT_FORMAT(x) ((struct linux_per_thread_t*)(x->format_data))
    268308#define DATAOUT(x) ((struct linux_output_format_data_t*)((x)->format_data))
    269309
     
    367407        FORMAT(libtrace->format_data)->rxring_offset = 0;
    368408        FORMAT(libtrace->format_data)->max_order = MAX_ORDER;
     409        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; // This might be best or alternatively PACKET_FANOUT_LB
     410        // Some examples use pid for the group however that would limit a single
     411        // application to use only int/ring format, instead using rand
     412        FORMAT(libtrace->format_data)->fanout_group = (uint16_t) rand();
     413        FORMAT(libtrace->format_data)->per_thread = NULL;
    369414}
    370415static int linuxring_init_input(libtrace_t *libtrace)
    371416{       
    372417        init_input(libtrace);
    373         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_RING;
     418        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_RING;
    374419        return 0;
    375420}
     
    377422{
    378423        init_input(libtrace);
    379         FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_NATIVE;
     424        FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_NATIVE;
    380425        return 0;
    381426}
     
    508553                }
    509554               
    510                 if (setsockopt(FORMAT(libtrace->format_data)->fd,
     555                if (setsockopt(FORMAT(libtrace->format_data)->fd,
    511556                                        SOL_SOCKET,
    512557                                        SO_ATTACH_FILTER,
     
    588633        return 0;
    589634}
    590 static int linuxring_start_input(libtrace_t *libtrace){
    591 
    592         char error[2048];       
     635
     636/**
     637 * Converts a socket, either packet_mmap or standard raw socket into a
     638 * fanout socket.
     639 * NOTE: This means we can read from the socket with multiple queues,
     640 * each must be setup (identically) and then this called upon them
     641 *
     642 * @return 0 success, -1 error
     643 */
     644static inline int socket_to_packet_fanout(int fd,
     645                                        uint16_t fanout_flags,
     646                                        uint16_t fanout_group) {
     647        int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group;
     648        if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
     649                        &fanout_opt, sizeof(fanout_opt)) == -1) {
     650                return -1;
     651        }
     652        return 0;
     653}
     654
     655static int linuxnative_ppause_input(libtrace_t *libtrace)
     656{
     657        int i;
     658        int tot = libtrace->perpkt_thread_count;
     659        printf("CAlling native pause packet\n");
     660       
     661        for (i = 0; i < tot; i++) {
     662                close(FORMAT(libtrace->format_data)->per_thread[i].fd);
     663        }
     664       
     665        free(FORMAT(libtrace->format_data)->per_thread);
     666        FORMAT(libtrace->format_data)->per_thread = NULL;
     667        return 0;
     668}
     669
     670static int linuxring_start_input(libtrace_t *libtrace)
     671{
     672        char error[2048];
    593673
    594674        /* We set the socket up the same and then convert it to PACKET_MMAP */
     
    615695}
    616696
     697static int linuxnative_pstart_input(libtrace_t *libtrace) {
     698        int i = 0;
     699        int tot = libtrace->perpkt_thread_count;
     700        int iserror = 0;
     701        // We store this here otherwise it will be leaked if the memory doesn't know
     702        struct linux_per_thread_t *per_thread = NULL;
     703       
     704        if (!FORMAT(libtrace->format_data)->per_thread) {
     705                //per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
     706                posix_memalign((void **)&per_thread, CACHE_LINE_SIZE, tot*sizeof(struct linux_per_thread_t));
     707                FORMAT(libtrace->format_data)->per_thread = per_thread;
     708        } else {
     709                // Whats going on this might not work 100%
     710                // We assume all sockets have been closed ;)
     711                printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
     712        }
     713       
     714        printf("Calling native pstart packet\n");
     715        for (i = 0; i < tot; ++i)
     716        {
     717                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
     718                        if (linuxnative_start_input(libtrace) != 0) {
     719                                iserror = 1;
     720                                break;
     721                        }
     722                } else {
     723                        // This must be ring
     724                        if (linuxring_start_input(libtrace) != 0) {
     725                                iserror = 1;
     726                                break;
     727                        }
     728                }
     729                if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0)
     730                {
     731                        iserror = 1;
     732                        // Clean up here to keep consistent with every one else
     733                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed");
     734                        close(FORMAT(libtrace->format_data)->fd);
     735                        free(libtrace->format_data);
     736                        libtrace->format_data = NULL;
     737                        break;
     738                }
     739                per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
     740                if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
     741                        per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
     742                        per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     743                }
     744        }
     745       
     746        // Roll back those that failed - by this point in time the format_data
     747        // has been freed
     748        if (iserror) {
     749                for (i = i - 1; i >= 0; i--) {
     750                        close(per_thread[i].fd);
     751                }
     752                free(per_thread);
     753                per_thread = NULL;
     754                return -1;
     755        }
     756       
     757        return 0;
     758}
     759
     760static int linux_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) {
     761        fprintf(stderr, "registering thread %d!!\n", t->perpkt_num);
     762    if (reading) {
     763        if(t->type == THREAD_PERPKT) {
     764            t->format_data = &FORMAT(libtrace->format_data)->per_thread[t->perpkt_num];
     765        } else {
     766            t->format_data = &FORMAT(libtrace->format_data)->per_thread[0];
     767        }
     768    }
     769    return 0;
     770}
     771
    617772static int linuxnative_start_output(libtrace_out_t *libtrace)
    618773{
     
    621776                free(DATAOUT(libtrace));
    622777                return -1;
    623         }       
     778        }
    624779
    625780        return 0;
     
    666821        return 0;
    667822}
     823
    668824static int linuxring_pause_input(libtrace_t *libtrace)
    669825{
     
    765921                         */
    766922                        f->flag = 1;
    767                 }
     923                }
    768924
    769925                pcap_close(pcap);
     
    813969#endif /* HAVE_NETPACKET_PACKET_H */
    814970
     971
     972static int linuxnative_pconfig_input(libtrace_t *libtrace,
     973                trace_parallel_option_t option,
     974                void *data)
     975{
     976        switch(option) {
     977                case TRACE_OPTION_SET_HASHER:
     978                        switch (*((enum hasher_types *)data)) {
     979                                case HASHER_BALANCE:
     980                                        // Do fanout
     981                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB;
     982                                        // Or we could balance to the CPU
     983                                        return 0;
     984                                case HASHER_BIDIRECTIONAL:
     985                                case HASHER_UNIDIRECTIONAL:
     986                                        FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH;
     987                                        return 0;
     988                                case HASHER_CUSTOM:
     989                                case HASHER_HARDWARE:
     990                                        return -1;
     991                        }
     992                        break;
     993                /* Avoid default: so that future options will cause a warning
     994                 * here to remind us to implement it, or flag it as
     995                 * unimplementable
     996                 */
     997        }
     998       
     999        /* Don't set an error - trace_config will try to deal with the
     1000         * option and will set an error if it fails */
     1001        return -1;
     1002}
     1003
     1004
    8151005static int linuxnative_prepare_packet(libtrace_t *libtrace UNUSED,
    8161006                libtrace_packet_t *packet, void *buffer,
     
    8841074
    8851075#ifdef HAVE_NETPACKET_PACKET_H
    886 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1076libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ;
     1077inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue)
    8871078{
    8881079        struct libtrace_linuxnative_header *hdr;
     
    8921083        struct cmsghdr *cmsg;
    8931084        int snaplen;
     1085
    8941086        uint32_t flags = 0;
    8951087        fd_set readfds;
     
    9301122        iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr));
    9311123        iovec.iov_len = snaplen;
    932 
     1124       
     1125        if (check_queue) {
     1126                // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
     1127                hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT | MSG_TRUNC);
     1128                if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     1129                        // Do message queue check or select
     1130                        int ret;
     1131                        fd_set rfds;
     1132                        FD_ZERO(&rfds);
     1133                        FD_SET(fd, &rfds);
     1134                        FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds);
     1135                        int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? fd : get_thread_table(libtrace)->messages.pipefd[0];
     1136                        do {
     1137                                ret = select(largestfd+1, &rfds, NULL, NULL, NULL);
     1138                                if (ret == -1 && errno != EINTR)
     1139                                        perror("Select() failed");
     1140                        }
     1141                        while (ret == -1);
     1142                       
     1143                        assert (ret == 1 || ret == 2); // No timeout 0 is not an option
     1144                       
     1145                        if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) {
     1146                                // Not an error but check the message queue we have something
     1147                                return -2;
     1148                        }
     1149                        // Otherwise we must have a packet
     1150                        hdr->wirelen = recvmsg(fd, &msghdr, 0);
     1151                }
     1152        } else {
    9331153        /* Use select to allow us to time out occasionally to check if someone
    9341154         * has hit Ctrl-C or otherwise wants us to stop reading and return
     
    9611181                        return 0;
    9621182        }
    963 
    9641183        hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, MSG_TRUNC);
    965 
     1184        }
     1185
     1186                       
     1187                       
     1188       
    9661189        if (hdr->wirelen==~0U) {
    9671190                trace_set_err(libtrace,errno,"recvmsg");
     
    10111234        if (cmsg == NULL) {
    10121235                struct timeval tv;
    1013                 if (ioctl(FORMAT(libtrace->format_data)->fd,
    1014                                   SIOCGSTAMP,&tv)==0) {
     1236                if (ioctl(fd, SIOCGSTAMP,&tv)==0) {
    10151237                        hdr->tv.tv_sec = tv.tv_sec;
    10161238                        hdr->tv.tv_usec = tv.tv_usec;
     
    10331255}
    10341256
     1257static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     1258{
     1259        int fd = FORMAT(libtrace->format_data)->fd;
     1260        return linuxnative_read_packet_fd(libtrace, packet, fd, 0);
     1261}
     1262
     1263static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet)
     1264{
     1265        int fd = PERPKT_FORMAT(t)->fd;
     1266        //fprintf(stderr, "Thread number is #%d fd=%d\n", t->perpkt_num, PERPKT_FORMAT(t)->fd);
     1267        return linuxnative_read_packet_fd(libtrace, packet, fd, 1);
     1268}
     1269
    10351270#define LIBTRACE_BETWEEN(test,a,b) ((test) >= (a) && (test) < (b))
    10361271static int linuxring_get_capture_length(const libtrace_packet_t *packet);
     
    10391274/* Release a frame back to the kernel or free() if it's a malloc'd buffer
    10401275 */
    1041 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet ){
     1276inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet){
    10421277        /* Free the old packet */
    10431278        if(packet->buffer == NULL)
     
    10511286                struct linux_format_data_t *ftd = FORMAT(libtrace->format_data);
    10521287               
    1053                 /* Check it's within our buffer first */
    1054                 if(LIBTRACE_BETWEEN((char *) packet->buffer,
     1288                /* Check it's within our buffer first - consider the pause resume case it might have already been free'd lets hope we get another buffer */
     1289                // For now let any one free anything
     1290                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
    10551291                                (char *) ftd->rx_ring,
    10561292                                ftd->rx_ring
    1057                                 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){
     1293                                + ftd->req.tp_block_size * ftd->req.tp_block_nr)){*/
    10581294                        TO_TP_HDR(packet->buffer)->tp_status = 0;
    10591295                        packet->buffer = NULL;
    1060                 }
    1061         }
    1062 }
    1063 
    1064 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1296                /*}*/
     1297        }
     1298}
     1299
     1300/**
     1301 * Free any resources being kept for this packet, Note: libtrace
     1302 * will ensure all fields are zeroed correctly.
     1303 */
     1304static void linuxring_fin_packet(libtrace_packet_t *packet)
     1305{
     1306
     1307        if (packet->buffer == NULL)
     1308                return;
     1309        assert(packet->trace);
     1310       
     1311        // Started should always match the existence of the rx_ring
     1312        assert(!!FORMAT(packet->trace->format_data)->rx_ring == !!packet->trace->started);
     1313       
     1314        // Our packets are always under our control
     1315        assert(packet->buf_control == TRACE_CTRL_EXTERNAL);
     1316       
     1317        if (FORMAT(packet->trace->format_data)->rx_ring) // If we don't have a ring its already been destroyed or paused
     1318                ring_release_frame(packet->trace, packet);
     1319        else
     1320                packet->buffer = NULL;
     1321}
     1322
     1323inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) {
    10651324
    10661325        struct tpacket2_hdr *header;
    1067         struct pollfd pollset; 
    10681326        int ret;
    10691327        unsigned int snaplen;
     
    10751333       
    10761334        /* Fetch the current frame */
    1077         header = GET_CURRENT_BUFFER(libtrace);
     1335        header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace);
    10781336        assert((((unsigned long) header) & (pagesize - 1)) == 0);
    10791337
    10801338        while (1) {
    1081                 pollset.fd = FORMAT(libtrace->format_data)->fd;
    1082                 pollset.events = POLLIN;
    1083                 pollset.revents = 0;
    1084                 /* Wait for more data */
     1339                if (message) {
     1340                        struct pollfd pollset[2];
     1341                        pollset[0].fd = fd;
     1342                        pollset[0].events = POLLIN;
     1343                        pollset[0].revents = 0;
     1344                        pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages);
     1345                        pollset[1].events = POLLIN;
     1346                        pollset[1].revents = 0;
     1347                        /* Wait for more data or a message*/
     1348                        ret = poll(pollset, 2, -1);
     1349                        if (ret < 0) {
     1350                                if (errno != EINTR)
     1351                                        trace_set_err(libtrace,errno,"poll()");
     1352                                return -1;
     1353                        }
     1354                        // Check for a message otherwise loop
     1355                        if (pollset[1].revents)
     1356                                return -2;
     1357                } else {
     1358                        struct pollfd pollset;
     1359                        pollset.fd = fd;
     1360                        pollset.events = POLLIN;
     1361                        pollset.revents = 0;
     1362
     1363                        /* Wait for more data or a message*/
    10851364                ret = poll(&pollset, 1, 500);
    1086                 if (ret < 0) {
    1087                         if (errno != EINTR)
    1088                                 trace_set_err(libtrace,errno,"poll()");
    1089                         return -1;
     1365                        if (ret < 0) {
     1366                                if (errno != EINTR)
     1367                                        trace_set_err(libtrace,errno,"poll()");
     1368                                return -1;
    10901369                } else if (ret == 0) {
    10911370                        /* Poll timed out - check if we should exit */
     
    11011380                if (header->tp_status & TP_STATUS_USER)
    11021381                        break;
     1382                }
    11031383        }
    11041384
     
    11151395
    11161396        /* Move to next buffer */
    1117         FORMAT(libtrace->format_data)->rxring_offset++;
    1118         FORMAT(libtrace->format_data)->rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
     1397        (*rxring_offset)++;
     1398        *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
    11191399
    11201400        /* We just need to get prepare_packet to set all our packet pointers
     
    11261406                                linuxring_get_capture_length(packet);
    11271407
     1408}
     1409
     1410static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1411        int fd = FORMAT(libtrace->format_data)->fd;
     1412        int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset;
     1413        char *rx_ring = FORMAT(libtrace->format_data)->rx_ring;
     1414        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0);
     1415}
     1416
     1417static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
     1418        //fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
     1419        int fd = PERPKT_FORMAT(t)->fd;
     1420        int *rxring_offset = &PERPKT_FORMAT(t)->rxring_offset;
     1421        char *rx_ring = PERPKT_FORMAT(t)->rx_ring;
     1422        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 1);
    11281423}
    11291424
     
    11831478
    11841479        return ret;
    1185 
    1186 }
     1480}
     1481
    11871482static int linuxring_write_packet(libtrace_out_t *trace,
    11881483                libtrace_packet_t *packet)
     
    14961791/* Number of packets that passed filtering */
    14971792static uint64_t linuxnative_get_captured_packets(libtrace_t *trace) {
     1793        struct tpacket_stats stats;
     1794
    14981795        if (trace->format_data == NULL)
    14991796                return UINT64_MAX;
     
    15041801        }
    15051802
    1506 #ifdef HAVE_NETPACKET_PACKET_H 
    1507         if ((FORMAT(trace->format_data)->stats_valid & 1)
    1508                         || FORMAT(trace->format_data)->stats_valid == 0) {
    1509                 socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    1510                 getsockopt(FORMAT(trace->format_data)->fd,
    1511                                 SOL_PACKET,
    1512                                 PACKET_STATISTICS,
    1513                                 &FORMAT(trace->format_data)->stats,
    1514                                 &len);
    1515                 FORMAT(trace->format_data)->stats_valid |= 1;
     1803#ifdef HAVE_NETPACKET_PACKET_H
     1804
     1805        if ((FORMAT(trace->format_data)->stats_valid & 1)
     1806                || FORMAT(trace->format_data)->stats_valid == 0) {
     1807                if (FORMAT(trace->format_data)->per_thread) {
     1808                        int i;
     1809                        FORMAT(trace->format_data)->stats.tp_drops = 0;
     1810                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     1811                        for (i = 0; i < trace->perpkt_thread_count; ++i) {
     1812                                socklen_t len = sizeof(stats);
     1813                                getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
     1814                                           SOL_PACKET,
     1815                                           PACKET_STATISTICS,
     1816                                           &stats,
     1817                                           &len);
     1818                                FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
     1819                                FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
     1820                        }
     1821                        FORMAT(trace->format_data)->stats_valid |= 1;
     1822                } else {
     1823                        socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
     1824                        getsockopt(FORMAT(trace->format_data)->fd,
     1825                                   SOL_PACKET,
     1826                                   PACKET_STATISTICS,
     1827                                   &FORMAT(trace->format_data)->stats,
     1828                                   &len);
     1829                        FORMAT(trace->format_data)->stats_valid |= 1;
     1830                }
    15161831        }
    15171832
     
    15261841 */
    15271842static uint64_t linuxnative_get_dropped_packets(libtrace_t *trace) {
     1843        struct tpacket_stats stats;
    15281844        if (trace->format_data == NULL)
    15291845                return UINT64_MAX;
     
    15331849                return UINT64_MAX;
    15341850        }
    1535        
     1851
    15361852#ifdef HAVE_NETPACKET_PACKET_H 
    15371853        if ((FORMAT(trace->format_data)->stats_valid & 2)
    1538                         || (FORMAT(trace->format_data)->stats_valid==0)) {
    1539                 socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    1540                 getsockopt(FORMAT(trace->format_data)->fd,
    1541                                 SOL_PACKET,
    1542                                 PACKET_STATISTICS,
    1543                                 &FORMAT(trace->format_data)->stats,
    1544                                 &len);
    1545                 FORMAT(trace->format_data)->stats_valid |= 2;
     1854                || (FORMAT(trace->format_data)->stats_valid==0)) {
     1855                if (FORMAT(trace->format_data)->per_thread) {
     1856                        int i;
     1857                        FORMAT(trace->format_data)->stats.tp_drops = 0;
     1858                        FORMAT(trace->format_data)->stats.tp_packets = 0;
     1859                        for (i = 0; i < trace->perpkt_thread_count; ++i) {
     1860                                socklen_t len = sizeof(stats);
     1861                                getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
     1862                                           SOL_PACKET,
     1863                                           PACKET_STATISTICS,
     1864                                           &stats,
     1865                                           &len);
     1866                                FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
     1867                                FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
     1868                        }
     1869                        FORMAT(trace->format_data)->stats_valid |= 2;
     1870                } else {
     1871                        socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
     1872                        getsockopt(FORMAT(trace->format_data)->fd,
     1873                                   SOL_PACKET,
     1874                                   PACKET_STATISTICS,
     1875                                   &FORMAT(trace->format_data)->stats,
     1876                                   &len);
     1877                        FORMAT(trace->format_data)->stats_valid |= 2;
     1878                }
    15461879        }
    15471880
     
    16151948        trace_event_device,             /* trace_event */
    16161949        linuxnative_help,               /* help */
     1950        NULL,                                   /* next pointer */
     1951        {true, -1},              /* Live, no thread limit */
     1952        linuxnative_pstart_input,                       /* pstart_input */
     1953        linuxnative_pread_packet,                       /* pread_packet */
     1954        linuxnative_ppause_input,                       /* ppause */
     1955        linuxnative_fin_input,                          /* p_fin */
     1956        linuxnative_pconfig_input,                      /* pconfig input */
     1957        linux_pregister_thread,
    16171958        NULL
    16181959};
     
    16351976        linuxring_read_packet,  /* read_packet */
    16361977        linuxring_prepare_packet,       /* prepare_packet */
    1637         NULL,                           /* fin_packet */
     1978        linuxring_fin_packet,                           /* fin_packet */
    16381979        linuxring_write_packet, /* write_packet */
    16391980        linuxring_get_link_type,        /* get_link_type */
     
    16581999        linuxring_event,                /* trace_event */
    16592000        linuxring_help,         /* help */
     2001        NULL,                           /* next pointer */
     2002        {true, -1},              /* Live, no thread limit */
     2003        linuxnative_pstart_input,                       /* pstart_input */
     2004        linuxring_pread_packet,                 /* pread_packet */
     2005        linuxnative_ppause_input,                       /* ppause */
     2006        linuxnative_fin_input,                          /* p_fin */
     2007        linuxnative_pconfig_input,
     2008        linux_pregister_thread,
    16602009        NULL
    16612010};
     
    17102059        trace_event_device,             /* trace_event */
    17112060        linuxnative_help,               /* help */
    1712         NULL
     2061        NULL,                   /* next pointer */
     2062        NON_PARALLEL(true)
    17132063};
    17142064
     
    17532103        NULL,                           /* trace_event */
    17542104        linuxring_help,                 /* help */
    1755         NULL
     2105        NULL,                   /* next pointer */
     2106        NON_PARALLEL(true)
    17562107};
    17572108
  • lib/format_pcap.c

    r4649fea r4649fea  
    834834        trace_event_trace,              /* trace_event */
    835835        pcap_help,                      /* help */
    836         NULL                            /* next pointer */
     836        NULL,                   /* next pointer */
     837        NON_PARALLEL(false)
    837838};
    838839
     
    877878        trace_event_device,             /* trace_event */
    878879        pcapint_help,                   /* help */
    879         NULL                            /* next pointer */
     880        NULL,                   /* next pointer */
     881        NON_PARALLEL(true)
    880882};
    881883
  • lib/format_pcapfile.c

    rc70f59f rc70f59f  
    774774        pcapfile_event,         /* trace_event */
    775775        pcapfile_help,                  /* help */
    776         NULL                            /* next pointer */
     776        NULL,                   /* next pointer */
     777        NON_PARALLEL(false)
    777778};
    778779
  • lib/format_rt.c

    rc70f59f rc70f59f  
    862862        trace_event_rt,             /* trace_event */
    863863        rt_help,                        /* help */
    864         NULL                            /* next pointer */
     864        NULL,                   /* next pointer */
     865        NON_PARALLEL(true) /* This is normally live */
    865866};
    866867
  • lib/format_tsh.c

    rc909fad rb13b939  
    269269        trace_event_trace,              /* trace_event */
    270270        tsh_help,                       /* help */
    271         NULL                            /* next pointer */
     271        NULL,                   /* next pointer */
     272        NON_PARALLEL(false)
    272273};
    273274
     
    317318        trace_event_trace,              /* trace_event */
    318319        tsh_help,                       /* help */
    319         NULL                            /* next pointer */
     320        NULL,                   /* next pointer */
     321        NON_PARALLEL(false)
    320322};
    321323
  • lib/libtrace.h.in

    r17f954f rd994324  
    115115/** DAG driver version installed on the current system */
    116116#define DAG_DRIVER_V "@DAG_VERSION_NUM@"
     117
     118/**
     119  * A version of assert that always runs the first argument even
     120  * when not debugging, however only asserts the condition if debugging
     121  * Intended for use mainly with pthread locks etc. which have error
     122  * returns but *should* never actually fail.
     123  */
     124#ifdef NDEBUG
     125#define ASSERT_RET(run, cond) run
     126#else
     127#define ASSERT_RET(run, cond) assert(run cond)
     128//#define ASSERT_RET(run, cond) run
     129#endif
    117130   
    118131#ifdef __cplusplus
     
    195208#endif
    196209
     210// Used to fight against false sharing
     211#define CACHE_LINE_SIZE 64
     212#define ALIGN_STRUCT(x) __attribute__((aligned(x)))
     213
    197214#ifdef _MSC_VER
    198215    #ifdef LT_BUILDING_DLL
     
    223240/** Opaque structure holding information about a bpf filter */
    224241typedef struct libtrace_filter_t libtrace_filter_t;
     242
     243typedef struct libtrace_thread_t libtrace_thread_t;
    225244
    226245/** If the packet has allocated its own memory the buffer_control should be
     
    509528        uint8_t transport_proto;        /**< Cached transport protocol */
    510529        uint32_t l4_remaining;          /**< Cached transport remaining */
     530        uint64_t order; /**< Notes the order of this packet in relation to the input */
     531        uint64_t hash; /**< A hash of the packet as supplied by the user */
     532        int error; /**< The error status of pread_packet */
    511533} libtrace_packet_t;
    512534
     
    31473169/*@}*/
    31483170
     3171/**
     3172 * A collection of types for convenience used in place of a
     3173 * simple void* to allow a any type of data to be stored.
     3174 *
     3175 * This is expected to be 8 bytes in length.
     3176 */
     3177typedef union {
     3178        /* Pointers */
     3179        void *ptr;
     3180        libtrace_packet_t *pkt;
     3181
     3182        /* C99 Integer types */
     3183        /* NOTE: Standard doesn't require 64-bit
     3184     * but x32 and x64 gcc does */
     3185        int64_t sint64;
     3186        uint64_t uint64;
     3187
     3188        uint32_t uint32s[2];
     3189        int32_t sint32s[2];
     3190        uint32_t uint32;
     3191        int32_t sint32;
     3192
     3193        uint16_t uint16s[4];
     3194        int16_t sint16s[4];
     3195        uint16_t uint16;
     3196        int16_t sint16;
     3197
     3198        uint8_t uint8s[8];
     3199        int8_t sint8s[8];
     3200        uint8_t uint8;
     3201        int8_t sint8;
     3202
     3203        size_t size;
     3204
     3205        /* C basic types - we cannot be certian of the size */
     3206        int sint;
     3207        unsigned int uint;
     3208
     3209        signed char schars[8];
     3210        unsigned char uchars[8];
     3211        signed char schar;
     3212        unsigned char uchar;
     3213
     3214        /* Real numbers */
     3215        float rfloat;
     3216        double rdouble;
     3217} libtrace_generic_types_t;
     3218
     3219typedef struct libtrace_message_t {
     3220        int code;
     3221        libtrace_generic_types_t additional;
     3222        libtrace_thread_t *sender;
     3223} libtrace_message_t;
     3224
     3225/** Structure holding information about a result */
     3226typedef struct libtrace_result_t {
     3227        uint64_t key;
     3228        libtrace_generic_types_t value;
     3229        int type;
     3230} libtrace_result_t;
     3231#define RESULT_NORMAL 0
     3232#define RESULT_PACKET 1
     3233#define RESULT_TICK   2
     3234
     3235
     3236typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
     3237typedef void (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
     3238typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
     3239
     3240DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter);
     3241DLLEXPORT int trace_ppause(libtrace_t *libtrace);
     3242DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     3243DLLEXPORT void trace_join(libtrace_t * trace);
     3244DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
     3245
     3246DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
     3247DLLEXPORT uint64_t libtrace_result_get_key(libtrace_result_t * result);
     3248DLLEXPORT void libtrace_result_set_value(libtrace_result_t * result, libtrace_generic_types_t value);
     3249DLLEXPORT libtrace_generic_types_t libtrace_result_get_value(libtrace_result_t * result);
     3250DLLEXPORT void libtrace_result_set_key_value(libtrace_result_t * result, uint64_t key, libtrace_generic_types_t value);
     3251DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
     3252
     3253// Ways to access Global and TLS storage that we provide the user
     3254DLLEXPORT void * trace_get_global(libtrace_t *trace);
     3255DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
     3256DLLEXPORT void * trace_set_tls(libtrace_thread_t *t, void * data);
     3257DLLEXPORT void * trace_get_tls(libtrace_thread_t *t);
     3258
     3259
     3260DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, libtrace_generic_types_t value, int type);
     3261typedef struct libtrace_vector libtrace_vector_t;
     3262
     3263DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
     3264DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
     3265DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3266DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
     3267DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
     3268DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
     3269DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
     3270DLLEXPORT int trace_finished(libtrace_t * libtrace);
     3271DLLEXPORT uint64_t trace_packet_get_order(libtrace_packet_t * packet);
     3272DLLEXPORT uint64_t trace_packet_get_hash(libtrace_packet_t * packet);
     3273DLLEXPORT void trace_packet_set_order(libtrace_packet_t * packet, uint64_t order);
     3274DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
     3275DLLEXPORT uint64_t tv_to_usec(struct timeval *tv);
     3276
     3277DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
     3278
     3279DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt);
     3280DLLEXPORT void libtrace_make_result_safe(libtrace_result_t *res);
     3281
     3282typedef enum {
     3283        /**
     3284         * Sets the hasher function, if NULL(default) no hashing is used a
     3285         * cores will get packets on a first in first served basis
     3286         */
     3287        TRACE_OPTION_SET_HASHER,
     3288       
     3289        /**
     3290         * Libtrace set perpkt thread count
     3291         */
     3292        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
     3293       
     3294        /**
     3295         * Delays packets so they are played back in trace-time rather than as fast
     3296         * as possible.
     3297         */
     3298        TRACE_OPTION_TRACETIME,
     3299
     3300        /**
     3301         * Specifies the interval between tick packets in milliseconds, if 0
     3302         * or less this is ignored.
     3303         */
     3304        TRACE_OPTION_TICK_INTERVAL,
     3305        TRACE_OPTION_GET_CONFIG,
     3306        TRACE_OPTION_SET_CONFIG
     3307} trace_parallel_option_t;
     3308
     3309enum libtrace_messages {
     3310        MESSAGE_STARTING,
     3311        MESSAGE_RESUMING,
     3312        MESSAGE_STOPPING,
     3313        MESSAGE_PAUSING,
     3314        MESSAGE_DO_PAUSE,
     3315        MESSAGE_DO_STOP,
     3316        MESSAGE_FIRST_PACKET,
     3317        MESSAGE_PERPKT_ENDED,
     3318        MESSAGE_PERPKT_RESUMED,
     3319        MESSAGE_PERPKT_PAUSED,
     3320        MESSAGE_PERPKT_EOF,
     3321        MESSAGE_POST_REPORTER,
     3322        MESSAGE_POST_RANGE,
     3323        MESSAGE_TICK,
     3324        MESSAGE_USER
     3325};
     3326
     3327enum hasher_types {
     3328        /**
     3329         * Balance load across CPUs best as possible, this is basically to say do
     3330         * not care about hash. This might still might be implemented
     3331         * using a hash or round robin etc. under the hood depending on the format
     3332         */
     3333        HASHER_BALANCE,
     3334
     3335        /** Use a hash which is bi-directional for TCP flows, that is packets with
     3336         * the same hash are sent to the same thread. All non TCP packets will be
     3337         * sent to the same thread. UDP may or may not be sent to separate
     3338         * threads like TCP, this depends on the format support.
     3339         */
     3340        HASHER_BIDIRECTIONAL,
     3341       
     3342        /**
     3343         * Use a hash which is uni-directional across TCP flows, that means the
     3344         * opposite directions of the same 5 tuple might end up on separate cores.
     3345         * Otherwise is identical to HASHER_BIDIRECTIONAL
     3346         */
     3347        HASHER_UNIDIRECTIONAL,
     3348
     3349        /**
     3350         * Always use the user supplied hasher, this currently disables native
     3351         * support and is likely significantly slower.
     3352         */
     3353        HASHER_CUSTOM,
     3354
     3355        /**
     3356         * This is not a valid option, used internally only!!! TODO remove
     3357         * Set by the format if the hashing is going to be done in hardware
     3358         */
     3359        HASHER_HARDWARE
     3360};
     3361
     3362typedef struct libtrace_info_t {
     3363        /**
     3364         * True if a live format (i.e. packets have to be tracetime).
     3365         * Otherwise false, indicating packets can be read as fast
     3366         * as possible from the format.
     3367         */
     3368        bool live;
     3369
     3370        /**
     3371         * The maximum number of threads supported by a parallel trace. 1
     3372         * if parallel support is not native (in this case libtrace will simulate
     3373         * an unlimited number of threads), -1 means unlimited and 0 unknown.
     3374         */
     3375        int max_threads;
     3376
     3377        /* TODO hash fn supported list */
     3378
     3379        /* TODO consider time/clock details?? */
     3380} libtrace_info_t;
     3381
     3382DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
     3383DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3384DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3385DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
     3386DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
     3387
     3388/**
     3389 * Tuning the parallel sizes
     3390 */
     3391struct user_configuration {
     3392        // Packet memory cache settings (ocache_init) total
     3393        /**
     3394         * See diagrams, this sets the maximum size of freelist used to
     3395         * maintain packets and their memory buffers.
     3396         * NOTE setting this to less than recommend could cause deadlock a
     3397         * trace that manages its own packets.
     3398         * A unblockable error message will be printed.
     3399         */
     3400        size_t packet_cache_size;
     3401        /**
     3402         * Per thread local cache size for the packet freelist
     3403         */
     3404        size_t packet_thread_cache_size;
     3405        /**
     3406         * If true the total number of packets that can be created by a trace is limited
     3407         * to the packet_cache_size, otherwise once packet_cache_size is exceeded alloc
     3408         * and free will be used to create and free packets, this will be slower than
     3409         * using the freelist and could run a machine out of memory.
     3410         *
     3411         * However this does make it easier to ensure that deadlocks will not occur
     3412         * due to running out of packets
     3413         */
     3414        bool fixed_packet_count;
     3415        /**
     3416         * When reading from a single threaded input source to reduce
     3417         * lock contention a 'burst' of packets is read per pkt thread
     3418         * this determines the bursts size.
     3419         */
     3420        size_t burst_size;
     3421        // Each perpkt thread has a queue leading into the reporter
     3422        //size_t reporter_queue_size;
     3423
     3424        /**
     3425         * The tick interval - in milliseconds
     3426         * When a live trace is used messages are sent at the tick
     3427         * interval to ensure that all perpkt threads receive data
     3428         * this allows results to be printed in cases flows are
     3429         * not being directed to a certian thread, while still
     3430         * maintaining order.
     3431         */
     3432        size_t tick_interval;
     3433
     3434        /**
     3435         * Like the tick interval but used in the case of file format
     3436         * This specifies the number of packets before inserting a tick to
     3437         * every thread.
     3438         */
     3439        size_t tick_count;
     3440
     3441        /**
     3442         * The number of per packet threads requested, 0 means use default.
     3443         * Default typically be the number of processor threads detected less one or two.
     3444         */
     3445        size_t perpkt_threads;
     3446
     3447        /**
     3448         * See diagrams, this sets the maximum size of buffers used between
     3449         * the single hasher thread and the buffer.
     3450         * NOTE setting this to less than recommend could cause deadlock a
     3451         * trace that manages its own packets.
     3452         * A unblockable warning message will be printed to stderr in this case.
     3453         */
     3454        /** The number of packets that can queue per thread from hasher thread */
     3455        size_t hasher_queue_size;
     3456
     3457        /**
     3458         * If true use a polling hasher queue, that means that we will spin/or yeild
     3459         * when rather than blocking on a lock. This applies to both the hasher thread
     3460         * and perpkts reading the queues.
     3461         */
     3462        bool hasher_polling;
     3463
     3464        /**
     3465         * If true the reporter thread will continuously poll waiting for results
     3466         * if false they are only checked when a message is received, this message
     3467         * is controlled by reporter_thold.
     3468         */
     3469        bool reporter_polling;
     3470
     3471        /**
     3472         * Perpkt thread result queue size before triggering the reporter step to read results
     3473         */
     3474        size_t reporter_thold;
     3475
     3476        /**
     3477         * Prints a line to standard error for every state change
     3478         * for both the trace as a whole and for each thread.
     3479         */
     3480        bool debug_state;
     3481};
     3482#include <stdio.h>
     3483DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str);
     3484DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file);
     3485DLLEXPORT int libtrace_get_perpkt_count(libtrace_t* t);
     3486
     3487/**
     3488 * The methods we use to combine multiple outputs into a single output
     3489 * This is not considered a stable API however is public.
     3490 * Where possible use built in combiners
     3491 *
     3492 * NOTE this structure is duplicated per trace and as such can
     3493 * have functions rewritten, and in fact should if possible.
     3494 */
     3495typedef struct libtrace_combine libtrace_combine_t;
     3496struct libtrace_combine {
     3497
     3498        /**
     3499         * Called at the start of the trace to allow datastructures
     3500         * to be initilised and allow functions to be swapped if approriate.
     3501         *
     3502         * Also factors such as whether the trace is live or not can
     3503         * be used to determine the functions used.
     3504         * @return 0 if successful, -1 if an error occurs
     3505         */
     3506        int (*initialise)(libtrace_t *,libtrace_combine_t *);
     3507
     3508        /**
     3509         * Called when the trace ends, clean up any memory here
     3510         * from libtrace_t * init.
     3511         */
     3512        void (*destroy)(libtrace_t *, libtrace_combine_t *);
     3513
     3514        /**
     3515         * Publish a result against it's a threads queue.
     3516         * If null publish directly, expected to be used
     3517         * as a single threaded optimisation and can be
     3518         * set to NULL by init if this case is detected.
     3519         */
     3520        void (*publish)(libtrace_t *, int thread_id, libtrace_combine_t *, libtrace_result_t *);
     3521
     3522        /**
     3523         * Read as many results as possible from the trace.
     3524         * Directy calls the users code to handle results from here.
     3525         *
     3526         * THIS SHOULD BE NON-BLOCKING AND READ AS MANY AS POSSIBLE
     3527         * If publish is NULL, this probably should be NULL also otherwise
     3528         * it will not be called.
     3529         */
     3530        void (*read)(libtrace_t *, libtrace_combine_t *);
     3531
     3532        /**
     3533         * Called when the trace is finished to flush the final
     3534         * results to the reporter thread.
     3535         *
     3536         * There may be no results, in which case this should
     3537         * just return.
     3538         *
     3539         * Libtrace state:
     3540         * Called from reporter thread
     3541         * No perpkt threads will be running, i.e. publish will not be
     3542         * called again.
     3543         *
     3544         * If publish is NULL, this probably should be NULL also otherwise
     3545         * it will not be called.
     3546         */
     3547        void (*read_final)(libtrace_t *, libtrace_combine_t *);
     3548
     3549        /**
     3550         * Pause must make sure any results of the type packet are safe.
     3551         * That means trace_copy_packet() and destroy the original.
     3552         * This also should be NULL if publish is NULL.
     3553         */
     3554        void (*pause)(libtrace_t *, libtrace_combine_t *);
     3555
     3556        /**
     3557         * Data storage for all the combiner threads
     3558         */
     3559        void *queues;
     3560
     3561        /**
     3562         * Configuration options, what this does is upto the combiner
     3563         * chosen.
     3564         */
     3565        libtrace_generic_types_t configuration;
     3566};
     3567
     3568DLLEXPORT void trace_set_combiner(libtrace_t *trace, const libtrace_combine_t *combiner, libtrace_generic_types_t config);
     3569
     3570#define READ_EOF 0
     3571#define READ_ERROR -1
     3572#define READ_MESSAGE -2
     3573// Used for inband tick message
     3574#define READ_TICK -3
     3575
     3576#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
     3577
    31493578#ifdef __cplusplus
    31503579} /* extern "C" */
  • lib/libtrace_int.h

    r10f924c rd994324  
    148148#endif
    149149
     150#include "data-struct/ring_buffer.h"
     151#include "data-struct/object_cache.h"
     152#include "data-struct/vector.h"
     153#include "data-struct/message_queue.h"
     154#include "data-struct/deque.h"
     155#include "data-struct/sliding_window.h"
    150156
    151157//#define RP_BUFSIZE 65536U
     
    166172        bool waiting;
    167173};
     174
     175enum thread_types {
     176        THREAD_EMPTY,
     177        THREAD_HASHER,
     178        THREAD_PERPKT,
     179        THREAD_REPORTER,
     180        THREAD_KEEPALIVE
     181};
     182
     183enum thread_states {
     184        THREAD_RUNNING,
     185        THREAD_FINISHING,
     186        THREAD_FINISHED,
     187        THREAD_PAUSED,
     188        THREAD_STATE_MAX
     189};
     190
     191/**
     192 * Information of this thread
     193 */
     194struct libtrace_thread_t {
     195        int accepted_packets; // The number of packets accepted only used if pread
     196        // is retreving packets
     197        // Set to true once the first packet has been stored
     198        bool recorded_first;
     199        // For thread safety reason we actually must store this here
     200        int64_t tracetime_offset_usec;
     201        void* user_data; // TLS for the user to use
     202        void* format_data; // TLS for the format to use
     203        libtrace_message_queue_t messages; // Message handling
     204        libtrace_ringbuffer_t rbuffer; // Input
     205        libtrace_t * trace;
     206        void* ret;
     207        enum thread_types type;
     208        enum thread_states state;
     209        pthread_t tid;
     210        int perpkt_num; // A number from 0-X that represents this perpkt threads number
     211                                // in the table, intended to quickly identify this thread
     212                                // -1 represents NA (such as the case this is not a perpkt thread)
     213};
     214
     215/**
     216 * Storage to note time value against each.
     217 * Used both internally to do trace time playback
     218 * and can be used externally to assist applications which need
     219 * a trace starting time such as tracertstats.
     220 */
     221struct first_packets {
     222        pthread_spinlock_t lock;
     223        size_t count; // If == perpkt_thread_count threads we have all
     224        size_t first; // Valid if count != 0
     225        struct __packet_storage_magic_type {
     226                libtrace_packet_t * packet;
     227                struct timeval tv;
     228        } * packets;
     229};
     230
     231#define TRACE_STATES \
     232        X(STATE_NEW) \
     233        X(STATE_RUNNING) \
     234        X(STATE_PAUSING) \
     235        X(STATE_PAUSED) \
     236        X(STATE_FINSHED) \
     237        X(STATE_DESTROYED) \
     238        X(STATE_JOINED) \
     239        X(STATE_ERROR)
     240
     241#define X(a) a,
     242enum trace_state {
     243        TRACE_STATES
     244};
     245#undef X
     246
     247#define X(a) case a: return #a;
     248static inline char *get_trace_state_name(enum trace_state ts){
     249        switch(ts) {
     250                TRACE_STATES
     251                default:
     252                        return "UNKNOWN";
     253        }
     254}
     255#undef X
    168256
    169257/** A libtrace input trace
     
    188276        uint64_t filtered_packets;     
    189277        /** The filename from the uri for the trace */
    190         char *uridata;                 
     278        char *uridata;
    191279        /** The libtrace IO reader for this trace (if applicable) */
    192         io_t *io;                       
     280        io_t *io;
    193281        /** Error information for the trace */
    194         libtrace_err_t err;             
     282        libtrace_err_t err;
    195283        /** Boolean flag indicating whether the trace has been started */
    196         bool started;                   
     284        bool started;
     285        /** Synchronise writes/reads across this format object and attached threads etc */
     286        pthread_mutex_t libtrace_lock;
     287        /** State */
     288        enum trace_state state;
     289        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
     290        pthread_cond_t perpkt_cond;
     291        /* Keep track of counts of threads in any given state */
     292        int perpkt_thread_states[THREAD_STATE_MAX];
     293
     294        /** For the sliding window hasher implementation */
     295        pthread_rwlock_t window_lock;
     296        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
     297        bool perpkt_queue_full;
     298        /** Global storage for this trace, shared among all the threads  */
     299        void* global_blob;
     300        /** The actual freelist */
     301        libtrace_ocache_t packet_freelist;
     302        /** User defined per_pkt function called when a pkt is ready */
     303        fn_per_pkt per_pkt;
     304        /** User defined reporter function entry point XXX not hooked up */
     305        fn_reporter reporter;
     306        /** The hasher function */
     307        enum hasher_types hasher_type;
     308        /** The hasher function - NULL implies they don't care or balance */
     309        fn_hasher hasher; // If valid using a separate thread
     310        void *hasher_data;
     311       
     312        libtrace_thread_t hasher_thread;
     313        libtrace_thread_t reporter_thread;
     314        libtrace_thread_t keepalive_thread;
     315        int perpkt_thread_count;
     316        libtrace_thread_t * perpkt_threads; // All our perpkt threads
     317        libtrace_slidingwindow_t sliding_window;
     318        sem_t sem;
     319        // Used to keep track of the first packet seen on each thread
     320        struct first_packets first_packets;
     321        int tracetime;
     322
     323        /*
     324         * Caches statistic counters in the case that our trace is
     325         * paused or stopped before this counter is taken
     326         */
     327        uint64_t dropped_packets;
     328        uint64_t received_packets;
     329        struct user_configuration config;
     330        libtrace_combine_t combiner;
    197331};
     332
     333void trace_fin_packet(libtrace_packet_t *packet);
     334void libtrace_zero_thread(libtrace_thread_t * t);
     335void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
     336libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     337int get_thread_table_num(libtrace_t *libtrace);
     338
    198339
    199340/** A libtrace output trace
     
    202343struct libtrace_out_t {
    203344        /** The capture format for the output trace */
    204         struct libtrace_format_t *format;
     345        struct libtrace_format_t *format;
    205346        /** Pointer to the "global" data for the capture format module */
    206347        void *format_data;             
     
    210351        libtrace_err_t err;
    211352        /** Boolean flag indicating whether the trace has been started */
    212         bool started;                   
     353        bool started;
    213354};
    214355
     
    303444} PACKED libtrace_pflog_header_t;
    304445
    305 
    306 
    307446/** A libtrace capture format module */
    308447/* All functions should return -1, or NULL on failure */
     
    734873        /** Prints some useful help information to standard output. */
    735874        void (*help)(void);
    736 
     875       
    737876        /** Next pointer, should always be NULL - used by the format module
    738877         * manager. */
    739878        struct libtrace_format_t *next;
     879
     880        /** Holds information about the trace format */
     881        struct libtrace_info_t info;
     882
     883        /** Starts or unpauses an input trace in parallel mode - note that
     884         * this function is often the one that opens the file or device for
     885         * reading.
     886         *
     887         * @param libtrace      The input trace to be started or unpaused
     888         * @return If successful the number of threads started, 0 indicates
     889         *                 no threads started and this should be done automatically.
     890         *                 Otherwise in event of an error -1 is returned.
     891         *
     892         */
     893        int (*pstart_input)(libtrace_t *trace);
     894       
     895        /** Read a packet in the new parallel mode
     896         * @return same as read_packet, with the addition of return -2 to represent
     897         * interrupted due to message waiting. */
     898        int (*pread_packet)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t *packet);
     899       
     900        /** Pause a parallel trace
     901         *
     902         * @param libtrace      The input trace to be paused
     903         */
     904        int (*ppause_input)(libtrace_t *trace);
     905       
     906        /** Called after all threads have been paused, Finish (close) a parallel trace
     907     *
     908         * @param libtrace      The input trace to be stopped
     909         */
     910        int (*pfin_input)(libtrace_t *trace);
     911       
     912        /** Applies a configuration option to an input trace.
     913         *
     914         * @param libtrace      The input trace to apply the option to
     915         * @param option        The option that is being configured
     916         * @param value         A pointer to the value that the option is to be
     917         *                      set to
     918         * @return 0 if successful, -1 if the option is unsupported or an error
     919         * occurs
     920         */
     921        int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value);
     922
     923        /**
     924         * Register a thread for use with the format or using the packets produced
     925         * by it. This is NOT only used for threads reading packets infact all
     926         * threads use this.
     927         *
     928         * Some use cases include setting up any thread local storage required for
     929         * to read packets and free packets. For DPDK we require any thread that
     930         * may release or read a packet to have have an internal number associated
     931         * with it.
     932         *
     933         * The thread type can be used to see if this thread is going to be used
     934         * to read packets or otherwise.
     935         *
     936         * @return 0 if successful, -1 if the option is unsupported or an error
     937         * occurs (such as a maximum of threads being reached)
     938         */
     939        int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader);
     940
     941        /**
     942         * If needed any memory allocated with pregister_thread can be released
     943         * in this function. The thread will be destroyed directly after this
     944         * function is called.
     945         */
     946        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    740947};
     948
     949/** Macro to zero out a single thread format */
     950#define NON_PARALLEL(live) \
     951{live, 1},              /* trace info */ \
     952NULL,                   /* pstart_input */ \
     953NULL,                   /* pread_packet */ \
     954NULL,                   /* ppause_input */ \
     955NULL,                   /* pfin_input */ \
     956NULL,                   /* pconfig_input */ \
     957NULL,                   /* pregister_thread */ \
     958NULL                    /* punregister_thread */
    741959
    742960/** The list of registered capture formats */
  • lib/trace.c

    r7fda5c5 rd994324  
    9999#include "rt_protocol.h"
    100100
     101#include <pthread.h>
     102#include <signal.h>
     103
    101104#define MAXOPTS 1024
    102105
     
    106109
    107110int libtrace_halt = 0;
     111
     112/* Set once pstart is called used for backwards compatibility reasons */
     113int libtrace_parallel = 0;
    108114
    109115/* strncpy is not assured to copy the final \0, so we
     
    253259        libtrace->filtered_packets = 0;
    254260        libtrace->accepted_packets = 0;
     261       
     262        /* Parallel inits */
     263        // libtrace->libtrace_lock
     264        // libtrace->perpkt_cond;
     265        libtrace->state = STATE_NEW;
     266        libtrace->perpkt_queue_full = false;
     267        libtrace->global_blob = NULL;
     268        libtrace->per_pkt = NULL;
     269        libtrace->reporter = NULL;
     270        libtrace->hasher = NULL;
     271        libtrace_zero_ocache(&libtrace->packet_freelist);
     272        libtrace_zero_thread(&libtrace->hasher_thread);
     273        libtrace_zero_thread(&libtrace->reporter_thread);
     274        libtrace_zero_thread(&libtrace->keepalive_thread);
     275        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     276        libtrace->reporter_thread.type = THREAD_EMPTY;
     277        libtrace->perpkt_thread_count = 0;
     278        libtrace->perpkt_threads = NULL;
     279        libtrace->tracetime = 0;
     280        libtrace->first_packets.first = 0;
     281        libtrace->first_packets.count = 0;
     282        libtrace->first_packets.packets = NULL;
     283        libtrace->dropped_packets = UINT64_MAX;
     284        ZERO_USER_CONFIG(libtrace->config);
    255285
    256286        /* Parse the URI to determine what sort of trace we are dealing with */
     
    348378        libtrace->io = NULL;
    349379        libtrace->filtered_packets = 0;
     380       
     381        /* Parallel inits */
     382        // libtrace->libtrace_lock
     383        // libtrace->perpkt_cond;
     384        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     385        libtrace->perpkt_queue_full = false;
     386        libtrace->global_blob = NULL;
     387        libtrace->per_pkt = NULL;
     388        libtrace->reporter = NULL;
     389        libtrace->hasher = NULL;
     390        libtrace_zero_ocache(&libtrace->packet_freelist);
     391        libtrace_zero_thread(&libtrace->hasher_thread);
     392        libtrace_zero_thread(&libtrace->reporter_thread);
     393        libtrace_zero_thread(&libtrace->keepalive_thread);
     394        libtrace_zero_slidingwindow(&libtrace->sliding_window);
     395        libtrace->reporter_thread.type = THREAD_EMPTY;
     396        libtrace->perpkt_thread_count = 0;
     397        libtrace->perpkt_threads = NULL;
     398        libtrace->tracetime = 0;
     399        ZERO_USER_CONFIG(libtrace->config);
    350400       
    351401        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    583633 */
    584634DLLEXPORT void trace_destroy(libtrace_t *libtrace) {
    585         assert(libtrace);
     635    int i;
     636        assert(libtrace);
     637
     638        /* destroy any packet that are still around */
     639        if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) {
     640                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     641                        if(libtrace->first_packets.packets[i].packet) {
     642                                trace_destroy_packet(libtrace->first_packets.packets[i].packet);
     643                        }
     644                }
     645                free(libtrace->first_packets.packets);
     646                ASSERT_RET(pthread_spin_destroy(&libtrace->first_packets.lock), == 0);
     647        }
     648
    586649        if (libtrace->format) {
    587650                if (libtrace->started && libtrace->format->pause_input)
     
    590653                        libtrace->format->fin_input(libtrace);
    591654        }
    592         /* Need to free things! */
    593         if (libtrace->uridata)
     655        /* Need to free things! */
     656        if (libtrace->uridata)
    594657                free(libtrace->uridata);
     658       
     659        /* Empty any packet memory */
     660        if (libtrace->state != STATE_NEW) {
     661                // This has all of our packets
     662                libtrace_ocache_destroy(&libtrace->packet_freelist);
     663                if (libtrace->combiner.destroy)
     664                        libtrace->combiner.destroy(libtrace, &libtrace->combiner);
     665                free(libtrace->perpkt_threads);
     666                libtrace->perpkt_threads = NULL;
     667                libtrace->perpkt_thread_count = 0;
     668        }
     669       
    595670        if (libtrace->event.packet) {
    596671                /* Don't use trace_destroy_packet here - there is almost
     
    605680                 free(libtrace->event.packet);
    606681        }
    607         free(libtrace);
     682        free(libtrace);
    608683}
    609684
     
    633708}
    634709
    635 DLLEXPORT libtrace_packet_t *trace_create_packet(void) 
    636 {
    637         libtrace_packet_t *packet = 
     710DLLEXPORT libtrace_packet_t *trace_create_packet(void)
     711{
     712        libtrace_packet_t *packet =
    638713                (libtrace_packet_t*)calloc((size_t)1,sizeof(libtrace_packet_t));
    639714
     
    661736        dest->type=packet->type;
    662737        dest->buf_control=TRACE_CTRL_PACKET;
     738        dest->order = packet->order;
     739        dest->hash = packet->hash;
     740        dest->error = packet->error;
    663741        /* Reset the cache - better to recalculate than try to convert
    664742         * the values over to the new packet */
     
    675753 */
    676754DLLEXPORT void trace_destroy_packet(libtrace_packet_t *packet) {
     755        /* Free any resources possibly associated with the packet */
     756        if (libtrace_parallel && packet->trace && packet->trace->format->fin_packet) {
     757                packet->trace->format->fin_packet(packet);
     758        }
     759       
    677760        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
    678761                free(packet->buffer);
     
    683766                                 */
    684767        free(packet);
    685 }       
     768}
     769
     770/**
     771 * Removes any possible data stored againt the trace and releases any data.
     772 * This will not destroy a reusable good malloc'd buffer (TRACE_CTRL_PACKET)
     773 * use trace_destroy_packet() for those diabolical purposes.
     774 */
     775void trace_fin_packet(libtrace_packet_t *packet) {
     776        if (packet)
     777        {
     778                if (packet->trace && packet->trace->format->fin_packet) {
     779                        packet->trace->format->fin_packet(packet);
     780                        //gettimeofday(&tv, NULL);
     781                        //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     782                }
     783
     784                // No matter what we remove the header and link pointers
     785                packet->trace = NULL;
     786                packet->header = NULL;
     787                packet->payload = NULL;
     788
     789                if (packet->buf_control != TRACE_CTRL_PACKET)
     790                {
     791                        packet->buffer = NULL;
     792                }
     793
     794                packet->trace = NULL;
     795                packet->hash = 0;
     796                packet->order = 0;
     797                trace_clear_cache(packet);
     798        }
     799}
    686800
    687801/* Read one packet from the trace into buffer. Note that this function will
     
    707821        }
    708822        assert(packet);
    709      
    710         /* Store the trace we are reading from into the packet opaque
    711          * structure */
    712         packet->trace = libtrace;
    713 
    714         /* Finalise the packet, freeing any resources the format module
    715          * may have allocated it
    716          */
    717         if (libtrace->format->fin_packet) {
    718                 libtrace->format->fin_packet(packet);
    719         }
    720 
    721823
    722824        if (libtrace->format->read_packet) {
    723825                do {
    724826                        size_t ret;
    725                         int filtret;
    726                         /* Clear the packet cache */
    727                         trace_clear_cache(packet);
     827                        int filtret;
     828                        /* Finalise the packet, freeing any resources the format module
     829                         * may have allocated it and zeroing all data associated with it.
     830                         */
     831                        trace_fin_packet(packet);
     832                        /* Store the trace we are reading from into the packet opaque
     833                         * structure */
     834                        packet->trace = libtrace;
    728835                        ret=libtrace->format->read_packet(libtrace,packet);
    729836                        if (ret==(size_t)-1 || ret==0) {
     
    750857                                                libtrace->snaplen);
    751858                        }
     859                        trace_packet_set_order(packet, libtrace->accepted_packets);
    752860                        ++libtrace->accepted_packets;
    753861                        return ret;
     
    813921DLLEXPORT int trace_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    814922        assert(libtrace);
    815         assert(packet); 
     923        assert(packet);
    816924        /* Verify the packet is valid */
    817925        if (!libtrace->started) {
     
    9531061        }
    9541062
    955         return tv;
     1063    return tv;
    9561064}
    9571065
     
    11231231                 * function so don't increment them here.
    11241232                 */
    1125                 event=packet->trace->format->trace_event(trace,packet);
    1126         }
     1233                event=packet->trace->format->trace_event(trace,packet);
     1234                }
    11271235        return event;
    11281236
     
    12071315                libtrace_linktype_t linktype    ) {
    12081316#ifdef HAVE_BPF_FILTER
     1317        /* It just so happens that the underlying libs used by pthread arn't
     1318         * thread safe, namely lex/flex thingys, so single threaded compile
     1319         * multi threaded running should be safe.
     1320         */
     1321        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    12091322        assert(filter);
    12101323
     
    12281341                                        "Unknown pcap equivalent linktype");
    12291342                        return -1;
     1343                }
     1344                assert (pthread_mutex_lock(&mutex) == 0);
     1345                /* Make sure not one bet us to this */
     1346                if (filter->flag) {
     1347                        printf("Someone bet us to compile the filter\n");
     1348                        assert (pthread_mutex_unlock(&mutex) == 0);
     1349                        return 1;
    12301350                }
    12311351                pcap=(pcap_t *)pcap_open_dead(
     
    12411361                                        pcap_geterr(pcap));
    12421362                        pcap_close(pcap);
     1363                        assert (pthread_mutex_unlock(&mutex) == 0);
    12431364                        return -1;
    12441365                }
    12451366                pcap_close(pcap);
    12461367                filter->flag=1;
     1368                assert (pthread_mutex_unlock(&mutex) == 0);
    12471369        }
    12481370        return 0;
     
    12641386        libtrace_linktype_t linktype;
    12651387        libtrace_packet_t *packet_copy = (libtrace_packet_t*)packet;
     1388#ifdef HAVE_LLVM
     1389        static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
     1390#endif
    12661391
    12671392        assert(filter);
     
    13141439         * what the link type was
    13151440         */
     1441        // Note internal mutex locking used here
    13161442        if (trace_bpf_compile(filter,packet_copy,linkptr,linktype)==-1) {
    13171443                if (free_packet_needed) {
     
    13241450#if HAVE_LLVM
    13251451        if (!filter->jitfilter) {
    1326                 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1452                ASSERT_RET(pthread_mutex_lock(&mutex), == 0);
     1453                /* Again double check here like the bpf filter */
     1454                if(filter->jitfilter)
     1455                        printf("Someone bet us to compile the JIT thingy\n");
     1456                else
     1457                /* Looking at compile_program source this appears to be thread safe
     1458                 * however if this gets called twice we will leak this memory :(
     1459                 * as such lock here anyways */
     1460                        filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
     1461                ASSERT_RET(pthread_mutex_unlock(&mutex), == 0);
    13271462        }
    13281463#endif
     
    18051940{
    18061941        assert(trace);
     1942        uint64_t ret;
     1943
    18071944        if (trace->format->get_received_packets) {
    1808                 return trace->format->get_received_packets(trace);
    1809         }
    1810         return (uint64_t)-1;
     1945                if ((ret = trace->format->get_received_packets(trace)) != UINT64_MAX)
     1946                        return ret;
     1947        }
     1948        // Read this cached value taken before the trace was closed
     1949        return trace->received_packets;
    18111950}
    18121951
     
    18241963{
    18251964        assert(trace);
     1965        uint64_t ret;
     1966
    18261967        if (trace->format->get_dropped_packets) {
    1827                 return trace->format->get_dropped_packets(trace);
    1828         }
    1829         return (uint64_t)-1;
     1968                if ((ret = trace->format->get_dropped_packets(trace)) != UINT64_MAX)
     1969                        return ret;
     1970        }
     1971        // Read this cached value taken before the trace was closed
     1972        return trace->dropped_packets;
    18301973}
    18311974
     
    18331976{
    18341977        assert(trace);
    1835         return trace->accepted_packets;
     1978        int i = 0;
     1979        uint64_t ret = trace->accepted_packets;
     1980        for (i = 0; i < trace->perpkt_thread_count; i++) {
     1981                ret += trace->perpkt_threads[i].accepted_packets;
     1982        }
     1983        return ret;
    18361984}
    18371985
  • test/Makefile

    r262a093 rf051c1b  
    55
    66INCLUDE = -I$(PREFIX)/lib -I$(PREFIX)/libpacketdump
    7 CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 \
     7CFLAGS = -Wall -Wimplicit -Wformat -W -pedantic -pipe -g -O2 -std=gnu99 -pthread \
    88                -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE -D_LARGEFILE64_SOURCE
    99CFLAGS += $(INCLUDE)
     
    1111LDLIBS = -L$(PREFIX)/lib/.libs -L$(PREFIX)/libpacketdump/.libs -ltrace -lpacketdump
    1212
     13BINS_DATASTRUCT = test-datastruct-vector test-datastruct-deque \
     14        test-datastruct-ringbuffer
     15BINS_PARALLEL = test-format-parallel test-format-parallel-hasher \
     16        test-format-parallel-singlethreaded test-format-parallel-stressthreads \
     17        test-format-parallel-singlethreaded-hasher test-format-parallel-reporter
     18
    1319BINS = test-pcap-bpf test-event test-time test-dir test-wireless test-errors \
    14         test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen
     20        test-plen test-autodetect test-ports test-fragment test-live test-live-snaplen \
     21        $(BINS_DATASTRUCT) $(BINS_PARALLEL)
    1522
    1623.PHONY: all clean distclean install depend test
     
    1926
    2027clean:
    21         $(RM) $(BINS) $(OBJS) test-format  test-decode test-convert \
     28        $(RM) $(BINS) $(OBJS) test-format test-decode test-convert \
    2229        test-decode2 test-write test-drops test-convert2
    2330
  • tools/traceanon/Makefile.am

    rc0a5a50 r29bbef0  
    1 bin_PROGRAMS = traceanon
     1bin_PROGRAMS = traceanon traceanon_parallel
    22
    33man_MANS = traceanon.1
     
    66include ../Makefile.tools
    77traceanon_SOURCES = traceanon.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
     8traceanon_parallel_SOURCES = traceanon_parallel.c rijndael.h rijndael.c panon.h panon.c ipenc.c ipenc.h
    89
    910# rijndael.c does lots of nasty casting that is going to be a nightmare to fix
     
    1112# messy and hopefully isn't actually an issue.
    1213traceanon_CFLAGS = $(AM_CFLAGS)
     14traceanon_parallel_CFLAGS = $(AM_CFLAGS)
  • tools/traceanon/panon.c

    ra3041a4 r29bbef0  
    88#include "panon.h"
    99
    10 static uint8_t m_key[16];
    11 static uint8_t m_pad[16];
     10static __thread uint8_t m_key[16];
     11static __thread  uint8_t m_pad[16];
    1212
    1313#define CACHEBITS 20
     
    1616//static uint32_t enc_cache[CACHESIZE];
    1717
    18 static uint32_t *enc_cache = 0;
    19 static uint32_t fullcache[2][2];
     18static __thread  uint32_t *enc_cache = 0; // Should be ok shared across multiple
     19static __thread  uint32_t fullcache[2][2]; // Needs to be against on thread
    2020
    2121
  • tools/tracertstats/Makefile.am

    r530bcf0 r29bbef0  
    1 bin_PROGRAMS = tracertstats
     1bin_PROGRAMS = tracertstats tracertstats_parallel
    22man_MANS = tracertstats.1
    33EXTRA_DIST = $(man_MANS)
     
    1616tracertstats_SOURCES = tracertstats.c output.h output.c $(OUTPUT_MODULES)
    1717tracertstats_LDADD = -ltrace $(OUTPUT_PNG_LD)
     18tracertstats_parallel_SOURCES = tracertstats_parallel.c output.h output.c $(OUTPUT_MODULES)
     19tracertstats_parallel_LDADD = -ltrace $(OUTPUT_PNG_LD)
  • tools/tracestats/Makefile.am

    r3b8a5ef r29bbef0  
    1 bin_PROGRAMS = tracestats
     1bin_PROGRAMS = tracestats tracestats_parallel
    22bin_SCRIPTS = tracesummary
    33
     
    77include ../Makefile.tools
    88tracestats_SOURCES = tracestats.c
     9tracestats_parallel_SOURCES = tracestats_parallel.c
Note: See TracChangeset for help on using the changeset viewer.