Changeset 50ce607


Ignore:
Timestamp:
07/18/14 14:20:32 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b4b6b75
Parents:
8af0d01
Message:

Adds per-thread storage for the format to use against libtrace_threads.
Passes threads as arguments to the read functions to save the overhead of looking these up.
Various changes to the DPDK system, including registering a thread, to allow our threads to be started with different DPDK thread numbers for thread-local memory caches.

Location:
lib
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r8af0d01 r50ce607  
    4141 */
    4242
     43#define _GNU_SOURCE
     44
    4345#include "config.h"
    4446#include "libtrace.h"
     
    7678#include <rte_lcore.h>
    7779#include <rte_per_lcore.h>
     80#include <pthread.h>
    7881
    7982/* The default size of memory buffers to use - This is the max size of standard
     
    111114#define MBUF_PKTDATA(x) ((char *) x + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
    112115#define FORMAT(x) ((struct dpdk_format_data_t*)(x->format_data))
     116#define PERPKT_FORMAT(x) ((struct dpdk_per_lcore_t*)(x->format_data))
     117
    113118#define TV_TO_NS(tv) ((uint64_t) tv.tv_sec*1000000000ull + \
    114119                        (uint64_t) tv.tv_usec*1000ull)
     
    182187};
    183188
    184 struct per_lcore_t
     189struct dpdk_per_lcore_t
    185190{
    186191        // TODO move time stamp stuff here
    187192        uint16_t queue_id;
    188193        uint8_t port;
    189         uint8_t enabled;
    190194};
    191195
     
    213217    uint32_t wrap_count; /* Number of times the NIC clock has wrapped around completely */
    214218#endif
    215         // DPDK normally seems to have a limit of
    216         struct per_lcore_t per_lcore[RTE_MAX_LCORE];
     219        // DPDK normally seems to have a limit of 8 queues for a given card
     220        struct dpdk_per_lcore_t per_lcore[RTE_MAX_LCORE];
    217221};
    218222
     
    387391#endif
    388392
     393/**
     394 * Expects to be called from the master lcore and moves it to the given dpdk id
     395 * @param core (zero indexed) If core is on the physical system affinity is bound otherwise
     396 *               affinity is set to all cores. Must be less than RTE_MAX_LCORE
     397 *               and not already in use.
     398 * @return 0 is successful otherwise -1 on error.
     399 */
     400static inline int dpdk_move_master_lcore(size_t core) {
     401    struct rte_config *cfg = rte_eal_get_configuration();
     402    cpu_set_t cpuset;
     403    int i;
     404
     405    assert (core < RTE_MAX_LCORE);
     406    assert (rte_get_master_lcore() == rte_lcore_id());
     407
     408    if (core == rte_lcore_id())
     409        return 0;
     410
     411    // Make sure we are not overwriting someone else
     412    assert(!rte_lcore_is_enabled(core));
     413
     414    // Move the core
     415    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     416    cfg->lcore_role[core] = ROLE_RTE;
     417    lcore_config[core].thread_id = lcore_config[rte_lcore_id()].thread_id;
     418    rte_eal_get_configuration()->master_lcore = core;
     419    RTE_PER_LCORE(_lcore_id) = core;
     420
     421    // Now change the affinity
     422    CPU_ZERO(&cpuset);
     423
     424    if (lcore_config[core].detected) {
     425        CPU_SET(core, &cpuset);
     426    } else {
     427        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     428            if (lcore_config[i].detected)
     429                CPU_SET(i, &cpuset);
     430        }
     431    }
     432
     433    i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     434    if (i != 0) {
     435        // TODO proper libtrace style error here!!
     436        fprintf(stderr, "pthread_setaffinity_np failed\n");
     437        return -1;
     438    }
     439    return 0;
     440}
     441
     442
    389443static inline int dpdk_init_enviroment(char * uridata, struct dpdk_format_data_t * format_data,
    390444                                        char * err, int errlen) {
     
    395449    long nb_cpu; /* The number of CPUs in the system */
    396450    long my_cpu; /* The CPU number we want to bind to */
     451    int i;
     452    struct rte_config *cfg = rte_eal_get_configuration();
    397453   
    398454#if DEBUG
     
    402458#endif
    403459    /* Using proc-type auto allows this to be either primary or secondary
    404      * Secondary allows two intances of libtrace to be used on different
     460     * Secondary allows two instances of libtrace to be used on different
    405461     * ports. However current version of DPDK doesn't support this on the
    406      * same card (My understanding is this should work with two seperate
     462     * same card (My understanding is this should work with two separate
    407463     * cards).
    408464     *
     
    415471    int argc = sizeof(argv) / sizeof(argv[0]) - 1;
    416472   
    417     /* This initilises the Enviroment Abstraction Layer (EAL)
     473    /* This initialises the Environment Abstraction Layer (EAL)
    418474     * If we had slave workers these are put into WAITING state
    419475     *
     
    457513    }
    458514
    459     /* Make our mask */ //  0x1 << (my_cpu - 1)
    460     snprintf(cpu_number, sizeof(cpu_number), "%x", 0x3);
     515    /* Make our mask with all cores turned on this is so that DPDK to gets CPU
     516       info older versions */
     517    snprintf(cpu_number, sizeof(cpu_number), "%x", ~(UINT32_MAX<<MIN(31, nb_cpu)));
     518    //snprintf(cpu_number, sizeof(cpu_number), "%x", 0x1 << (my_cpu - 1));
    461519
    462520        /* Give this a name */
     
    470528        return -1;
    471529    }
     530
     531    // These are still running but will never do anything with DPDK v1.7 we
     532    // should remove this XXX in the future
     533    for(i = 0; i < RTE_MAX_LCORE; ++i) {
     534        if (rte_lcore_is_enabled(i) && i != rte_get_master_lcore()) {
     535            cfg->lcore_role[i] = ROLE_OFF;
     536            cfg->lcore_count--;
     537        }
     538    }
     539    // Only the master should be running
     540    assert(cfg->lcore_count == 1);
     541
     542    dpdk_move_master_lcore(my_cpu-1);
     543
    472544#if DEBUG
    473545    dump_configuration();
     
    512584    char err[500];
    513585    err[0] = 0;
    514     int i;
    515586   
    516587    libtrace->format_data = (struct dpdk_format_data_t *)
     
    532603    FORMAT(libtrace)->wrap_count = 0;
    533604#endif
    534         for (i = 0;i < RTE_MAX_LCORE; i++) {
    535                 // Disabled by default
    536                 FORMAT(libtrace)->per_lcore[i].enabled = 0;
    537         }
    538605       
    539606    if (dpdk_init_enviroment(libtrace->uridata, FORMAT(libtrace), err, sizeof(err)) != 0) {
     
    598665                                        return -1;
    599666                        }
     667        break;
    600668        }
    601669        return -1;
     
    868936}
    869937
    870 /* Attach memory to the port and start the port or restart the ports.
    871  */
    872 static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t * format_data, char *err, int errlen, uint16_t rx_queues){
     938/* Attach memory to the port and start (or restart) the port/s.
     939 */
     940static int dpdk_start_port_queues (libtrace_t *libtrace, struct dpdk_format_data_t *format_data, char *err, int errlen, uint16_t rx_queues){
    873941    int ret, i; /* Check return values for errors */
    874942    struct rte_eth_link link_info; /* Wait for link */
     
    900968
    901969        /* Create the mbuf pool, which is the place our packets are allocated
    902          * from - TODO figure out if there is is a free function (I cannot see one) 
     970         * from - TODO figure out if there is is a free function (I cannot see one)
    903971         * NOTE: RX queue requires nb_packets + 1 otherwise it fails to
    904972         * allocate however that extra 1 packet is not used.
     
    9681036                                                                format_data->nb_rx_buf, SOCKET_ID_ANY,
    9691037                                                                &rx_conf, format_data->pktmbuf_pool);
     1038        /* Init per_thread data structures */
     1039        format_data->per_lcore[i].port = format_data->port;
     1040        format_data->per_lcore[i].queue_id = i;
     1041
    9701042                if (ret < 0) {
    9711043                        snprintf(err, errlen, "Intel DPDK - Cannot configure RX queue on port"
     
    9771049   
    9781050#if DEBUG
    979     printf("Doing start device\n");
     1051    fprintf(stderr, "Doing start device\n");
    9801052#endif 
    9811053    /* Start device */
     
    10321104}
    10331105
    1034 static inline size_t dpdk_get_max_rx_queues(uint8_t port_id) {
     1106static inline size_t dpdk_get_max_rx_queues (uint8_t port_id) {
    10351107    struct rte_eth_dev_info dev_info;
    1036     rte_eth_dev_info_get(0, &dev_info);
     1108    rte_eth_dev_info_get(port_id, &dev_info);
    10371109    return dev_info.max_rx_queues;
     1110}
     1111
     1112static inline size_t dpdk_processor_count () {
     1113    long nb_cpu = sysconf(_SC_NPROCESSORS_ONLN);
     1114    if (nb_cpu <= 0)
     1115        return 1;
     1116    else
     1117        return (size_t) nb_cpu;
    10381118}
    10391119
    10401120static int dpdk_pstart_input (libtrace_t *libtrace) {
    10411121    char err[500];
    1042     int enabled_lcore_count = 0, i=0;
     1122    int i=0, phys_cores=0;
    10431123    int tot = libtrace->perpkt_thread_count;
    10441124    err[0] = 0;
    1045        
    1046         libtrace->perpkt_thread_count;
    1047        
    1048         for (i = 0; i < RTE_MAX_LCORE; i++)
    1049         {
    1050                 if (rte_lcore_is_enabled(i))
    1051                         enabled_lcore_count++;
    1052         }
    1053        
    1054         tot = MIN(libtrace->perpkt_thread_count, enabled_lcore_count);
    1055         tot = MIN(tot, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
    1056         fprintf(stderr, "Running pstart DPDK %d %d %d %d\n", tot, libtrace->perpkt_thread_count, enabled_lcore_count, rte_lcore_count());
     1125
     1126    if (rte_lcore_id() != rte_get_master_lcore())
     1127        fprintf(stderr, "Warning dpdk_pstart_input should be called from the master DPDK thread!\n");
     1128
     1129    // If the master is not on the last thread we move it there
     1130    if (rte_get_master_lcore() != RTE_MAX_LCORE - 1) {
     1131        // Consider error handling here
     1132        dpdk_move_master_lcore(RTE_MAX_LCORE - 1) == -1;
     1133    }
     1134
     1135    // Don't exceed the number of cores in the system/detected by dpdk
     1136    // We don't have to force this but performance wont be good if we don't
     1137    for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1138        if (lcore_config[i].detected) {
     1139            if (rte_lcore_is_enabled(i))
     1140                fprintf(stderr, "Found core %d already in use!\n", i);
     1141            else
     1142                phys_cores++;
     1143        }
     1144    }
     1145
     1146        tot = MIN(libtrace->perpkt_thread_count, dpdk_get_max_rx_queues(FORMAT(libtrace)->port));
     1147    tot = MIN(tot, phys_cores);
     1148
     1149        fprintf(stderr, "Running pstart DPDK tot=%d req=%d phys=%d\n", tot, libtrace->perpkt_thread_count, phys_cores);
    10571150       
    10581151    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
     
    10621155        return -1;
    10631156    }
    1064    
     1157
     1158    // Make sure we only start the number that we should
     1159    libtrace->perpkt_thread_count = tot;
    10651160    return 0;
    1066     return tot;
     1161}
     1162
     1163
     1164/**
     1165 * Register a thread with the DPDK system,
     1166 * When we start DPDK in parallel libtrace we move the 'main thread' to the
     1167 * MAXIMUM CPU core slot (32) and remove any affinity restrictions DPDK
     1168 * gives it.
     1169 *
     1170 * We then allow a mapper thread to be started on every real core as DPDK would
     1171 * we also bind these to the corresponding CPU cores.
     1172 *
     1173 * @param libtrace A pointer to the trace
     1174 * @param reading True if the thread will be used to read packets, i.e. will
     1175 *                call pread_packet(), false if thread used to process packet
     1176 *                in any other manner including statistics functions.
     1177 */
     1178static int dpdk_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading)
     1179{
     1180    struct rte_config *cfg = rte_eal_get_configuration();
     1181    int i;
     1182    int new_id = -1;
     1183
     1184    // If 'reading packets' fill in cores from 0 up and bind affinity
     1185    // otherwise start from the MAX core (which is also the master) and work backwards
     1186    // in this case physical cores on the system will not exist so we don't bind
     1187    // these to any particular physical core
     1188    if (reading) {
     1189        for (i = 0; i < RTE_MAX_LCORE; ++i) {
     1190            if (!rte_lcore_is_enabled(i)) {
     1191                new_id = i;
     1192                if (!lcore_config[i].detected)
     1193                    fprintf(stderr, "Warning the number of 'reading' threads exceed cores on machine!!\n");
     1194                break;
     1195            }
     1196        }
     1197    } else {
     1198        for (i = RTE_MAX_LCORE-1; i >= 0; --i) {
     1199            if (!rte_lcore_is_enabled(i)) {
     1200                new_id = i;
     1201                break;
     1202            }
     1203        }
     1204    }
     1205
     1206    if (new_id == -1) {
     1207        assert(cfg->lcore_count == RTE_MAX_LCORE);
     1208        // TODO proper libtrace style error here!!
     1209        fprintf(stderr, "Too many threads for DPDK!!\n");
     1210        return -1;
     1211    }
     1212
     1213    // Enable the core in global DPDK structs
     1214    cfg->lcore_role[new_id] = ROLE_RTE;
     1215    cfg->lcore_count++;
     1216    // Set TLS to reflect our new number
     1217    assert(rte_lcore_id() == 0); // I think new threads are going get a default thread number of 0
     1218    fprintf(stderr, "original id%d", rte_lcore_id());
     1219    RTE_PER_LCORE(_lcore_id) = new_id;
     1220    fprintf(stderr, " new id%d\n", rte_lcore_id());
     1221
     1222    if (reading) {
     1223        // Set affinity bind to corresponding core
     1224        cpu_set_t cpuset;
     1225        CPU_ZERO(&cpuset);
     1226        CPU_SET(rte_lcore_id(), &cpuset);
     1227        i = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
     1228        if (i != 0) {
     1229            fprintf(stderr, "Warning pthread_setaffinity_np failed\n");
     1230            return -1;
     1231        }
     1232    }
     1233
     1234    // Map our TLS to the thread data
     1235    if (reading) {
     1236        if(t->type == THREAD_PERPKT) {
     1237            t->format_data = &FORMAT(libtrace)->per_lcore[t->perpkt_num];
     1238        } else {
     1239            t->format_data = &FORMAT(libtrace)->per_lcore[0];
     1240        }
     1241    }
     1242}
     1243
     1244
     1245/**
     1246 * Unregister a thread with the DPDK system.
     1247 *
     1248 * Only previously registered threads should be calling this just before
     1249 * they are destroyed.
     1250 */
     1251static int dpdk_punregister_thread(libtrace_t libtrace, libtrace_thread_t *t UNUSED)
     1252{
     1253    struct rte_config *cfg = rte_eal_get_configuration();
     1254
     1255    assert(rte_lcore_id() >= 0 && rte_lcore_id() < RTE_MAX_LCORE);
     1256
     1257    // Skip if master!!
     1258    if (rte_lcore_id() == rte_get_master_lcore()) {
     1259        fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n");
     1260        return 0;
     1261    }
     1262
     1263    // Disable this core in global DPDK structs
     1264    cfg->lcore_role[rte_lcore_id()] = ROLE_OFF;
     1265    cfg->lcore_count--;
     1266    RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again
     1267    assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!!
     1268    return 0;
    10671269}
    10681270
     
    14421644    return -1;
    14431645}
    1444 libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
    1445 static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_packet_t *packet) {
     1646
     1647static int dpdk_pread_packet (libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    14461648    int nb_rx; /* Number of rx packets we've recevied */
    14471649    struct rte_mbuf* pkts_burst[1]; /* Array of 1 pointer(s) */
     
    14501652    if (packet->buffer != NULL) {
    14511653        /* Buffer is owned by DPDK */
    1452         if ( packet->buf_control == TRACE_CTRL_EXTERNAL ) {
     1654        if ( packet->buf_control == TRACE_CTRL_EXTERNAL) {
    14531655            rte_pktmbuf_free(packet->buffer);
    14541656            packet->buffer = NULL;
     
    14671669    while (1) {
    14681670        /* Poll for a single packet */
    1469         nb_rx = rte_eth_rx_burst(FORMAT(libtrace)->port,
    1470                             get_thread_table_num(libtrace), pkts_burst, 1);
     1671        nb_rx = rte_eth_rx_burst(PERPKT_FORMAT(t)->port,
     1672                            PERPKT_FORMAT(t)->queue_id, pkts_burst, 1);
    14711673        if (nb_rx > 0) { /* Got a packet - otherwise we keep spining */
    14721674                        //fprintf(stderr, "Doing P READ PACKET port=%d q=%d\n", (int) FORMAT(libtrace)->port, (int) get_thread_table_num(libtrace));
     
    14741676        }
    14751677        // Check the message queue this could be (Well it shouldn't but anyway) be less than 0
    1476         if (libtrace_message_queue_count(&(get_thread_table(libtrace)->messages)) > 0) {
     1678        if (libtrace_message_queue_count(&t->messages) > 0) {
    14771679                        printf("Extra message yay");
    14781680                        return -2;
     
    16871889        dpdk_pause_input, /* ppause */
    16881890        dpdk_fin_input, /* p_fin */
    1689         dpdk_pconfig_input /* pconfig_input */
     1891        dpdk_pconfig_input, /* pconfig_input */
     1892    dpdk_pregister_thread, /* pregister_thread */
     1893    dpdk_punregister_thread /* unpregister_thread */
    16901894};
    16911895
  • lib/format_linux.c

    re3a639a r50ce607  
    214214        // The flag layout should be the same for all (I Hope)
    215215        // max_order
    216 };
     216} ALIGN_STRUCT(CACHE_LINE_SIZE);
    217217
    218218struct linux_format_data_t {
     
    305305
    306306#define FORMAT(x) ((struct linux_format_data_t*)(x))
     307#define PERPKT_FORMAT(x) ((struct linux_per_thread_t*)(x->format_data))
    307308#define DATAOUT(x) ((struct linux_output_format_data_t*)((x)->format_data))
    308309
     
    696697       
    697698        if (!FORMAT(libtrace->format_data)->per_thread) {
    698                 per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
     699                //per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
     700                posix_memalign((void **)&per_thread, CACHE_LINE_SIZE, tot*sizeof(struct linux_per_thread_t));
    699701                FORMAT(libtrace->format_data)->per_thread = per_thread;
    700702        } else {
     
    750752}
    751753
     754static int linux_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) {
     755        fprintf(stderr, "registering thread %d!!\n", t->perpkt_num);
     756    if (reading) {
     757        if(t->type == THREAD_PERPKT) {
     758            t->format_data = &FORMAT(libtrace->format_data)->per_thread[t->perpkt_num];
     759        } else {
     760            t->format_data = &FORMAT(libtrace->format_data)->per_thread[0];
     761        }
     762    }
     763    return 0;
     764}
     765
    752766static int linuxnative_start_output(libtrace_out_t *libtrace)
    753767{
     
    10961110                // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
    10971111                hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT);
    1098                 if ((unsigned) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     1112                if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
    10991113                        // Do message queue check or select
    11001114                        int ret;
     
    11991213}
    12001214
    1201 static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    1202 {
    1203         int fd = FORMAT(libtrace->format_data)->per_thread[get_thread_table_num(libtrace)].fd;
    1204         //printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
     1215static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet)
     1216{
     1217        int fd = FORMAT(libtrace->format_data)->fd;
     1218        fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
    12051219        return linuxnative_read_packet_fd(libtrace, packet, fd, 1);
    12061220}
     
    13421356}
    13431357
    1344 static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1345         int tnum = get_thread_table_num(libtrace);
    1346         int fd = FORMAT(libtrace->format_data)->per_thread[tnum].fd;
    1347         int *rxring_offset = &FORMAT(libtrace->format_data)->per_thread[tnum].rxring_offset;
    1348         char *rx_ring = FORMAT(libtrace->format_data)->per_thread[tnum].rx_ring;
    1349         printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
     1358static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
     1359        fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
     1360        int fd = PERPKT_FORMAT(t)->fd;
     1361        int *rxring_offset = &PERPKT_FORMAT(t)->rxring_offset;
     1362        char *rx_ring = PERPKT_FORMAT(t)->rx_ring;
    13501363        return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 1);
    13511364}
     
    14061419
    14071420        return ret;
    1408 
    1409 }
     1421}
     1422
    14101423static int linuxring_write_packet(libtrace_out_t *trace,
    14111424                libtrace_packet_t *packet)
     
    18441857        linuxnative_ppause_input,                       /* ppause */
    18451858        linuxnative_fin_input,                          /* p_fin */
    1846         linuxnative_pconfig_input                       /* pconfig input */
     1859        linuxnative_pconfig_input,                      /* pconfig input */
     1860        linux_pregister_thread,
     1861        NULL
    18471862};
    18481863
     
    18931908        linuxnative_ppause_input,                       /* ppause */
    18941909        linuxnative_fin_input,                          /* p_fin */
    1895         linuxnative_pconfig_input
    1896        
     1910        linuxnative_pconfig_input,
     1911        linux_pregister_thread,
     1912        NULL
    18971913};
    18981914#else
  • lib/libtrace.h.in

    r9594cf9 r50ce607  
    194194#  define PRINTF(formatpos, argpos)
    195195#endif
     196
     197// Used to fight against false sharing
     198#define CACHE_LINE_SIZE 64
     199#define ALIGN_STRUCT(x) __attribute__((aligned(x)))
    196200
    197201#ifdef _MSC_VER
     
    31633167
    31643168DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
    3165 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet);
     3169DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet);
    31663170DLLEXPORT int trace_ppause(libtrace_t *libtrace);
    31673171DLLEXPORT int trace_pstop(libtrace_t *libtrace);
  • lib/libtrace_int.h

    r3296252 r50ce607  
    208208        enum thread_states state;
    209209        void* user_data; // TLS for the user to use
     210        void* format_data; // TLS for the format to use
    210211        pthread_t tid;
    211212        int perpkt_num; // A number from 0-X that represents this perpkt threads number
     
    912913         * @return same as read_packet, with the addition of return -2 to represent
    913914         * interrupted due to message waiting. */
    914         int (*pread_packet)(libtrace_t *trace, libtrace_packet_t *packet);
     915        int (*pread_packet)(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t *packet);
    915916       
    916917        /** Pause a parallel trace
     
    936937         */
    937938        int (*pconfig_input)(libtrace_t *libtrace,trace_parallel_option_t option,void *value);
    938                
     939
     940        /**
     941         * Register a thread for use with the format or using the packets produced
     942         * by it. This is NOT only used for threads reading packets infact all
     943         * threads use this.
     944         *
     945         * Some use cases include setting up any thread local storage required for
     946         * to read packets and free packets. For DPDK we require any thread that
     947         * may release or read a packet to have have an internal number associated
     948         * with it.
     949         *
     950         * The thread type can be used to see if this thread is going to be used
     951         * to read packets or otherwise.
     952         *
     953         * @return 0 if successful, -1 if the option is unsupported or an error
     954         * occurs (such as a maximum of threads being reached)
     955         */
     956        int (*pregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t, bool reader);
     957
     958        /**
     959         * If needed any memory allocated with pregister_thread can be released
     960         * in this function. The thread will be destroyed directly after this
     961         * function is called.
     962         */
     963        void (*punregister_thread)(libtrace_t *libtrace, libtrace_thread_t *t);
    939964};
    940965
     
    946971NULL,                   /* ppause_input */ \
    947972NULL,                   /* pfin_input */ \
    948 NULL,                   /* pconfig_input */
     973NULL,                   /* pconfig_input */ \
     974NULL,                   /* pregister_thread */ \
     975NULL                    /* punregister_thread */
    949976
    950977/** The list of registered capture formats */
  • lib/trace_parallel.c

    r3296252 r50ce607  
    317317        assert(t);
    318318        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
     319        if (trace->format->pregister_thread) {
     320                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
     321        }
    319322        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    320323
     
    343346                                                // The hasher has stopped by this point, so the queue shouldn't be filling
    344347                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    345                                                         psize = trace_pread_packet(trace, &packet);
     348                                                        psize = trace_pread_packet(trace, t, &packet);
    346349                                                        if (psize > 0) {
    347350                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
     
    375378                        }
    376379                } else {
    377                         psize = trace_pread_packet(trace, &packet);
     380                        psize = trace_pread_packet(trace, t, &packet);
    378381                }
    379382
     
    413416        trace_send_message_to_reducer(trace, &message);
    414417
     418        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     419        if (trace->format->punregister_thread) {
     420                trace->format->punregister_thread(trace, t);
     421        }
     422        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     423
    415424        pthread_exit(NULL);
    416425};
     
    434443        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
    435444        printf("Hasher Thread started\n");
     445        if (trace->format->pregister_thread) {
     446                trace->format->pregister_thread(trace, t, true);
     447        }
    436448        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    437449        int pkt_skipped = 0;
     
    514526        message.additional.uint64 = 0;
    515527        trace_send_message_to_reducer(trace, &message);
     528        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     529        if (trace->format->pregister_thread) {
     530                trace->format->punregister_thread(trace, t);
     531        }
     532        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    516533
    517534        // TODO remove from TTABLE t sometime
     
    539556 * lock to read a packet from the underlying trace.
    540557 */
    541 inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
     558inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    542559{
    543560        // We need this to fill the 'first' packet table
    544         libtrace_thread_t *t = get_thread_table(libtrace);
    545561        if (!*packet) {
    546562                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
     
    565581 * 2. Move that into the packet provided (packet)
    566582 */
    567 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
    568 {
    569         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    570         libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
    571 
     583inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     584{
    572585        if (*packet) // Recycle the old get the new
    573586                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
     
    675688 * 2. Move that into the packet provided (packet)
    676689 */
    677 inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
    678 {
    679         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    680         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     690inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     691{
    681692        int thread, ret/*, psize*/;
    682693
     
    696707                // Another thread cannot write a packet because a queue has filled up. Is it ours?
    697708                if (libtrace->perpkt_queue_full) {
    698                         contention_stats[this_thread].wait_for_fill_complete_hits++;
     709                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    699710                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    700711                        continue;
     
    718729                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    719730                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    720                 if (thread == this_thread) {
     731                if (thread == t->perpkt_num) {
    721732                        // If it's this thread we must be in order because we checked the buffer once we got the lock
    722733                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    728739                                libtrace->perpkt_queue_full = true;
    729740                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    730                                 contention_stats[this_thread].full_queue_hits++;
     741                                contention_stats[t->perpkt_num].full_queue_hits++;
    731742                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    732743                        }
     
    754765 * 2. Move that into the packet provided (packet)
    755766 */
    756 inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
    757 {
    758         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    759         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     767inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     768{
    760769        int ret, i, thread/*, psize*/;
    761770
     
    785794                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    786795                        assert(sem_post(&libtrace->sem) == 0);
    787                         contention_stats[this_thread].wait_for_fill_complete_hits++;
     796                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    788797                        continue;
    789798                }
     
    833842                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
    834843                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    835                                                 if (this_thread == thread)
     844                                                if (t->perpkt_num == thread)
    836845                                                {
    837846                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
     
    855864                                                        assert(sem_post(&libtrace->sem) == 0);
    856865
    857                                                 contention_stats[this_thread].full_queue_hits++;
     866                                                contention_stats[t->perpkt_num].full_queue_hits++;
    858867                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    859868                                                // Grab these back
     
    10481057 *
    10491058 */
    1050 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1059static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    10511060
    10521061        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
     
    10731082                         * structure */
    10741083                        packet->trace = libtrace;
    1075                         ret=libtrace->format->pread_packet(libtrace,packet);
     1084                        ret=libtrace->format->pread_packet(libtrace, t, packet);
    10761085                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
    10771086                                return ret;
     
    11031112 * Read a packet from the parallel trace
    11041113 */
    1105 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
     1114DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    11061115{
    11071116        int ret;
    1108         libtrace_thread_t *t = get_thread_table(libtrace);
    11091117
    11101118        // Cleanup the packet passed back
     
    11151123                if (!*packet)
    11161124                        *packet = trace_create_packet();
    1117                 ret = trace_pread_packet_wrapper(libtrace, *packet);
     1125                ret = trace_pread_packet_wrapper(libtrace, t, *packet);
    11181126        } else if (trace_has_dedicated_hasher(libtrace)) {
    1119                 ret = trace_pread_packet_hasher_thread(libtrace, packet);
     1127                ret = trace_pread_packet_hasher_thread(libtrace, t, packet);
    11201128        } else if (!trace_has_dedicated_hasher(libtrace)) {
    11211129                /* We don't care about which core a packet goes to */
    1122                 ret = trace_pread_packet_first_in_first_served(libtrace, packet);
     1130                ret = trace_pread_packet_first_in_first_served(libtrace, t, packet);
    11231131        } /* else {
    11241132                ret = trace_pread_packet_hash_locked(libtrace, packet);
     
    11651173                return -1;
    11661174        }
     1175       
    11671176        // NOTE: Until the trace is started we won't have a libtrace_lock initialised
    11681177        if (libtrace->state != STATE_NEW) {
Note: See TracChangeset for help on using the changeset viewer.