- Timestamp: 02/25/15 17:24:20 (6 years ago)
- Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children: 43d3e73
- Parents: 69ae5a9
- Location: lib
- Files: 2 edited
Legend: lines prefixed with "-" were removed in this changeset; unprefixed lines are unchanged context. (No lines were added.)
lib/libtrace.h.in
(r526d9d0 → r2adc1d0)

  DLLEXPORT int trace_pstop(libtrace_t *libtrace);
  DLLEXPORT void trace_join(libtrace_t * trace);
- DLLEXPORT void print_contention_stats (libtrace_t *libtrace);

  DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key);
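The header change removes print_contention_stats() from the public API; together with the trace_parallel.c changes below, it drops the contention-statistics debug instrumentation entirely. For context, a minimal sketch of how a caller might have used it before this changeset (the URI and the elided parallel setup are hypothetical, and this will no longer build afterwards):

    /* Hypothetical pre-changeset usage: dump the contention counters
     * once the per-packet threads have finished. */
    #include <libtrace.h>

    int main(void) {
        libtrace_t *trace = trace_create("pcapfile:trace.pcap"); /* example URI */
        /* ... configure a hasher and run the parallel trace ... */
        trace_join(trace);             /* wait for per-packet threads to end */
        print_contention_stats(trace); /* declaration removed by this changeset */
        trace_destroy(trace);
        return 0;
    }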
lib/trace_parallel.c
(r69ae5a9 → r2adc1d0)

  extern int libtrace_parallel;

- struct multithreading_stats {
-     uint64_t full_queue_hits;
-     uint64_t wait_for_fill_complete_hits;
- } contention_stats[1024];
-
  struct mem_stats {
      struct memfail {
…
      else
          return false;
- }
-
- DLLEXPORT void print_contention_stats(libtrace_t *libtrace) {
-     int i;
-     struct multithreading_stats totals = {0};
-     for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
-         fprintf(stderr, "\nStats for perpkt thread#%d\n", i);
-         fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
-         totals.full_queue_hits += contention_stats[i].full_queue_hits;
-         fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", contention_stats[i].wait_for_fill_complete_hits);
-         totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
-     }
-     fprintf(stderr, "\nTotals for perpkt threads\n");
-     fprintf(stderr, "\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
-     fprintf(stderr, "\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
-
-     return;
  }
…
      return i;
- }
-
- /**
-  * Tries to read from our queue and returns 1 if a packet was retrieved
-  */
- static inline int try_waiting_queue(libtrace_t *libtrace, libtrace_thread_t * t, libtrace_packet_t **packet, int * ret)
- {
-     libtrace_packet_t* retrived_packet;
-
-     /* Lets see if we have one waiting */
-     if (libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &retrived_packet)) {
-         /* Copy paste from trace_pread_packet_hasher_thread() except that we try read (non-blocking) */
-         assert(retrived_packet);
-
-         if (*packet) // Recycle the old get the new
-             libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
-         *packet = retrived_packet;
-         *ret = (*packet)->error;
-         return 1;
-     }
-     return 0;
- }
-
- /**
-  * Allows us to ensure all threads are finished writing to our threads ring_buffer
-  * before returning EOF/error.
-  */
- inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
- {
-     /* We are waiting for the condition that another thread ends to check
-      * our queue for new data, once all threads end we can go to finished */
-     bool complete = false;
-     int ret;
-
-     do {
-         // Wait for a thread to end
-         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
-
-         // Check before
-         if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
-             complete = true;
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             continue;
-         }
-
-         ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
-
-         // Check after
-         if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
-             complete = true;
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             continue;
-         }
-
-         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-
-         // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
-         if(try_waiting_queue(libtrace, t, packet, &ret))
-             return ret;
-     } while (!complete);
-
-     // We can only end up here once all threads complete
-     try_waiting_queue(libtrace, t, packet, &ret);
-
-     return ret;
-     // TODO rethink this logic fix bug here
- }
-
- /**
-  * Expects the libtrace_lock to not be held
-  */
- inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
- {
-     thread_change_state(libtrace, t, THREAD_FINISHING, true);
-     return trace_handle_finishing_perpkt(libtrace, packet, t);
- }
-
- /**
-  * This case is much like the dedicated hasher, except that we will become
-  * hasher if we don't have a a packet waiting.
-  *
-  * Note: This is only every used if we have are doing hashing.
-  *
-  * TODO: Can block on zero copy formats such as ring: and dpdk: if the
-  * queue sizes in total are larger than the ring size.
-  *
-  * 1. We read a packet from our buffer
-  * 2. Move that into the packet provided (packet)
-  */
- inline static int trace_pread_packet_hash_locked(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
- {
-     int thread, ret/*, psize*/;
-
-     while (1) {
-         if(try_waiting_queue(libtrace, t, packet, &ret))
-             return ret;
-         // Can still block here if another thread is writing to a full queue
-         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
-
-         // Its impossible for our own queue to overfill, because no one can write
-         // when we are in the lock
-         if(try_waiting_queue(libtrace, t, packet, &ret)) {
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             return ret;
-         }
-
-         // Another thread cannot write a packet because a queue has filled up. Is it ours?
-         if (libtrace->perpkt_queue_full) {
-             contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             continue;
-         }
-
-         if (!*packet)
-             libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
-         assert(*packet);
-
-         // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
-         if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             if (libtrace_halt)
-                 return 0;
-             else
-                 return (*packet)->error;
-         }
-
-         trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
-         thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
-         if (thread == t->perpkt_num) {
-             // If it's this thread we must be in order because we checked the buffer once we got the lock
-             ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-             return (*packet)->error;
-         }
-
-         if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
-             while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
-                 libtrace->perpkt_queue_full = true;
-                 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-                 contention_stats[t->perpkt_num].full_queue_hits++;
-                 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
-             }
-             *packet = NULL;
-             libtrace->perpkt_queue_full = false;
-         } else {
-             /* We can get here if the user closes the thread before natural completion/or error */
-             assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
-         }
-         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
-     }
  }
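The removed counters follow a common lock-free diagnostic pattern: a fixed-size global array with one slot per per-packet thread, where each thread increments only its own slot and a reporting pass sums the slots. A self-contained sketch of that pattern (names are illustrative, not libtrace API):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define MAX_THREADS 1024 /* the removed array was likewise hard-coded */

    /* One slot per thread; a thread only writes its own slot, so no
     * locking is needed (adjacent slots may still share a cache line). */
    struct contention_counters {
        uint64_t full_queue_hits;
        uint64_t wait_for_fill_complete_hits;
    } counters[MAX_THREADS];

    static void report_contention(int thread_count) {
        struct contention_counters totals = {0};
        for (int i = 0; i < thread_count; i++) {
            fprintf(stderr, "Stats for perpkt thread#%d\n", i);
            fprintf(stderr, "\tfull_queue_hits: %" PRIu64 "\n",
                    counters[i].full_queue_hits);
            fprintf(stderr, "\twait_for_fill_complete_hits: %" PRIu64 "\n",
                    counters[i].wait_for_fill_complete_hits);
            totals.full_queue_hits += counters[i].full_queue_hits;
            totals.wait_for_fill_complete_hits +=
                    counters[i].wait_for_fill_complete_hits;
        }
        fprintf(stderr, "Totals: full_queue_hits=%" PRIu64
                " wait_for_fill_complete_hits=%" PRIu64 "\n",
                totals.full_queue_hits, totals.wait_for_fill_complete_hits);
    }

    int main(void) {
        counters[0].full_queue_hits = 3; /* fake events for the demo */
        counters[1].wait_for_fill_complete_hits = 1;
        report_contention(2);
        return 0;
    }

The bulk of the removed code is the "become the hasher when idle" read path: drain your own ring buffer first; otherwise take the global lock, read and hash a packet, and hand it to the owning thread via a non-blocking ring write, flagging perpkt_queue_full (and bumping the counters above) on contention. A reduced, runnable sketch of just the hash-and-dispatch step, with single-slot toy mailboxes standing in for the libtrace ring buffers (nothing below is libtrace API):

    #include <inttypes.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define NTHREADS 4

    /* Toy single-slot mailbox standing in for a per-thread ring buffer. */
    struct mailbox {
        bool full;
        uint64_t pkt; /* stands in for a libtrace_packet_t pointer */
    };

    static struct mailbox rings[NTHREADS];

    /* Non-blocking write, in the spirit of libtrace_ringbuffer_try_swrite_bl(). */
    static bool try_write(struct mailbox *m, uint64_t pkt) {
        if (m->full)
            return false;
        m->pkt = pkt;
        m->full = true;
        return true;
    }

    int main(void) {
        /* The identity "hash" stands in for the user-configured hasher
         * callback; reducing it modulo the thread count picks the owner. */
        for (uint64_t hash = 0; hash < 8; hash++) {
            int target = (int)(hash % NTHREADS);
            if (try_write(&rings[target], hash))
                printf("packet %" PRIu64 " -> thread %d\n", hash, target);
            else
                printf("thread %d queue full: the real code set "
                       "perpkt_queue_full, counted a full_queue_hit "
                       "and retried under the lock\n", target);
        }
        return 0;
    }

Because the modulo is keyed on the flow hash, every packet of a flow lands in the same thread's queue, which is what preserves per-flow ordering without any further synchronisation.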