Changeset 17a3dff


Ignore:
Timestamp:
04/26/14 23:23:17 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5ce14a5
Parents:
60e8e86
Message:

Rename from Google map/reduce framework names to something more meaningful.
Rename mapper to perpkt since this is what it actually is in libtrace.

Files:
9 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dpdk.c

    r29bbef0 r17a3dff  
    827827    return 0;
    828828}
    829 int mapper_start(void *data); // This actually a void*
    830829
    831830/* Attach memory to the port and start the port or restart the ports.
     
    967966    // Can use remote launch for all
    968967    /*RTE_LCORE_FOREACH_SLAVE(i) {
    969                 rte_eal_remote_launch(mapper_start, (void *)libtrace, i);
     968                rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
    970969        }*/
    971970   
     
    996995    char err[500];
    997996    int enabled_lcore_count = 0, i=0;
    998     int tot = libtrace->mapper_thread_count;
     997    int tot = libtrace->perpkt_thread_count;
    999998    err[0] = 0;
    1000999       
    1001         libtrace->mapper_thread_count;
     1000        libtrace->perpkt_thread_count;
    10021001       
    10031002        for (i = 0; i < RTE_MAX_LCORE; i++)
     
    10071006        }
    10081007       
    1009         tot = MIN(libtrace->mapper_thread_count, enabled_lcore_count);
     1008        tot = MIN(libtrace->perpkt_thread_count, enabled_lcore_count);
    10101009        tot = MIN(tot, 8);
    1011         printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->mapper_thread_count, enabled_lcore_count, rte_lcore_count());
     1010        printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->perpkt_thread_count, enabled_lcore_count, rte_lcore_count());
    10121011       
    10131012    if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
  • lib/format_linux.c

    r29bbef0 r17a3dff  
    649649{
    650650        int i;
    651         int tot = libtrace->mapper_thread_count;
     651        int tot = libtrace->perpkt_thread_count;
    652652        printf("CAlling native pause packet\n");
    653653       
     
    690690static int linuxnative_pstart_input(libtrace_t *libtrace) {
    691691        int i = 0;
    692         int tot = libtrace->mapper_thread_count;
     692        int tot = libtrace->perpkt_thread_count;
    693693        int iserror = 0;
    694694        // We store this here otherwise it will be leaked if the memory doesn't know
     
    701701                // Whats going on this might not work 100%
    702702                // We assume all sockets have been closed ;)
    703                 printf("Pause and then start called again lets hope that mapper_thread_count hasn't changed\n");
     703                printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
    704704        }
    705705       
  • lib/libtrace.h.in

    rabda273 r17a3dff  
    31303130DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);
    31313131
    3132 // Ways to access Global and TLS  storage that we provide the user
     3132// Ways to access Global and TLS storage that we provide the user
    31333133DLLEXPORT void * trace_get_global(libtrace_t *trace);
    31343134DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
     
    31513151DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);
    31523152
     3153DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);
    31533154
    31543155typedef enum {
     
    31753176         * A unblockable warning message will be printed to stderr in this case.
    31763177         */
    3177         TRACE_OPTION_SET_MAPPER_BUFFER_SIZE,
     3178        TRACE_OPTION_SET_PERPKT_BUFFER_SIZE,
    31783179       
    31793180        /**
    3180          * Libtrace set mapper thread count
     3181         * Libtrace set perpkt thread count
    31813182         */
    3182         TRACE_OPTION_SET_MAPPER_THREAD_COUNT,
     3183        TRACE_OPTION_SET_PERPKT_THREAD_COUNT,
    31833184       
    31843185        /**
     
    32203221        MESSAGE_STOPPED,
    32213222        MESSAGE_FIRST_PACKET,
    3222         MESSAGE_MAPPER_ENDED,
    3223         MESSAGE_MAPPER_RESUMED,
    3224         MESSAGE_MAPPER_PAUSED,
    3225         MESSAGE_MAPPER_EOF,
     3223        MESSAGE_PERPKT_ENDED,
     3224        MESSAGE_PERPKT_RESUMED,
     3225        MESSAGE_PERPKT_PAUSED,
     3226        MESSAGE_PERPKT_EOF,
    32263227        MESSAGE_POST_REDUCE,
    32273228        MESSAGE_POST_RANGE,
     
    32293230};
    32303231
     3232enum hasher_types {
     3233        HASHER_BALANCE, /** Balance load across CPUs best as possible this is basically to say don't care about hash, but this might still might be implemented using a hash or round robin etc.. */
     3234        HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional for TCP flows (IP src dest,TCP port src dest), non TCP
     3235                                                        Will be sent to the same place, with the exception of UDP which may or may not be sent to separate cores */
     3236        HASHER_UNIDIRECTIONAL, /** Use a hash which is uni-directional across TCP flow */
     3237        HASHER_CUSTOM, /** Always use the user supplied hasher */
     3238        HASHER_HARDWARE, /** Set by the format if the hashing is going to be done in hardware */
     3239};
     3240
    32313241DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
    32323242DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
    32333243DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
     3244enum hasher_types;
     3245DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);
    32343246
    32353247#ifdef __cplusplus
  • lib/libtrace_int.h

    rd6a56b6 r17a3dff  
    175175        THREAD_EMPTY,
    176176        THREAD_HASHER,
    177         THREAD_MAPPER,
     177        THREAD_PERPKT,
    178178        THREAD_REDUCER
    179179};
     
    184184        THREAD_FINISHED,
    185185        THREAD_PAUSED
    186 };
    187 
    188 enum hasher_types {
    189         HASHER_BALANCE, /** Balance load across CPUs best as possible this is basically to say don't care about hash, but this might still might be implemented using a hash or round robin etc.. */
    190         HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional for TCP flows (IP src dest,TCP port src dest), non TCP
    191                                                         Will be sent to the same place, with the exception of UDP which may or may not be sent to separate cores */
    192         HASHER_UNIDIRECTIONAL, /** Use a hash which is uni-directional across TCP flow */
    193         HASHER_CUSTOM, /** Always use the user supplied hasher */
    194         HASHER_HARDWARE, /** Set by the format if the hashing is going to be done in hardware */
    195186};
    196187
     
    206197#define REDUCE_STEPPING 0x10
    207198
    208 #define MAPPER_USE_SLIDING_WINDOW 0x20
     199#define PERPKT_USE_SLIDING_WINDOW 0x20
    209200
    210201/**
     
    218209        void* user_data; // TLS for the user to use
    219210        pthread_t tid;
    220         int map_num; // A number from 0-X that represents this mapper number
     211        int perpkt_num; // A number from 0-X that represents this perpkt threads number
    221212                                // in the table, intended to quickly identify this thread
    222                                 // -1 represents NA (such as in the case this is not a mapper thread)
     213                                // -1 represents NA (such as the case this is not a perpkt thread)
    223214        libtrace_ringbuffer_t rbuffer; // Input
    224215        libtrace_vector_t vector; // Output
     
    243234struct first_packets {
    244235        pthread_spinlock_t lock;
    245         size_t count; // If == mappers we have all
     236        size_t count; // If == perpkt_thread_count threads we have all
    246237        size_t first; // Valid if count != 0
    247238        struct __packet_storage_magic_type {
     
    285276        /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
    286277        pthread_cond_t perpkt_cond;
    287         /** Set to the number of mapper threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */
    288         int mappers_finishing;
    289         /** A count of mappers that are pausing */
    290         int perpkt_pausing;
     278        /** Set to the number of perpkt threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */
     279        int perpkts_finishing;
     280        /** A count of perpkt threads that are pausing */
     281        int perpkts_pausing;
    291282       
    292283        /** For the sliding window hasher implementation */
     
    294285        /** Set once trace_join has been called */
    295286        bool joined;
    296         /** Set to indicate a mappers queue is full and such the writing mapper cannot proceed */
    297         bool mapper_queue_full;
     287        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
     288        bool perpkt_queue_full;
    298289        /** Global storage for this trace, shared among all the threads  */
    299290        void* global_blob;
     
    302293        /** The actual freelist */
    303294        libtrace_ringbuffer_t packet_freelist;
    304         /** The number of packets that can queue per mapper thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
    305         int mapper_buffer_size;
     295        /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
     296        int perpkt_buffer_size;
    306297        /** The reducer flags */
    307298        int reducer_flags;
     
    320311        libtrace_thread_t hasher_thread;
    321312        libtrace_thread_t reducer_thread;
    322         int mapper_thread_count;
    323         libtrace_thread_t * mapper_threads; // All our mapper threads
     313        int perpkt_thread_count;
     314        libtrace_thread_t * perpkt_threads; // All our perpkt threads
    324315        libtrace_slidingwindow_t sliding_window;
    325316        sem_t sem;
     
    331322inline void libtrace_zero_thread(libtrace_thread_t * t);
    332323inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
     324libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
     325int get_thread_table_num(libtrace_t *libtrace);
     326
    333327
    334328/** A libtrace output trace
  • lib/trace.c

    r60e8e86 r17a3dff  
    111111
    112112/* Set once pstart is called used for backwards compatibility reasons */
    113 extern int libtrace_parallel = 0;
     113int libtrace_parallel = 0;
    114114
    115115/* strncpy is not assured to copy the final \0, so we
     
    263263        // libtrace->libtrace_lock
    264264        // libtrace->perpkt_cond;
    265         libtrace->perpkt_pausing = 0;
    266         libtrace->mapper_queue_full = false;
    267         libtrace->mappers_finishing = -1;
     265        libtrace->perpkts_pausing = 0;
     266        libtrace->perpkt_queue_full = false;
     267        libtrace->perpkts_finishing = -1;
    268268        libtrace->reducer_flags = 0;
    269269        libtrace->joined = false;
     
    273273        libtrace->hasher = NULL;
    274274        libtrace->packet_freelist_size = 0;
    275         libtrace->mapper_buffer_size = 0;
     275        libtrace->perpkt_buffer_size = 0;
    276276        libtrace->expected_key = 0;
    277277        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     
    280280        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    281281        libtrace->reducer_thread.type = THREAD_EMPTY;
    282         libtrace->mapper_thread_count = 0;
    283         libtrace->mapper_threads = NULL;
     282        libtrace->perpkt_thread_count = 0;
     283        libtrace->perpkt_threads = NULL;
    284284
    285285        /* Parse the URI to determine what sort of trace we are dealing with */
     
    381381        // libtrace->libtrace_lock
    382382        // libtrace->perpkt_cond;
    383         libtrace->perpkt_pausing = 0;
    384         libtrace->mapper_queue_full = false;
    385         libtrace->mappers_finishing = -1;
     383        libtrace->perpkts_pausing = 0;
     384        libtrace->perpkt_queue_full = false;
     385        libtrace->perpkts_finishing = -1;
    386386        libtrace->reducer_flags = 0;
    387387        libtrace->joined = false;
     
    392392        libtrace->expected_key = 0;
    393393        libtrace->packet_freelist_size = 0;
    394         libtrace->mapper_buffer_size = 0;
     394        libtrace->perpkt_buffer_size = 0;
    395395        libtrace_zero_ringbuffer(&libtrace->packet_freelist);
    396396        libtrace_zero_thread(&libtrace->hasher_thread);
     
    398398        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    399399        libtrace->reducer_thread.type = THREAD_EMPTY;
    400         libtrace->mapper_thread_count = 0;
    401         libtrace->mapper_threads = NULL;
     400        libtrace->perpkt_thread_count = 0;
     401        libtrace->perpkt_threads = NULL;
    402402       
    403403        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    655655        libtrace_ringbuffer_destroy(&libtrace->packet_freelist);
    656656       
    657         for (i = 0; i < libtrace->mapper_thread_count; ++i) {
    658                         assert (libtrace_vector_get_size(&libtrace->mapper_threads[i].vector) == 0);
    659                         libtrace_vector_destroy(&libtrace->mapper_threads[i].vector);
    660         }
    661         free(libtrace->mapper_threads);
    662         libtrace->mapper_threads = NULL;
    663         libtrace->mapper_thread_count = 0;
     657        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     658                assert (libtrace_vector_get_size(&libtrace->perpkt_threads[i].vector) == 0);
     659                libtrace_vector_destroy(&libtrace->perpkt_threads[i].vector);
     660        }
     661        free(libtrace->perpkt_threads);
     662        libtrace->perpkt_threads = NULL;
     663        libtrace->perpkt_thread_count = 0;
    664664       
    665665        if (libtrace->event.packet) {
  • lib/trace_parallel.c

    r60e8e86 r17a3dff  
    125125        int i;
    126126        struct multithreading_stats totals = {0};
    127         for (i = 0; i < libtrace->mapper_thread_count ; i++) {
    128                 printf("\nStats for mapper thread#%d\n", i);
     127        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
     128                printf("\nStats for perpkt thread#%d\n", i);
    129129                printf("\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
    130130                totals.full_queue_hits += contention_stats[i].full_queue_hits;
     
    132132                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
    133133        }
    134         printf("\nTotals for mapper threads\n");
     134        printf("\nTotals for perpkt threads\n");
    135135        printf("\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
    136136        printf("\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
     
    147147        libtrace_zero_deque(&t->deque);
    148148        t->recorded_first = false;
    149         t->map_num = -1;
     149        t->perpkt_num = -1;
    150150}
    151151
     
    156156        pthread_t tid = pthread_self();
    157157
    158         for (;i<libtrace->mapper_thread_count ;++i) {
    159                 if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
    160                         return &libtrace->mapper_threads[i];
     158        for (;i<libtrace->perpkt_thread_count ;++i) {
     159                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
     160                        return &libtrace->perpkt_threads[i];
    161161        }
    162162        return NULL;
    163163}
    164164
    165 int get_thread_table_num(libtrace_t *libtrace);
    166 DLLEXPORT int get_thread_table_num(libtrace_t *libtrace) {
     165int get_thread_table_num(libtrace_t *libtrace) {
    167166        int i = 0;
    168167        pthread_t tid = pthread_self();
    169         for (;i<libtrace->mapper_thread_count; ++i) {
    170                 if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
     168        for (;i<libtrace->perpkt_thread_count; ++i) {
     169                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
    171170                        return i;
    172171        }
     
    196195        printf("Pausing thread #%d\n", get_thread_table_num(trace));
    197196        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    198         trace->perpkt_pausing++;
     197        trace->perpkts_pausing++;
    199198        pthread_cond_broadcast(&trace->perpkt_cond);
    200199        while (!trace->started) {
    201200                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
    202201        }
    203         trace->perpkt_pausing--;
     202        trace->perpkts_pausing--;
    204203        pthread_cond_broadcast(&trace->perpkt_cond);
    205204        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     
    207206}
    208207
    209 void* mapper_start(void *data) {
     208/**
     209 * The is the entry point for our packet processing threads.
     210 */
     211static void* perpkt_threads_entry(void *data) {
    210212        libtrace_t *trace = (libtrace_t *)data;
    211213        libtrace_thread_t * t;
     
    216218        t = get_thread_table(trace);
    217219        assert(t);
    218         //printf("Yay Started Mapper thread #%d\n", (int) get_thread_table_num(trace));
     220        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
    219221        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    220222
     
    245247                }
    246248
    247                 if (trace->mapper_thread_count == 1) {
     249                if (trace->perpkt_thread_count == 1) {
    248250                        if (!packet) {
    249251                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
     
    286288
    287289        // Notify only after we've defiantly set the state to finished
    288         message.code = MESSAGE_MAPPER_ENDED;
     290        message.code = MESSAGE_PERPKT_ENDED;
    289291        message.additional = NULL;
    290292        trace_send_message_to_reducer(trace, &message);
     
    335337                /* We are guaranteed to have a hash function i.e. != NULL */
    336338                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
    337                 thread = trace_packet_get_hash(packet) % trace->mapper_thread_count;
     339                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
    338340                /* Blocking write to the correct queue - I'm the only writer */
    339                 if (trace->mapper_threads[thread].state != THREAD_FINISHED) {
    340                         libtrace_ringbuffer_write(&trace->mapper_threads[thread].rbuffer, packet);
     341                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
     342                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
    341343                        pkt_skipped = 0;
    342344                } else {
     
    346348
    347349        /* Broadcast our last failed read to all threads */
    348         for (i = 0; i < trace->mapper_thread_count; i++) {
     350        for (i = 0; i < trace->perpkt_thread_count; i++) {
    349351                libtrace_packet_t * bcast;
    350352                printf("Broadcasting error/EOF now the trace is over\n");
    351                 if (i == trace->mapper_thread_count - 1) {
     353                if (i == trace->perpkt_thread_count - 1) {
    352354                        bcast = packet;
    353355                } else {
     
    356358                }
    357359                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    358                 if (trace->mapper_threads[i].state != THREAD_FINISHED) {
     360                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
    359361                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    360362                        // Unlock early otherwise we could deadlock
    361                         libtrace_ringbuffer_write(&trace->mapper_threads[i].rbuffer, NULL);
     363                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);
    362364                } else {
    363365                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     
    371373        // Notify only after we've defiantly set the state to finished
    372374        libtrace_message_t message;
    373         message.code = MESSAGE_MAPPER_ENDED;
     375        message.code = MESSAGE_PERPKT_ENDED;
    374376        message.additional = NULL;
    375377        trace_send_message_to_reducer(trace, &message);
     
    427429{
    428430        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    429         libtrace_thread_t* t = &libtrace->mapper_threads[this_thread];
     431        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
    430432
    431433        if (*packet) // Recycle the old get the new
     
    468470 * before returning EOF/error.
    469471 */
    470 inline static int trace_handle_finishing_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
     472inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    471473{
    472474        /* We are waiting for the condition that another thread ends to check
     
    480482
    481483                // Check before
    482                 if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
     484                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
    483485                        complete = true;
    484486                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    489491
    490492                // Check after
    491                 if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
     493                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
    492494                        complete = true;
    493495                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    512514 * Expects the libtrace_lock to not be held
    513515 */
    514 inline static int trace_finish_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
     516inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    515517{
    516518        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    517519        t->state = THREAD_FINISHING;
    518         libtrace->mappers_finishing++;
     520        libtrace->perpkts_finishing++;
    519521        pthread_cond_broadcast(&libtrace->perpkt_cond);
    520522        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    521         return trace_handle_finishing_mapper(libtrace, packet, t);
     523        return trace_handle_finishing_perpkt(libtrace, packet, t);
    522524}
    523525
     
    537539{
    538540        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    539         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
    540         int thread, ret, psize;
     541        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     542        int thread, ret/*, psize*/;
    541543
    542544        while (1) {
     
    554556
    555557                // Another thread cannot write a packet because a queue has filled up. Is it ours?
    556                 if (libtrace->mapper_queue_full) {
     558                if (libtrace->perpkt_queue_full) {
    557559                        contention_stats[this_thread].wait_for_fill_complete_hits++;
    558560                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    576578
    577579                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    578                 thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
     580                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    579581                if (thread == this_thread) {
    580582                        // If it's this thread we must be in order because we checked the buffer once we got the lock
     
    583585                }
    584586
    585                 if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
    586                         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
    587                                 libtrace->mapper_queue_full = true;
     587                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
     588                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
     589                                libtrace->perpkt_queue_full = true;
    588590                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    589591                                contention_stats[this_thread].full_queue_hits++;
     
    591593                        }
    592594                        *packet = NULL;
    593                         libtrace->mapper_queue_full = false;
     595                        libtrace->perpkt_queue_full = false;
    594596                } else {
    595597                        /* We can get here if the user closes the thread before natural completion/or error */
     
    616618{
    617619        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    618         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
    619         int ret, i, thread, psize;
     620        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     621        int ret, i, thread/*, psize*/;
    620622
    621623        if (t->state == THREAD_FINISHING)
    622                 return trace_handle_finishing_mapper(libtrace, packet, t);
     624                return trace_handle_finishing_perpkt(libtrace, packet, t);
    623625
    624626        while (1) {
     
    641643
    642644                // TODO put on *proper* condition variable
    643                 if (libtrace->mapper_queue_full) {
     645                if (libtrace->perpkt_queue_full) {
    644646                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    645647                        assert(sem_post(&libtrace->sem) == 0);
     
    661663                                return 0;
    662664                        else
    663                                 return trace_finish_mapper(libtrace, packet, t);
     665                                return trace_finish_perpkt(libtrace, packet, t);
    664666                }
    665667                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    678680                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
    679681                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    680                         if (libtrace->mapper_queue_full) {
     682                        if (libtrace->perpkt_queue_full) {
    681683                                // I might be the holdup in which case if I can read my queue I should do that and return
    682684                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
     
    689691                        // Read greedily as many as we can
    690692                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
    691                                 thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
    692                                 if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
    693                                         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
     693                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
     694                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
     695                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    694696                                                if (this_thread == thread)
    695697                                                {
     
    700702                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
    701703                                                                // We must be able to write this now 100% without fail
    702                                                                 libtrace_ringbuffer_write(&libtrace->mapper_threads[thread].rbuffer, *packet);
     704                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
    703705                                                                assert(sem_post(&libtrace->sem) == 0);
    704706                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     
    709711                                                }
    710712                                                // Not us we have to give the other threads a chance to write there packets then
    711                                                 libtrace->mapper_queue_full = true;
     713                                                libtrace->perpkt_queue_full = true;
    712714                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
    713                                                 for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
     715                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    714716                                                        assert(sem_post(&libtrace->sem) == 0);
    715717
     
    717719                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    718720                                                // Grab these back
    719                                                 for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
     721                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    720722                                                        assert(sem_wait(&libtrace->sem) == 0);
    721                                                 libtrace->mapper_queue_full = false;
     723                                                libtrace->perpkt_queue_full = false;
    722724                                        }
    723725                                        assert(sem_post(&libtrace->sem) == 0);
     
    753755                dup = trace_copy_packet(packet);
    754756                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
    755                 libtrace->first_packets.packets[t->map_num].packet = dup;
     757                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    756758                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
    757                 memcpy(&libtrace->first_packets.packets[t->map_num].tv, &tv, sizeof(tv));
     759                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
    758760                // Now update the first
    759761                libtrace->first_packets.count++;
    760762                if (libtrace->first_packets.count == 1) {
    761763                        // We the first entry hence also the first known packet
    762                         libtrace->first_packets.first = t->map_num;
     764                        libtrace->first_packets.first = t->perpkt_num;
    763765                } else {
    764766                        // Check if we are newer than the previous 'first' packet
     
    766768                        if (trace_get_seconds(dup) <
    767769                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
    768                                 libtrace->first_packets.first = t->map_num;
     770                                libtrace->first_packets.first = t->perpkt_num;
    769771                }
    770772                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     
    778780
    779781/**
    780  * Returns 1 if its certain that the first packet is truly the first packet
     782 * Returns 1 if it's certain that the first packet is truly the first packet
    781783 * rather than a best guess based upon threads that have published so far.
    782784 * Otherwise 0 is returned.
     
    791793                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    792794                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
    793                 if (libtrace->first_packets.count == libtrace->mapper_thread_count) {
     795                if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
    794796                        ret = 1;
    795797                } else {
     
    944946        } else if (trace_has_dedicated_hasher(libtrace)) {
    945947                ret = trace_pread_packet_hasher_thread(libtrace, packet);
    946         } else if (libtrace->reducer_flags & MAPPER_USE_SLIDING_WINDOW) {
     948        } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
    947949                ret = trace_pread_packet_sliding_window(libtrace, packet);
    948950        } else {
     
    967969        int i;
    968970
    969         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    970                 libtrace_thread_t *t = &libtrace->mapper_threads[i];
    971                 assert(pthread_create(&t->tid, NULL, mapper_start, (void *) libtrace) == 0);
    972         }
    973         return libtrace->mapper_thread_count;
     971        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     972                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
     973                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
     974        }
     975        return libtrace->perpkt_thread_count;
    974976}
    975977
     
    988990        if (trace_is_err(libtrace))
    989991                return -1;;
    990         if (libtrace->perpkt_pausing != 0) {
     992        if (libtrace->perpkts_pausing != 0) {
    991993                printf("Restarting trace\n");
    992994                libtrace->format->pstart_input(libtrace);
     
    10051007        libtrace->per_pkt = per_pkt;
    10061008        libtrace->reducer = reducer;
    1007         libtrace->mappers_finishing = 0;
     1009        libtrace->perpkts_finishing = 0;
    10081010        // libtrace->hasher = &rand_hash; /* Hasher now set via option */
    10091011
     
    10141016
    10151017        // Set default buffer sizes
    1016         if (libtrace->mapper_buffer_size <= 0)
    1017                 libtrace->mapper_buffer_size = 1000;
    1018 
    1019         if (libtrace->mapper_thread_count <= 0)
    1020                 libtrace->mapper_thread_count = 2; // XXX scale to system
     1018        if (libtrace->perpkt_buffer_size <= 0)
     1019                libtrace->perpkt_buffer_size = 1000;
     1020
     1021        if (libtrace->perpkt_thread_count <= 0)
     1022                libtrace->perpkt_thread_count = 2; // XXX scale to system
    10211023
    10221024        if(libtrace->packet_freelist_size <= 0)
    1023                 libtrace->packet_freelist_size = (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count;
     1025                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
    10241026
    10251027        if(libtrace->packet_freelist_size <
    1026                 (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count)
     1028                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
    10271029                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
    10281030
     
    10621064        libtrace->first_packets.count = 0;
    10631065        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
    1064         libtrace->first_packets.packets = calloc(libtrace->mapper_thread_count, sizeof(struct  __packet_storage_magic_type));
    1065 
    1066 
    1067         /* Start all of our mapper threads */
    1068         libtrace->mapper_threads = calloc(sizeof(libtrace_thread_t), libtrace->mapper_thread_count);
    1069         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1070                 libtrace_thread_t *t = &libtrace->mapper_threads[i];
     1066        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
     1067
     1068
     1069        /* Start all of our perpkt threads */
     1070        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
     1071        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1072                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    10711073                t->trace = libtrace;
    10721074                t->ret = NULL;
    1073                 t->type = THREAD_MAPPER;
     1075                t->type = THREAD_PERPKT;
    10741076                t->state = THREAD_RUNNING;
    10751077                t->user_data = NULL;
    10761078                // t->tid DONE on create
    1077                 t->map_num = i;
     1079                t->perpkt_num = i;
    10781080                if (libtrace->hasher)
    1079                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->mapper_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
     1081                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
    10801082                // Depending on the mode vector or deque might be chosen
    10811083                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    10911093        int threads_started = 0;
    10921094        /* Setup the trace and start our threads */
    1093         if (libtrace->mapper_thread_count > 1 && libtrace->format->pstart_input) {
     1095        if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {
    10941096                printf("This format has direct support for p's\n");
    10951097                threads_started = libtrace->format->pstart_input(libtrace);
     
    11121114
    11131115        // TODO fix these leaks etc
    1114         if (libtrace->mapper_thread_count != threads_started)
    1115                 printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->mapper_thread_count);
     1116        if (libtrace->perpkt_thread_count != threads_started)
     1117                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
    11161118
    11171119
     
    11241126 * 2. All perpkt threads are paused waiting on a condition var
    11251127 * 3. Then call ppause on the underlying format if found
    1126  * 4. Return with perpkt_pausing set to mapper_count (Used when restarting so we reuse the threads)
     1128 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
    11271129 *
    11281130 * Once done you should be a able to modify the trace setup and call pstart again
     
    11481150
    11491151        printf("Sending messages \n");
    1150         // Stop threads, skip this one if its a mapper
    1151         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1152                 if (&libtrace->mapper_threads[i] != t) {
     1152        // Stop threads, skip this one if its a perpkt
     1153        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1154                if (&libtrace->perpkt_threads[i] != t) {
    11531155                        libtrace_message_t message;
    11541156                        message.code = MESSAGE_PAUSE;
    11551157                        message.additional = NULL;
    1156                         trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
     1158                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    11571159                }
    11581160        }
     
    11631165
    11641166        if (t) {
    1165                 // A mapper is doing the pausing interesting fake a extra thread paused
     1167                // A perpkt is doing the pausing interesting fake a extra thread paused
    11661168                // We rely on the user to not return before starting the trace again
    11671169                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1168                 libtrace->perpkt_pausing++;
     1170                libtrace->perpkts_pausing++;
    11691171                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    11701172        }
     
    11781180        // Wait for all threads to pause
    11791181        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1180         while (libtrace->mapper_thread_count != libtrace->perpkt_pausing) {
     1182        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
    11811183                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    11821184        }
     
    12221224        // Now send a message asking the threads to stop
    12231225        // This will be retrieved before trying to read another packet
    1224         for (i = 0; i < libtrace->mapper_thread_count; i++) {
     1226        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    12251227                libtrace_message_t message;
    12261228                message.code = MESSAGE_STOP;
    12271229                message.additional = NULL;
    1228                 trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
     1230                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    12291231        }
    12301232
     
    13031305        int i;
    13041306
    1305         /* Firstly wait for the mapper threads to finish, since these are
     1307        /* Firstly wait for the perpkt threads to finish, since these are
    13061308         * user controlled */
    1307         for (i=0; i< libtrace->mapper_thread_count; i++) {
    1308                 //printf("Waiting to join with mapper #%d\n", i);
    1309                 assert(pthread_join(libtrace->mapper_threads[i].tid, NULL) == 0);
    1310                 //printf("Joined with mapper #%d\n", i);
     1309        for (i=0; i< libtrace->perpkt_thread_count; i++) {
     1310                //printf("Waiting to join with perpkt #%d\n", i);
     1311                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
     1312                //printf("Joined with perpkt #%d\n", i);
    13111313                // So we must do our best effort to empty the queue - so
    13121314                // the producer (or any other threads) don't block.
     
    13141316                // Mark that we are no longer accepting packets
    13151317                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1316                 libtrace->mapper_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
     1318                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
    13171319                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    1318                 while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
    1319                         if (packet) // This could be NULL iff the mapper finishes early
     1320                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
     1321                        if (packet) // This could be NULL iff the perpkt finishes early
    13201322                                trace_destroy_packet(packet);
    13211323        }
     
    13321334        // Now that everything is finished nothing can be touching our
    13331335        // buffers so clean them up
    1334         for (i = 0; i < libtrace->mapper_thread_count; i++) {
     1336        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    13351337                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
    13361338                // if they lost timeslice before-during a write
    13371339                libtrace_packet_t * packet;
    1338                 while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
     1340                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
    13391341                        trace_destroy_packet(packet);
    13401342                if (libtrace->hasher) {
    1341                         assert(libtrace_ringbuffer_is_empty(&libtrace->mapper_threads[i].rbuffer));
    1342                         libtrace_ringbuffer_destroy(&libtrace->mapper_threads[i].rbuffer);
     1343                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
     1344                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
    13431345                }
    13441346                // Cannot destroy vector yet, this happens with trace_destroy
     
    14861488{
    14871489        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1488         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1490        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    14891491
    14901492        assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
     
    15061508{
    15071509        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1508         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1510        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    15091511        if (t->tmp_key != key) {
    15101512                if (t->tmp_data) {
     
    15251527        // Who am I???
    15261528        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1527         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1529        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    15281530        // Now put it into my table
    15291531        static __thread int count = 0;
     
    15781580        libtrace_vector_empty(results);
    15791581        // Check all of the temp queues
    1580         for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1582        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    15811583                libtrace_result_t r = {0,0};
    1582                 assert (pthread_spin_lock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
    1583                 if (libtrace->mapper_threads[i].tmp_key == key) {
    1584                         libtrace_result_set_key_value(&r, key, libtrace->mapper_threads[i].tmp_data);
    1585                         libtrace->mapper_threads[i].tmp_data = NULL;
     1584                assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
     1585                if (libtrace->perpkt_threads[i].tmp_key == key) {
     1586                        libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
     1587                        libtrace->perpkt_threads[i].tmp_data = NULL;
    15861588                        printf("Found in temp queue\n");
    15871589                }
    1588                 assert (pthread_spin_unlock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
     1590                assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    15891591                if (libtrace_result_get_value(&r)) {
    15901592                        // Got a result still in temporary
     
    15931595                } else {
    15941596                        // This might be waiting on the actual queue
    1595                         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
     1597                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    15961598                        if (libtrace_deque_peek_front(v, (void *) &r) &&
    15971599                                        libtrace_result_get_value(&r)) {
    1598                                 assert (libtrace_deque_pop_front(&libtrace->mapper_threads[i].deque, (void *) &r) == 1);
     1600                                assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
    15991601                                printf("Found in real queue\n");
    16001602                                libtrace_vector_push_back(results, &r);
     
    16211623        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    16221624                int live_count = 0;
    1623                 bool live[libtrace->mapper_thread_count]; // Set if a trace is alive
    1624                 uint64_t key[libtrace->mapper_thread_count]; // Cached keys
     1625                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
     1626                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
    16251627                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
    16261628                int min_queue = -1;
    16271629
    16281630                /* Loop through check all are alive (have data) and find the smallest */
    1629                 for (i = 0; i < libtrace->mapper_thread_count; ++i) {
    1630                         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
     1631                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     1632                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    16311633                        if (libtrace_deque_get_size(v) != 0) {
    16321634                                libtrace_result_t r;
     
    16451647
    16461648                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
    1647                 while ((live_count == libtrace->mapper_thread_count) || (live_count &&
     1649                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    16481650                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    16491651                                libtrace->joined))) {
     
    16511653                        libtrace_result_t r;
    16521654
    1653                         assert (libtrace_deque_pop_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r) == 1);
     1655                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
    16541656                        libtrace_vector_push_back(results, &r);
    16551657
     
    16581660
    16591661                        // Now update the one we just removed
    1660                         if (libtrace_deque_get_size(&libtrace->mapper_threads[min_queue].deque) )
     1662                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
    16611663                        {
    1662                                 libtrace_deque_peek_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r);
     1664                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
    16631665                                key[min_queue] = libtrace_result_get_key(&r);
    16641666                                if (key[min_queue] <= min_key) {
     
    16681670                                        min_key = key[min_queue]; // Update our minimum
    16691671                                        // Check all find the smallest again - all are alive
    1670                                         for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1672                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    16711673                                                if (live[i] && min_key > key[i]) {
    16721674                                                        min_key = key[i];
     
    16801682                                min_key = UINT64_MAX; // Update our minimum
    16811683                                // Check all find the smallest again - all are alive
    1682                                 for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1684                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    16831685                                        // Still not 100% TODO (what if order is wrong or not increasing)
    16841686                                        if (live[i] && min_key >= key[i]) {
     
    16901692                }
    16911693        } else { // Queues are not in order - return all results in the queue
    1692                 for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1693                         libtrace_vector_append(results, &libtrace->mapper_threads[i].vector);
     1694                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1695                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
    16941696                }
    16951697                if (flags & REDUCE_SORT) {
     
    17211723        // TODO I don't like using this so much
    17221724        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1723         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1724                 if (libtrace->mapper_threads[i].state == THREAD_RUNNING)
     1725        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1726                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
    17251727                        b++;
    17261728        }
     
    17351737                case TRACE_OPTION_SET_HASHER:
    17361738                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
    1737                 case TRACE_OPTION_SET_MAPPER_BUFFER_SIZE:
    1738                         libtrace->mapper_buffer_size = *((int *) value);
     1739                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
     1740                        libtrace->perpkt_buffer_size = *((int *) value);
    17391741                        return 1;
    17401742                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
    17411743                        libtrace->packet_freelist_size = *((int *) value);
    17421744                        return 1;
    1743                 case TRACE_OPTION_SET_MAPPER_THREAD_COUNT:
    1744                         libtrace->mapper_thread_count = *((int *) value);
     1745                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1746                        libtrace->perpkt_thread_count = *((int *) value);
    17451747                        return 1;
    17461748                case TRACE_DROP_OUT_OF_ORDER:
     
    17701772                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
    17711773                        if (*((int *) value))
    1772                                 libtrace->reducer_flags |= MAPPER_USE_SLIDING_WINDOW;
     1774                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
    17731775                        else
    1774                                 libtrace->reducer_flags &= ~MAPPER_USE_SLIDING_WINDOW;
     1776                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
    17751777                        return 1;
    17761778                case TRACE_OPTION_TRACETIME:
  • tools/traceanon/traceanon_parallel.c

    rd6a56b6 r17a3dff  
    352352        int i = 1;
    353353        trace_parallel_config(trace, TRACE_OPTION_SEQUENTIAL, &i);
    354         //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &i);
     354        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
    355355        //trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &i);
    356356        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &i);
    357357        i = 2;
    358         trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_THREAD_COUNT, &i);
     358        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
    359359       
    360360        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
  • tools/tracertstats/tracertstats_parallel.c

    rd6a56b6 r17a3dff  
    161161                assert(res != last_res);
    162162                last_res = res;
    163                 //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
     163                //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    164164                while (*last_ts < ts) {
    165165                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
     
    218218                //assert(res != last_res);
    219219                last_res = res;
    220                 //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
     220                //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    221221                /*while (*last_ts < ts) {
    222222                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
  • tools/tracestats/tracestats_parallel.c

    rd6a56b6 r17a3dff  
    215215        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &option);
    216216        option = 2;
    217         trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_THREAD_COUNT, &option);
     217        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
    218218        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
    219219
Note: See TracChangeset for help on using the changeset viewer.