Changeset 2adc1d0


Timestamp: 02/25/15 17:24:20
Author: Richard Sanger <rsangerarj@…>
Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children: 43d3e73
Parents: 69ae5a9
Message:

Remove unused hash lock code which was never finished

Due to the complexity of threading, this code was never finished,
and it did not show a significant performance improvement over the
alternatives.

Files: 3 edited

Legend: lines beginning with a space are unchanged context; lines beginning with '-' were removed; lines beginning with '+' were added. Each '@@ -old,count +new,count @@' header marks a hunk's position in the old and new file.
  • lib/libtrace.h.in

    r526d9d0 r2adc1d0

    @@ -3534,5 +3534,4 @@
     DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     DLLEXPORT void trace_join(libtrace_t * trace);
    -DLLEXPORT void print_contention_stats (libtrace_t *libtrace);
     
     DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
  • lib/trace_parallel.c

    r69ae5a9 r2adc1d0

    @@ -105,9 +105,4 @@
     extern int libtrace_parallel;
     
    -struct multithreading_stats {
    -        uint64_t full_queue_hits;
    -        uint64_t wait_for_fill_complete_hits;
    -} contention_stats[1024];
    -
     struct mem_stats {
             struct memfail {
     
    @@ -275,21 +270,4 @@
             else
                     return false;
    -}
    -
    -DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
    -        int i;
    -        struct multithreading_stats totals = {0};
    -        for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
    -                fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
    -                fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
    -                totals.full_queue_hits += contention_stats[i].full_queue_hits;
    -                fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
    -                totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
    -        }
    -        fprintf(stderr, "\nTotals for perpkt threads\n");
    -        fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
    -        fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
    -
    -        return;
     }
     
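
    The two hunks above delete a classic low-contention statistics pattern: each per-packet thread bumps counters in its own slot of a fixed-size array, so the hot path never takes a shared lock, and the slots are summed only when the totals are printed. A minimal standalone sketch of the same pattern follows; it is not libtrace code, and the thread count, counter names, and simulated increments are illustrative only.

        #include <inttypes.h>
        #include <pthread.h>
        #include <stdio.h>

        #define NTHREADS 4

        /* One slot per thread, so incrementing needs no shared lock. */
        static struct {
                uint64_t full_queue_hits;
                uint64_t wait_for_fill_complete_hits;
        } stats[NTHREADS];

        static void *worker(void *arg)
        {
                int id = *(int *)arg;
                /* The real code bumped these on queue contention; here we
                 * just simulate a few hits per thread. */
                stats[id].full_queue_hits += id + 1;
                stats[id].wait_for_fill_complete_hits += 2 * (id + 1);
                return NULL;
        }

        int main(void)
        {
                pthread_t tids[NTHREADS];
                int ids[NTHREADS];
                uint64_t full = 0, wait = 0;

                for (int i = 0; i < NTHREADS; i++) {
                        ids[i] = i;
                        pthread_create(&tids[i], NULL, worker, &ids[i]);
                }
                for (int i = 0; i < NTHREADS; i++)
                        pthread_join(tids[i], NULL);

                /* Sum the per-thread slots after joining the workers, much
                 * as print_contention_stats() summed its totals. */
                for (int i = 0; i < NTHREADS; i++) {
                        full += stats[i].full_queue_hits;
                        wait += stats[i].wait_for_fill_complete_hits;
                }
                printf("full_queue_hits: %" PRIu64 "\n", full);
                printf("wait_for_fill_complete_hits: %" PRIu64 "\n", wait);
                return 0;
        }

    The trade-off, visible in the removed contention_stats[1024], is that a fixed per-thread slot array hard-codes a maximum thread count, which is one reason unfinished instrumentation like this is cheap to drop.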
     
    @@ -968,153 +946,4 @@
     
             return i;
    -}
    -
    -/**
    - * Tries to read from our queue and returns 1 if a packet was retrieved
    - */
    -static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
    -{
    -        libtrace_packet_t* retrived_packet;
    -
    -        /* Lets see if we have one waiting */
    -        if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
    -                /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
    -                assert(retrived_packet);
    -
    -                if (*packet) // Recycle the old get the new
    -                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
    -                *packet = retrived_packet;
    -                *ret = (*packet)->error;
    -                return 1;
    -        }
    -        return 0;
    -}
    -
    -/**
    - * Allows us to ensure all threads are finished writing to our threads ring_buffer
    - * before returning EOF/error.
    - */
    -inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    -{
    -        /* We are waiting for the condition that another thread ends to check
    -         * our queue for new data, once all threads end we can go to finished */
    -        bool complete = false;
    -        int ret;
    -
    -        do {
    -                // Wait for a thread to end
    -                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    -
    -                // Check before
    -                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    -                        complete = true;
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        continue;
    -                }
    -
    -                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
    -
    -                // Check after
    -                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    -                        complete = true;
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        continue;
    -                }
    -
    -                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -
    -                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
    -                if(try_waiting_queue(libtrace, t, packet, &ret))
    -                        return ret;
    -        } while (!complete);
    -
    -        // We can only end up here once all threads complete
    -        try_waiting_queue(libtrace, t, packet, &ret);
    -
    -        return ret;
    -        // TODO rethink this logic fix bug here
    -}
    -
    -/**
    - * Expects the libtrace_lock to not be held
    - */
    -inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
    -{
    -        thread_change_state(libtrace, t, THREAD_FINISHING, true);
    -        return trace_handle_finishing_perpkt(libtrace, packet, t);
    -}
    -
    -/**
    - * This case is much like the dedicated hasher, except that we will become
    - * hasher if we don't have a a packet waiting.
    - *
    - * Note: This is only every used if we have are doing hashing.
    - *
    - * TODO: Can block on zero copy formats such as ring: and dpdk: if the
    - * queue sizes in total are larger than the ring size.
    - *
    - * 1. We read a packet from our buffer
    - * 2. Move that into the packet provided (packet)
    - */
    -inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    -{
    -        int thread, ret/*, psize*/;
    -
    -        while (1) {
    -                if(try_waiting_queue(libtrace, t, packet, &ret))
    -                        return ret;
    -                // Can still block here if another thread is writing to a full queue
    -                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    -
    -                // Its impossible for our own queue to overfill, because no one can write
    -                // when we are in the lock
    -                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        return ret;
    -                }
    -
    -                // Another thread cannot write a packet because a queue has filled up. Is it ours?
    -                if (libtrace->perpkt_queue_full) {
    -                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        continue;
    -                }
    -
    -                if (!*packet)
    -                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    -                assert(*packet);
    -
    -                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
    -                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        if (libtrace_halt)
    -                                return 0;
    -                        else
    -                                return (*packet)->error;
    -                }
    -
    -                trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
    -                thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
    -                if (thread == t->perpkt_num) {
    -                        // If it's this thread we must be in order because we checked the buffer once we got the lock
    -                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                        return (*packet)->error;
    -                }
    -
    -                if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
    -                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    -                                libtrace->perpkt_queue_full = true;
    -                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -                                contention_stats[t->perpkt_num].full_queue_hits++;
    -                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    -                        }
    -                        *packet = NULL;
    -                        libtrace->perpkt_queue_full = false;
    -                } else {
    -                        /* We can get here if the user closes the thread before natural completion/or error */
    -                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
    -                }
    -                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    -        }
     }
     
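
    Most of this final hunk removes the "hash locked" read path: a per-packet thread first try-reads its own ring buffer (try_waiting_queue), and if that is empty it takes libtrace_lock, temporarily becomes the hasher, reads a packet from the input, hashes it to an owning thread, and then either keeps the packet or writes it into the owner's buffer; the finishing logic (trace_handle_finishing_perpkt) waited on a condition variable until every thread reached THREAD_FINISHING before treating an empty queue as EOF. The standalone sketch below reproduces the same control flow with toy pieces; the queue type, the modulo "hash", and the simulated packet source are illustrative stand-ins, not libtrace API.

        #include <pthread.h>
        #include <stdbool.h>
        #include <stdio.h>

        #define NTHREADS 2
        #define QLEN 32   /* sized >= NPKTS so the toy queue cannot overflow */
        #define NPKTS 20

        struct queue {
                int pkts[QLEN];
                int head, tail, count;
        };

        static struct queue queues[NTHREADS];
        static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
        static int next_pkt = 0; /* simulated packet source, guarded by lock */

        static bool q_pop(struct queue *q, int *pkt)
        {
                if (q->count == 0)
                        return false;
                *pkt = q->pkts[q->head];
                q->head = (q->head + 1) % QLEN;
                q->count--;
                return true;
        }

        static void q_push(struct queue *q, int pkt)
        {
                q->pkts[q->tail] = pkt;
                q->tail = (q->tail + 1) % QLEN;
                q->count++;
        }

        /* Hash-locked read: drain our own queue first; otherwise act as
         * the hasher under the global lock and dispatch what we read. */
        static int read_hash_locked(int me)
        {
                for (;;) {
                        int pkt;
                        pthread_mutex_lock(&lock);
                        /* No one can write to our queue while we hold the
                         * lock, so an empty queue here really is empty. */
                        if (q_pop(&queues[me], &pkt)) {
                                pthread_mutex_unlock(&lock);
                                return pkt;
                        }
                        if (next_pkt >= NPKTS) { /* simulated EOF */
                                pthread_mutex_unlock(&lock);
                                return -1;
                        }
                        pkt = next_pkt++;           /* "read" the next packet */
                        if (pkt % NTHREADS == me) { /* toy hash: pkt mod N */
                                pthread_mutex_unlock(&lock);
                                return pkt;
                        }
                        /* Not ours: hand it to the owning thread and loop.
                         * The real code could spin here on a full ring
                         * buffer, which is where full_queue_hits counted. */
                        q_push(&queues[pkt % NTHREADS], pkt);
                        pthread_mutex_unlock(&lock);
                }
        }

        static void *worker(void *arg)
        {
                int me = *(int *)arg, pkt;
                while ((pkt = read_hash_locked(me)) >= 0)
                        printf("thread %d got packet %d\n", me, pkt);
                return NULL;
        }

        int main(void)
        {
                pthread_t tids[NTHREADS];
                int ids[NTHREADS] = {0, 1};
                for (int i = 0; i < NTHREADS; i++)
                        pthread_create(&tids[i], NULL, worker, &ids[i]);
                for (int i = 0; i < NTHREADS; i++)
                        pthread_join(tids[i], NULL);
                return 0;
        }

    In this sketch EOF is only declared under the same lock that guards dispatch, so a thread that sees an empty queue at EOF can exit safely; the removed libtrace version additionally had to cope with writers spinning on a full ring buffer (the perpkt_queue_full flag) and so needed the extra condition-variable handshake. Serializing every fallback read on one mutex is consistent with the commit message's observation that this approach showed no significant win over the alternatives, such as the dedicated hasher thread the removed comment refers to.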
  • tools/tracestats/tracestats_parallel.c

    r5ab626a r2adc1d0

    @@ -257,5 +257,4 @@
                     trace_perror(trace,"%s",uri);
     
    -        print_contention_stats(trace);
             trace_destroy(trace);
     }