Ignore:
Timestamp:
04/26/14 23:23:17 (8 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5ce14a5
Parents:
60e8e86
Message:

Rename from google map/reduce framework names to something more meaningful.
Rename mapper to perpkt since this is what it actually is in libtrace.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r60e8e86 r17a3dff  
    125125        int i;
    126126        struct multithreading_stats totals = {0};
    127         for (i = 0; i < libtrace->mapper_thread_count ; i++) {
    128                 printf("\nStats for mapper thread#%d\n", i);
     127        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
     128                printf("\nStats for perpkt thread#%d\n", i);
    129129                printf("\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
    130130                totals.full_queue_hits += contention_stats[i].full_queue_hits;
     
    132132                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
    133133        }
    134         printf("\nTotals for mapper threads\n");
     134        printf("\nTotals for perpkt threads\n");
    135135        printf("\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
    136136        printf("\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
     
    147147        libtrace_zero_deque(&t->deque);
    148148        t->recorded_first = false;
    149         t->map_num = -1;
     149        t->perpkt_num = -1;
    150150}
    151151
     
    156156        pthread_t tid = pthread_self();
    157157
    158         for (;i<libtrace->mapper_thread_count ;++i) {
    159                 if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
    160                         return &libtrace->mapper_threads[i];
     158        for (;i<libtrace->perpkt_thread_count ;++i) {
     159                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
     160                        return &libtrace->perpkt_threads[i];
    161161        }
    162162        return NULL;
    163163}
    164164
    165 int get_thread_table_num(libtrace_t *libtrace);
    166 DLLEXPORT int get_thread_table_num(libtrace_t *libtrace) {
     165int get_thread_table_num(libtrace_t *libtrace) {
    167166        int i = 0;
    168167        pthread_t tid = pthread_self();
    169         for (;i<libtrace->mapper_thread_count; ++i) {
    170                 if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
     168        for (;i<libtrace->perpkt_thread_count; ++i) {
     169                if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
    171170                        return i;
    172171        }
     
    196195        printf("Pausing thread #%d\n", get_thread_table_num(trace));
    197196        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    198         trace->perpkt_pausing++;
     197        trace->perpkts_pausing++;
    199198        pthread_cond_broadcast(&trace->perpkt_cond);
    200199        while (!trace->started) {
    201200                assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
    202201        }
    203         trace->perpkt_pausing--;
     202        trace->perpkts_pausing--;
    204203        pthread_cond_broadcast(&trace->perpkt_cond);
    205204        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     
    207206}
    208207
    209 void* mapper_start(void *data) {
     208/**
     209 * The is the entry point for our packet processing threads.
     210 */
     211static void* perpkt_threads_entry(void *data) {
    210212        libtrace_t *trace = (libtrace_t *)data;
    211213        libtrace_thread_t * t;
     
    216218        t = get_thread_table(trace);
    217219        assert(t);
    218         //printf("Yay Started Mapper thread #%d\n", (int) get_thread_table_num(trace));
     220        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
    219221        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    220222
     
    245247                }
    246248
    247                 if (trace->mapper_thread_count == 1) {
     249                if (trace->perpkt_thread_count == 1) {
    248250                        if (!packet) {
    249251                                if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
     
    286288
    287289        // Notify only after we've defiantly set the state to finished
    288         message.code = MESSAGE_MAPPER_ENDED;
     290        message.code = MESSAGE_PERPKT_ENDED;
    289291        message.additional = NULL;
    290292        trace_send_message_to_reducer(trace, &message);
     
    335337                /* We are guaranteed to have a hash function i.e. != NULL */
    336338                trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
    337                 thread = trace_packet_get_hash(packet) % trace->mapper_thread_count;
     339                thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
    338340                /* Blocking write to the correct queue - I'm the only writer */
    339                 if (trace->mapper_threads[thread].state != THREAD_FINISHED) {
    340                         libtrace_ringbuffer_write(&trace->mapper_threads[thread].rbuffer, packet);
     341                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
     342                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
    341343                        pkt_skipped = 0;
    342344                } else {
     
    346348
    347349        /* Broadcast our last failed read to all threads */
    348         for (i = 0; i < trace->mapper_thread_count; i++) {
     350        for (i = 0; i < trace->perpkt_thread_count; i++) {
    349351                libtrace_packet_t * bcast;
    350352                printf("Broadcasting error/EOF now the trace is over\n");
    351                 if (i == trace->mapper_thread_count - 1) {
     353                if (i == trace->perpkt_thread_count - 1) {
    352354                        bcast = packet;
    353355                } else {
     
    356358                }
    357359                assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
    358                 if (trace->mapper_threads[i].state != THREAD_FINISHED) {
     360                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
    359361                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    360362                        // Unlock early otherwise we could deadlock
    361                         libtrace_ringbuffer_write(&trace->mapper_threads[i].rbuffer, NULL);
     363                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);
    362364                } else {
    363365                        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     
    371373        // Notify only after we've defiantly set the state to finished
    372374        libtrace_message_t message;
    373         message.code = MESSAGE_MAPPER_ENDED;
     375        message.code = MESSAGE_PERPKT_ENDED;
    374376        message.additional = NULL;
    375377        trace_send_message_to_reducer(trace, &message);
     
    427429{
    428430        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    429         libtrace_thread_t* t = &libtrace->mapper_threads[this_thread];
     431        libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
    430432
    431433        if (*packet) // Recycle the old get the new
     
    468470 * before returning EOF/error.
    469471 */
    470 inline static int trace_handle_finishing_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
     472inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    471473{
    472474        /* We are waiting for the condition that another thread ends to check
     
    480482
    481483                // Check before
    482                 if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
     484                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
    483485                        complete = true;
    484486                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    489491
    490492                // Check after
    491                 if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
     493                if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
    492494                        complete = true;
    493495                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    512514 * Expects the libtrace_lock to not be held
    513515 */
    514 inline static int trace_finish_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
     516inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    515517{
    516518        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    517519        t->state = THREAD_FINISHING;
    518         libtrace->mappers_finishing++;
     520        libtrace->perpkts_finishing++;
    519521        pthread_cond_broadcast(&libtrace->perpkt_cond);
    520522        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    521         return trace_handle_finishing_mapper(libtrace, packet, t);
     523        return trace_handle_finishing_perpkt(libtrace, packet, t);
    522524}
    523525
     
    537539{
    538540        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    539         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
    540         int thread, ret, psize;
     541        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     542        int thread, ret/*, psize*/;
    541543
    542544        while (1) {
     
    554556
    555557                // Another thread cannot write a packet because a queue has filled up. Is it ours?
    556                 if (libtrace->mapper_queue_full) {
     558                if (libtrace->perpkt_queue_full) {
    557559                        contention_stats[this_thread].wait_for_fill_complete_hits++;
    558560                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    576578
    577579                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    578                 thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
     580                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    579581                if (thread == this_thread) {
    580582                        // If it's this thread we must be in order because we checked the buffer once we got the lock
     
    583585                }
    584586
    585                 if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
    586                         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
    587                                 libtrace->mapper_queue_full = true;
     587                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
     588                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
     589                                libtrace->perpkt_queue_full = true;
    588590                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    589591                                contention_stats[this_thread].full_queue_hits++;
     
    591593                        }
    592594                        *packet = NULL;
    593                         libtrace->mapper_queue_full = false;
     595                        libtrace->perpkt_queue_full = false;
    594596                } else {
    595597                        /* We can get here if the user closes the thread before natural completion/or error */
     
    616618{
    617619        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    618         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
    619         int ret, i, thread, psize;
     620        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     621        int ret, i, thread/*, psize*/;
    620622
    621623        if (t->state == THREAD_FINISHING)
    622                 return trace_handle_finishing_mapper(libtrace, packet, t);
     624                return trace_handle_finishing_perpkt(libtrace, packet, t);
    623625
    624626        while (1) {
     
    641643
    642644                // TODO put on *proper* condition variable
    643                 if (libtrace->mapper_queue_full) {
     645                if (libtrace->perpkt_queue_full) {
    644646                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    645647                        assert(sem_post(&libtrace->sem) == 0);
     
    661663                                return 0;
    662664                        else
    663                                 return trace_finish_mapper(libtrace, packet, t);
     665                                return trace_finish_perpkt(libtrace, packet, t);
    664666                }
    665667                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    678680                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
    679681                        assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    680                         if (libtrace->mapper_queue_full) {
     682                        if (libtrace->perpkt_queue_full) {
    681683                                // I might be the holdup in which case if I can read my queue I should do that and return
    682684                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
     
    689691                        // Read greedily as many as we can
    690692                        while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
    691                                 thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
    692                                 if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
    693                                         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
     693                                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
     694                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
     695                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    694696                                                if (this_thread == thread)
    695697                                                {
     
    700702                                                        if(try_waiting_queue(libtrace, t, packet, &ret)) {
    701703                                                                // We must be able to write this now 100% without fail
    702                                                                 libtrace_ringbuffer_write(&libtrace->mapper_threads[thread].rbuffer, *packet);
     704                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
    703705                                                                assert(sem_post(&libtrace->sem) == 0);
    704706                                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     
    709711                                                }
    710712                                                // Not us we have to give the other threads a chance to write there packets then
    711                                                 libtrace->mapper_queue_full = true;
     713                                                libtrace->perpkt_queue_full = true;
    712714                                                assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
    713                                                 for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
     715                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    714716                                                        assert(sem_post(&libtrace->sem) == 0);
    715717
     
    717719                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    718720                                                // Grab these back
    719                                                 for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
     721                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    720722                                                        assert(sem_wait(&libtrace->sem) == 0);
    721                                                 libtrace->mapper_queue_full = false;
     723                                                libtrace->perpkt_queue_full = false;
    722724                                        }
    723725                                        assert(sem_post(&libtrace->sem) == 0);
     
    753755                dup = trace_copy_packet(packet);
    754756                assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
    755                 libtrace->first_packets.packets[t->map_num].packet = dup;
     757                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    756758                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
    757                 memcpy(&libtrace->first_packets.packets[t->map_num].tv, &tv, sizeof(tv));
     759                memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
    758760                // Now update the first
    759761                libtrace->first_packets.count++;
    760762                if (libtrace->first_packets.count == 1) {
    761763                        // We the first entry hence also the first known packet
    762                         libtrace->first_packets.first = t->map_num;
     764                        libtrace->first_packets.first = t->perpkt_num;
    763765                } else {
    764766                        // Check if we are newer than the previous 'first' packet
     
    766768                        if (trace_get_seconds(dup) <
    767769                                trace_get_seconds(libtrace->first_packets.packets[first].packet))
    768                                 libtrace->first_packets.first = t->map_num;
     770                                libtrace->first_packets.first = t->perpkt_num;
    769771                }
    770772                assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     
    778780
    779781/**
    780  * Returns 1 if its certain that the first packet is truly the first packet
     782 * Returns 1 if it's certain that the first packet is truly the first packet
    781783 * rather than a best guess based upon threads that have published so far.
    782784 * Otherwise 0 is returned.
     
    791793                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
    792794                *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
    793                 if (libtrace->first_packets.count == libtrace->mapper_thread_count) {
     795                if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
    794796                        ret = 1;
    795797                } else {
     
    944946        } else if (trace_has_dedicated_hasher(libtrace)) {
    945947                ret = trace_pread_packet_hasher_thread(libtrace, packet);
    946         } else if (libtrace->reducer_flags & MAPPER_USE_SLIDING_WINDOW) {
     948        } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
    947949                ret = trace_pread_packet_sliding_window(libtrace, packet);
    948950        } else {
     
    967969        int i;
    968970
    969         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    970                 libtrace_thread_t *t = &libtrace->mapper_threads[i];
    971                 assert(pthread_create(&t->tid, NULL, mapper_start, (void *) libtrace) == 0);
    972         }
    973         return libtrace->mapper_thread_count;
     971        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     972                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
     973                assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
     974        }
     975        return libtrace->perpkt_thread_count;
    974976}
    975977
     
    988990        if (trace_is_err(libtrace))
    989991                return -1;;
    990         if (libtrace->perpkt_pausing != 0) {
     992        if (libtrace->perpkts_pausing != 0) {
    991993                printf("Restarting trace\n");
    992994                libtrace->format->pstart_input(libtrace);
     
    10051007        libtrace->per_pkt = per_pkt;
    10061008        libtrace->reducer = reducer;
    1007         libtrace->mappers_finishing = 0;
     1009        libtrace->perpkts_finishing = 0;
    10081010        // libtrace->hasher = &rand_hash; /* Hasher now set via option */
    10091011
     
    10141016
    10151017        // Set default buffer sizes
    1016         if (libtrace->mapper_buffer_size <= 0)
    1017                 libtrace->mapper_buffer_size = 1000;
    1018 
    1019         if (libtrace->mapper_thread_count <= 0)
    1020                 libtrace->mapper_thread_count = 2; // XXX scale to system
     1018        if (libtrace->perpkt_buffer_size <= 0)
     1019                libtrace->perpkt_buffer_size = 1000;
     1020
     1021        if (libtrace->perpkt_thread_count <= 0)
     1022                libtrace->perpkt_thread_count = 2; // XXX scale to system
    10211023
    10221024        if(libtrace->packet_freelist_size <= 0)
    1023                 libtrace->packet_freelist_size = (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count;
     1025                libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
    10241026
    10251027        if(libtrace->packet_freelist_size <
    1026                 (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count)
     1028                (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
    10271029                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
    10281030
     
    10621064        libtrace->first_packets.count = 0;
    10631065        assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
    1064         libtrace->first_packets.packets = calloc(libtrace->mapper_thread_count, sizeof(struct  __packet_storage_magic_type));
    1065 
    1066 
    1067         /* Start all of our mapper threads */
    1068         libtrace->mapper_threads = calloc(sizeof(libtrace_thread_t), libtrace->mapper_thread_count);
    1069         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1070                 libtrace_thread_t *t = &libtrace->mapper_threads[i];
     1066        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
     1067
     1068
     1069        /* Start all of our perpkt threads */
     1070        libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
     1071        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1072                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    10711073                t->trace = libtrace;
    10721074                t->ret = NULL;
    1073                 t->type = THREAD_MAPPER;
     1075                t->type = THREAD_PERPKT;
    10741076                t->state = THREAD_RUNNING;
    10751077                t->user_data = NULL;
    10761078                // t->tid DONE on create
    1077                 t->map_num = i;
     1079                t->perpkt_num = i;
    10781080                if (libtrace->hasher)
    1079                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->mapper_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
     1081                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
    10801082                // Depending on the mode vector or deque might be chosen
    10811083                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    10911093        int threads_started = 0;
    10921094        /* Setup the trace and start our threads */
    1093         if (libtrace->mapper_thread_count > 1 && libtrace->format->pstart_input) {
     1095        if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {
    10941096                printf("This format has direct support for p's\n");
    10951097                threads_started = libtrace->format->pstart_input(libtrace);
     
    11121114
    11131115        // TODO fix these leaks etc
    1114         if (libtrace->mapper_thread_count != threads_started)
    1115                 printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->mapper_thread_count);
     1116        if (libtrace->perpkt_thread_count != threads_started)
     1117                printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
    11161118
    11171119
     
    11241126 * 2. All perpkt threads are paused waiting on a condition var
    11251127 * 3. Then call ppause on the underlying format if found
    1126  * 4. Return with perpkt_pausing set to mapper_count (Used when restarting so we reuse the threads)
     1128 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
    11271129 *
    11281130 * Once done you should be a able to modify the trace setup and call pstart again
     
    11481150
    11491151        printf("Sending messages \n");
    1150         // Stop threads, skip this one if its a mapper
    1151         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1152                 if (&libtrace->mapper_threads[i] != t) {
     1152        // Stop threads, skip this one if its a perpkt
     1153        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1154                if (&libtrace->perpkt_threads[i] != t) {
    11531155                        libtrace_message_t message;
    11541156                        message.code = MESSAGE_PAUSE;
    11551157                        message.additional = NULL;
    1156                         trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
     1158                        trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    11571159                }
    11581160        }
     
    11631165
    11641166        if (t) {
    1165                 // A mapper is doing the pausing interesting fake a extra thread paused
     1167                // A perpkt is doing the pausing interesting fake a extra thread paused
    11661168                // We rely on the user to not return before starting the trace again
    11671169                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1168                 libtrace->perpkt_pausing++;
     1170                libtrace->perpkts_pausing++;
    11691171                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    11701172        }
     
    11781180        // Wait for all threads to pause
    11791181        assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1180         while (libtrace->mapper_thread_count != libtrace->perpkt_pausing) {
     1182        while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
    11811183                assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    11821184        }
     
    12221224        // Now send a message asking the threads to stop
    12231225        // This will be retrieved before trying to read another packet
    1224         for (i = 0; i < libtrace->mapper_thread_count; i++) {
     1226        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    12251227                libtrace_message_t message;
    12261228                message.code = MESSAGE_STOP;
    12271229                message.additional = NULL;
    1228                 trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
     1230                trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
    12291231        }
    12301232
     
    13031305        int i;
    13041306
    1305         /* Firstly wait for the mapper threads to finish, since these are
     1307        /* Firstly wait for the perpkt threads to finish, since these are
    13061308         * user controlled */
    1307         for (i=0; i< libtrace->mapper_thread_count; i++) {
    1308                 //printf("Waiting to join with mapper #%d\n", i);
    1309                 assert(pthread_join(libtrace->mapper_threads[i].tid, NULL) == 0);
    1310                 //printf("Joined with mapper #%d\n", i);
     1309        for (i=0; i< libtrace->perpkt_thread_count; i++) {
     1310                //printf("Waiting to join with perpkt #%d\n", i);
     1311                assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
     1312                //printf("Joined with perpkt #%d\n", i);
    13111313                // So we must do our best effort to empty the queue - so
    13121314                // the producer (or any other threads) don't block.
     
    13141316                // Mark that we are no longer accepting packets
    13151317                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1316                 libtrace->mapper_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
     1318                libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
    13171319                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    1318                 while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
    1319                         if (packet) // This could be NULL iff the mapper finishes early
     1320                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
     1321                        if (packet) // This could be NULL iff the perpkt finishes early
    13201322                                trace_destroy_packet(packet);
    13211323        }
     
    13321334        // Now that everything is finished nothing can be touching our
    13331335        // buffers so clean them up
    1334         for (i = 0; i < libtrace->mapper_thread_count; i++) {
     1336        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    13351337                // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
    13361338                // if they lost timeslice before-during a write
    13371339                libtrace_packet_t * packet;
    1338                 while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
     1340                while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
    13391341                        trace_destroy_packet(packet);
    13401342                if (libtrace->hasher) {
    1341                         assert(libtrace_ringbuffer_is_empty(&libtrace->mapper_threads[i].rbuffer));
    1342                         libtrace_ringbuffer_destroy(&libtrace->mapper_threads[i].rbuffer);
     1343                        assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
     1344                        libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
    13431345                }
    13441346                // Cannot destroy vector yet, this happens with trace_destroy
     
    14861488{
    14871489        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1488         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1490        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    14891491
    14901492        assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
     
    15061508{
    15071509        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1508         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1510        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    15091511        if (t->tmp_key != key) {
    15101512                if (t->tmp_data) {
     
    15251527        // Who am I???
    15261528        int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1527         libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
     1529        libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    15281530        // Now put it into my table
    15291531        static __thread int count = 0;
     
    15781580        libtrace_vector_empty(results);
    15791581        // Check all of the temp queues
    1580         for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1582        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    15811583                libtrace_result_t r = {0,0};
    1582                 assert (pthread_spin_lock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
    1583                 if (libtrace->mapper_threads[i].tmp_key == key) {
    1584                         libtrace_result_set_key_value(&r, key, libtrace->mapper_threads[i].tmp_data);
    1585                         libtrace->mapper_threads[i].tmp_data = NULL;
     1584                assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
     1585                if (libtrace->perpkt_threads[i].tmp_key == key) {
     1586                        libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
     1587                        libtrace->perpkt_threads[i].tmp_data = NULL;
    15861588                        printf("Found in temp queue\n");
    15871589                }
    1588                 assert (pthread_spin_unlock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
     1590                assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
    15891591                if (libtrace_result_get_value(&r)) {
    15901592                        // Got a result still in temporary
     
    15931595                } else {
    15941596                        // This might be waiting on the actual queue
    1595                         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
     1597                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    15961598                        if (libtrace_deque_peek_front(v, (void *) &r) &&
    15971599                                        libtrace_result_get_value(&r)) {
    1598                                 assert (libtrace_deque_pop_front(&libtrace->mapper_threads[i].deque, (void *) &r) == 1);
     1600                                assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
    15991601                                printf("Found in real queue\n");
    16001602                                libtrace_vector_push_back(results, &r);
     
    16211623        if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    16221624                int live_count = 0;
    1623                 bool live[libtrace->mapper_thread_count]; // Set if a trace is alive
    1624                 uint64_t key[libtrace->mapper_thread_count]; // Cached keys
     1625                bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
     1626                uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
    16251627                uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
    16261628                int min_queue = -1;
    16271629
    16281630                /* Loop through check all are alive (have data) and find the smallest */
    1629                 for (i = 0; i < libtrace->mapper_thread_count; ++i) {
    1630                         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
     1631                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     1632                        libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
    16311633                        if (libtrace_deque_get_size(v) != 0) {
    16321634                                libtrace_result_t r;
     
    16451647
    16461648                /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
    1647                 while ((live_count == libtrace->mapper_thread_count) || (live_count &&
     1649                while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
    16481650                                ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
    16491651                                libtrace->joined))) {
     
    16511653                        libtrace_result_t r;
    16521654
    1653                         assert (libtrace_deque_pop_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r) == 1);
     1655                        assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
    16541656                        libtrace_vector_push_back(results, &r);
    16551657
     
    16581660
    16591661                        // Now update the one we just removed
    1660                         if (libtrace_deque_get_size(&libtrace->mapper_threads[min_queue].deque) )
     1662                        if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
    16611663                        {
    1662                                 libtrace_deque_peek_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r);
     1664                                libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
    16631665                                key[min_queue] = libtrace_result_get_key(&r);
    16641666                                if (key[min_queue] <= min_key) {
     
    16681670                                        min_key = key[min_queue]; // Update our minimum
    16691671                                        // Check all find the smallest again - all are alive
    1670                                         for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1672                                        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    16711673                                                if (live[i] && min_key > key[i]) {
    16721674                                                        min_key = key[i];
     
    16801682                                min_key = UINT64_MAX; // Update our minimum
    16811683                                // Check all find the smallest again - all are alive
    1682                                 for (i = 0; i < libtrace->mapper_thread_count; ++i) {
     1684                                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    16831685                                        // Still not 100% TODO (what if order is wrong or not increasing)
    16841686                                        if (live[i] && min_key >= key[i]) {
     
    16901692                }
    16911693        } else { // Queues are not in order - return all results in the queue
    1692                 for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1693                         libtrace_vector_append(results, &libtrace->mapper_threads[i].vector);
     1694                for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1695                        libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
    16941696                }
    16951697                if (flags & REDUCE_SORT) {
     
    17211723        // TODO I don't like using this so much
    17221724        //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    1723         for (i = 0; i < libtrace->mapper_thread_count; i++) {
    1724                 if (libtrace->mapper_threads[i].state == THREAD_RUNNING)
     1725        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     1726                if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
    17251727                        b++;
    17261728        }
     
    17351737                case TRACE_OPTION_SET_HASHER:
    17361738                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
    1737                 case TRACE_OPTION_SET_MAPPER_BUFFER_SIZE:
    1738                         libtrace->mapper_buffer_size = *((int *) value);
     1739                case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
     1740                        libtrace->perpkt_buffer_size = *((int *) value);
    17391741                        return 1;
    17401742                case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
    17411743                        libtrace->packet_freelist_size = *((int *) value);
    17421744                        return 1;
    1743                 case TRACE_OPTION_SET_MAPPER_THREAD_COUNT:
    1744                         libtrace->mapper_thread_count = *((int *) value);
     1745                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
     1746                        libtrace->perpkt_thread_count = *((int *) value);
    17451747                        return 1;
    17461748                case TRACE_DROP_OUT_OF_ORDER:
     
    17701772                case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
    17711773                        if (*((int *) value))
    1772                                 libtrace->reducer_flags |= MAPPER_USE_SLIDING_WINDOW;
     1774                                libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
    17731775                        else
    1774                                 libtrace->reducer_flags &= ~MAPPER_USE_SLIDING_WINDOW;
     1776                                libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
    17751777                        return 1;
    17761778                case TRACE_OPTION_TRACETIME:
Note: See TracChangeset for help on using the changeset viewer.