Ignore:
Timestamp:
07/18/14 14:20:32 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b4b6b75
Parents:
8af0d01
Message:

Adds per thread storage to for the format to use against libtrace_threads.
Passes threads as arguments to reads to save overhead of looking these up.
Various changes to the DPDK system including registering a thread to allow our threads to be start with different DPDK thread numbers for thread local memory caches.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r3296252 r50ce607  
    317317        assert(t);
    318318        //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
     319        if (trace->format->pregister_thread) {
     320                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
     321        }
    319322        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    320323
     
    343346                                                // The hasher has stopped by this point, so the queue shouldn't be filling
    344347                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    345                                                         psize = trace_pread_packet(trace, &packet);
     348                                                        psize = trace_pread_packet(trace, t, &packet);
    346349                                                        if (psize > 0) {
    347350                                                                packet = (*trace->per_pkt)(trace, packet, NULL, t);
     
    375378                        }
    376379                } else {
    377                         psize = trace_pread_packet(trace, &packet);
     380                        psize = trace_pread_packet(trace, t, &packet);
    378381                }
    379382
     
    413416        trace_send_message_to_reducer(trace, &message);
    414417
     418        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     419        if (trace->format->punregister_thread) {
     420                trace->format->punregister_thread(trace, t);
     421        }
     422        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     423
    415424        pthread_exit(NULL);
    416425};
     
    434443        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
    435444        printf("Hasher Thread started\n");
     445        if (trace->format->pregister_thread) {
     446                trace->format->pregister_thread(trace, t, true);
     447        }
    436448        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    437449        int pkt_skipped = 0;
     
    514526        message.additional.uint64 = 0;
    515527        trace_send_message_to_reducer(trace, &message);
     528        assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     529        if (trace->format->pregister_thread) {
     530                trace->format->punregister_thread(trace, t);
     531        }
     532        assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    516533
    517534        // TODO remove from TTABLE t sometime
     
    539556 * lock to read a packet from the underlying trace.
    540557 */
    541 inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_packet_t **packet)
     558inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    542559{
    543560        // We need this to fill the 'first' packet table
    544         libtrace_thread_t *t = get_thread_table(libtrace);
    545561        if (!*packet) {
    546562                if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
     
    565581 * 2. Move that into the packet provided (packet)
    566582 */
    567 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_packet_t **packet)
    568 {
    569         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    570         libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];
    571 
     583inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     584{
    572585        if (*packet) // Recycle the old get the new
    573586                if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
     
    675688 * 2. Move that into the packet provided (packet)
    676689 */
    677 inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_packet_t **packet)
    678 {
    679         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    680         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     690inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     691{
    681692        int thread, ret/*, psize*/;
    682693
     
    696707                // Another thread cannot write a packet because a queue has filled up. Is it ours?
    697708                if (libtrace->perpkt_queue_full) {
    698                         contention_stats[this_thread].wait_for_fill_complete_hits++;
     709                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    699710                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    700711                        continue;
     
    718729                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    719730                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    720                 if (thread == this_thread) {
     731                if (thread == t->perpkt_num) {
    721732                        // If it's this thread we must be in order because we checked the buffer once we got the lock
    722733                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     
    728739                                libtrace->perpkt_queue_full = true;
    729740                                assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    730                                 contention_stats[this_thread].full_queue_hits++;
     741                                contention_stats[t->perpkt_num].full_queue_hits++;
    731742                                assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    732743                        }
     
    754765 * 2. Move that into the packet provided (packet)
    755766 */
    756 inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_packet_t **packet)
    757 {
    758         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    759         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
     767inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
     768{
    760769        int ret, i, thread/*, psize*/;
    761770
     
    785794                        assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    786795                        assert(sem_post(&libtrace->sem) == 0);
    787                         contention_stats[this_thread].wait_for_fill_complete_hits++;
     796                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    788797                        continue;
    789798                }
     
    833842                                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
    834843                                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    835                                                 if (this_thread == thread)
     844                                                if (t->perpkt_num == thread)
    836845                                                {
    837846                                                        // TODO think about this case more because we have to stop early if this were to happen on the last read
     
    855864                                                        assert(sem_post(&libtrace->sem) == 0);
    856865
    857                                                 contention_stats[this_thread].full_queue_hits++;
     866                                                contention_stats[t->perpkt_num].full_queue_hits++;
    858867                                                assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
    859868                                                // Grab these back
     
    10481057 *
    10491058 */
    1050 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_packet_t *packet) {
     1059static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packet) {
    10511060
    10521061        assert(libtrace && "You called trace_read_packet() with a NULL libtrace parameter!\n");
     
    10731082                         * structure */
    10741083                        packet->trace = libtrace;
    1075                         ret=libtrace->format->pread_packet(libtrace,packet);
     1084                        ret=libtrace->format->pread_packet(libtrace, t, packet);
    10761085                        if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
    10771086                                return ret;
     
    11031112 * Read a packet from the parallel trace
    11041113 */
    1105 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_packet_t **packet)
     1114DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    11061115{
    11071116        int ret;
    1108         libtrace_thread_t *t = get_thread_table(libtrace);
    11091117
    11101118        // Cleanup the packet passed back
     
    11151123                if (!*packet)
    11161124                        *packet = trace_create_packet();
    1117                 ret = trace_pread_packet_wrapper(libtrace, *packet);
     1125                ret = trace_pread_packet_wrapper(libtrace, t, *packet);
    11181126        } else if (trace_has_dedicated_hasher(libtrace)) {
    1119                 ret = trace_pread_packet_hasher_thread(libtrace, packet);
     1127                ret = trace_pread_packet_hasher_thread(libtrace, t, packet);
    11201128        } else if (!trace_has_dedicated_hasher(libtrace)) {
    11211129                /* We don't care about which core a packet goes to */
    1122                 ret = trace_pread_packet_first_in_first_served(libtrace, packet);
     1130                ret = trace_pread_packet_first_in_first_served(libtrace, t, packet);
    11231131        } /* else {
    11241132                ret = trace_pread_packet_hash_locked(libtrace, packet);
     
    11651173                return -1;
    11661174        }
     1175       
    11671176        // NOTE: Until the trace is started we wont have a libtrace_lock initialised
    11681177        if (libtrace->state != STATE_NEW) {
Note: See TracChangeset for help on using the changeset viewer.