Changeset a49a9eb
- Timestamp: 07/30/14 18:44:16
- Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children: be3f75b
- Parents: 41148f2
- Files: 2 added, 12 edited
lib/Makefile.am
rdafe86a ra49a9eb
 58  58     libtrace_arphrd.h \
 59  59     data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
 60      -  data-struct/deque.c data-struct/sliding_window.c \
     60  +  data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
 61  61     hash_toeplitz.c
 62  62
lib/data-struct/deque.c
r8c42377 ra49a9eb 25 25 q->size = 0; 26 26 q->element_size = element_size; 27 assert(pthread_mutex_init(&q->lock, NULL)== 0);27 ASSERT_RET(pthread_mutex_init(&q->lock, NULL), == 0); 28 28 } 29 29 … … 36 36 memcpy(&new_node->data, d, q->element_size); 37 37 // Only ->prev is unknown at this stage to be completed in lock 38 assert(pthread_mutex_lock(&q->lock)== 0);38 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 39 39 if (q->head == NULL) { 40 40 assert(q->tail == NULL && q->size == 0); … … 48 48 } 49 49 q->size++; 50 assert(pthread_mutex_unlock(&q->lock)== 0);50 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 51 51 } 52 52 … … 59 59 memcpy(&new_node->data, d, q->element_size); 60 60 // Only ->next is unknown at this stage to be completed in lock 61 assert(pthread_mutex_lock(&q->lock)== 0);61 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 62 62 if (q->head == NULL) { 63 63 assert(q->tail == NULL && q->size == 0); … … 72 72 } 73 73 q->size++; 74 assert(pthread_mutex_unlock(&q->lock)== 0);74 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 75 75 } 76 76 … … 78 78 { 79 79 int ret = 1; 80 assert(pthread_mutex_lock(&q->lock)== 0);80 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 81 81 if (q->head == NULL) 82 82 ret = 0; 83 83 else 84 84 memcpy(d, &q->head->data, q->element_size); 85 assert(pthread_mutex_unlock(&q->lock)== 0);85 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 86 86 return ret; 87 87 } … … 90 90 { 91 91 int ret = 1; 92 assert(pthread_mutex_lock(&q->lock)== 0);92 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 93 93 if (q->tail == NULL) 94 94 ret = 0; 95 95 else 96 96 memcpy(d, &q->tail->data, q->element_size); 97 assert(pthread_mutex_unlock(&q->lock)== 0);97 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 98 98 return ret; 99 99 } … … 103 103 int ret = 0; 104 104 list_node_t * n = NULL; 105 assert(pthread_mutex_lock(&q->lock)== 0);105 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 106 106 if (q->head != NULL) { 107 107 n = q->head; … … 114 114 q->tail = q->head; 115 115 } 116 assert(pthread_mutex_unlock(&q->lock)== 0);116 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 117 117 // Unlock once we've removed it :) 118 118 if (ret) { … … 127 127 int ret = 0; 128 128 list_node_t * n; 129 assert(pthread_mutex_lock(&q->lock)== 0);129 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 130 130 if (q->tail != NULL) { 131 131 n = q->tail; … … 138 138 q->head = q->tail; 139 139 } 140 assert(pthread_mutex_unlock(&q->lock)== 0);140 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 141 141 if (ret) { 142 142 memcpy(d, &n->data, q->element_size); … … 150 150 #if RACE_SAFE 151 151 size_t ret; 152 assert(pthread_mutex_lock(&q->lock)== 0);152 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 153 153 ret = q->size; 154 assert(pthread_mutex_unlock(&q->lock)== 0);154 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 155 155 return ret; 156 156 #else … … 169 169 list_node_t *n; 170 170 assert(q->element_size == sizeof(libtrace_result_t)); 171 assert(pthread_mutex_lock(&q->lock)== 0);171 ASSERT_RET(pthread_mutex_lock(&q->lock), == 0); 172 172 n = q->head; 173 173 for (n = q->head; n != NULL; n = n->next) { 174 174 (*fn)(&n->data); 175 175 } 176 assert(pthread_mutex_unlock(&q->lock)== 0);176 ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0); 177 177 } -
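Editorial note: the motivation for this wholesale swap is that assert() compiles to nothing under NDEBUG, so the old assert(pthread_mutex_lock(...) == 0) idiom drops the lock call itself in release builds. A minimal illustration of the pitfall, using a hypothetical lock rather than code from this changeset:

    #include <assert.h>
    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    void old_pattern(void)
    {
            /* Fine in a debug build; compiled with -DNDEBUG the whole
             * expression vanishes, the mutex is never taken and the
             * critical section below runs unprotected. */
            assert(pthread_mutex_lock(&lock) == 0);
            /* ... critical section ... */
            assert(pthread_mutex_unlock(&lock) == 0);
    }

ASSERT_RET(run, cond), defined in lib/libtrace.h.in later in this changeset, always evaluates its first argument and only compiles in the check for debug builds.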
lib/data-struct/message_queue.c
r5ba34eb ra49a9eb 19 19 { 20 20 assert(message_len); 21 assert(pipe(mq->pipefd)!= -1);21 ASSERT_RET(pipe(mq->pipefd), != -1); 22 22 mq->message_count = 0; 23 23 if (message_len > PIPE_BUF) … … 45 45 int ret; 46 46 assert(mq->message_len); 47 assert(write(mq->pipefd[1], message, mq->message_len)== (int) mq->message_len);47 ASSERT_RET(write(mq->pipefd[1], message, mq->message_len), == (int) mq->message_len); 48 48 // Update after we've written 49 49 pthread_spin_lock(&mq->spin); … … 73 73 ret = mq->message_count--; 74 74 pthread_spin_unlock(&mq->spin); 75 assert(read(mq->pipefd[0], message, mq->message_len)== (int) mq->message_len);75 ASSERT_RET(read(mq->pipefd[0], message, mq->message_len), == (int) mq->message_len); 76 76 return ret; 77 77 } … … 102 102 ret = --mq->message_count; 103 103 // :( read(...) needs to be done within the *spin* lock otherwise blocking might steal our read 104 assert(read(mq->pipefd[0], message, mq->message_len)== (int) mq->message_len);104 ASSERT_RET(read(mq->pipefd[0], message, mq->message_len), == (int) mq->message_len); 105 105 } else { 106 106 ret = LIBTRACE_MQ_FAILED; -
lib/data-struct/message_queue.h
r5ba34eb ra49a9eb
  1   1     #include <pthread.h>
  2   2     #include <limits.h>
      3  +  #include "../libtrace.h"
  3   4
  4   5     #ifndef LIBTRACE_MESSAGE_QUEUE
lib/data-struct/ring_buffer.c
read9478 ra49a9eb 7 7 #include <stdlib.h> 8 8 #include <assert.h> 9 #include <string.h> 10 #include <stdio.h> 9 11 10 12 #define LOCK_TYPE_MUTEX 0 // Default if not defined 11 13 #define LOCK_TYPE_SPIN 1 12 #define LOCK_TYPE_SEMAPHORE 2 13 #define LOCK_TYPE_NONE 3 14 #define LOCK_TYPE_NONE 2 14 15 15 16 // No major difference noticed here between mutex and spin, both have there 16 17 // downsides. 17 18 18 #define USE_MODULUS 119 19 #define USE_CHECK_EARLY 1 20 20 21 #define USE_LOCK_TYPE LOCK_TYPE_ SPIN21 #define USE_LOCK_TYPE LOCK_TYPE_MUTEX 22 22 #if USE_LOCK_TYPE == LOCK_TYPE_SPIN 23 # define LOCK(dir) assert(pthread_spin_lock(&rb->s ## dir ## lock)== 0)24 # define UNLOCK(dir) assert(pthread_spin_unlock(&rb->s ## dir ## lock)== 0)23 # define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0) 24 # define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0) 25 25 # define TRY_LOCK(dir, action) if(pthread_spin_lock(&rb->s ## dir ## lock) != 0) { \ 26 action }27 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE28 # define LOCK(dir) assert(sem_wait(&rb->sem ## dir ## lock) == 0)29 # define UNLOCK(dir) assert(sem_post(&rb->sem ## dir ## lock) == 0)30 # define TRY_LOCK(dir, action) if(sem_trywait(&rb->sem ## dir ## lock) != 0) { \31 26 action } 32 27 #elif USE_LOCK_TYPE == LOCK_TYPE_NONE … … 35 30 # define TRY_LOCK(dir, action) 36 31 #else // Mutex 37 # define LOCK(dir) assert(pthread_mutex_lock(&rb-> dir ## lock)== 0)38 # define UNLOCK(dir) assert(pthread_mutex_unlock(&rb-> dir ## lock)== 0)32 # define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0) 33 # define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0) 39 34 # define TRY_LOCK(dir, action) if(pthread_mutex_lock(&rb-> dir ## lock) != 0) {\ 40 35 action } … … 64 59 if (mode == LIBTRACE_RINGBUFFER_BLOCKING) { 65 60 /* The signaling part - i.e. 
release when data's ready to read */ 66 assert(sem_init(&rb->fulls, 0, 0) == 0); 67 assert(sem_init(&rb->emptys, 0, size - 1) == 0); // REMEMBER the -1 here :) very important 61 pthread_cond_init(&rb->full_cond, NULL); 62 pthread_cond_init(&rb->empty_cond, NULL); 63 ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0); 64 ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0); 68 65 } 69 66 /* The mutual exclusion part */ 70 67 #if USE_LOCK_TYPE == LOCK_TYPE_SPIN 71 68 #warning "using spinners" 72 assert(pthread_spin_init(&rb->swlock, 0) == 0); 73 assert(pthread_spin_init(&rb->srlock, 0) == 0); 74 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE 75 #warning "using semaphore" 76 assert(sem_init(&rb->semrlock, 0, 1) != -1); 77 assert(sem_init(&rb->semwlock, 0, 1) != -1); 69 ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0); 70 ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0); 78 71 #elif USE_LOCK_TYPE == LOCK_TYPE_NONE 79 72 #warning "No locking used" 80 #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */81 assert(pthread_mutex_init(&rb->wlock, NULL)== 0);82 assert(pthread_mutex_init(&rb->rlock, NULL)== 0);73 #else 74 ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0); 75 ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0); 83 76 #endif 84 77 } … … 90 83 DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) { 91 84 #if USE_LOCK_TYPE == LOCK_TYPE_SPIN 92 assert(pthread_spin_destroy(&rb->swlock) == 0); 93 assert(pthread_spin_destroy(&rb->srlock) == 0); 94 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE 95 assert(sem_destroy(&rb->semrlock) != -1); 96 assert(sem_destroy(&rb->semwlock) != -1); 85 ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0); 86 ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0); 97 87 #elif USE_LOCK_TYPE == LOCK_TYPE_NONE 98 #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */ 99 assert(pthread_mutex_destroy(&rb->wlock) == 0); 100 assert(pthread_mutex_destroy(&rb->rlock) == 0); 101 #endif 88 #endif 89 ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0); 90 ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0); 102 91 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { 103 assert(sem_destroy(&rb->fulls) == 0);104 assert(sem_destroy(&rb->emptys) == 0);92 pthread_cond_destroy(&rb->full_cond); 93 pthread_cond_destroy(&rb->empty_cond); 105 94 } 106 95 rb->size = 0; … … 126 115 */ 127 116 DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) { 128 #if USE_MODULUS129 117 return rb->start == ((rb->end + 1) % rb->size); 130 #else 131 return rb->start == ((rb->end + 1 < rb->size) ? rb->end + 1 : 0); 132 #endif 133 } 134 135 /** 136 * Performs a blocking write to the buffer, upon return the value will be 137 * stored. This will not clobber old values. 138 * 139 * This assumes only one thread writing at once. Use 140 * libtrace_ringbuffer_swrite for a thread safe version. 
141 * 142 * @param rb a pointer to libtrace_ringbuffer structure 143 * @param value the value to store 144 */ 145 DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) { 118 } 119 120 static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) { 121 if (rb->end < rb->start) 122 return rb->end + rb->size - rb->start; 123 else 124 return rb->end - rb->start; 125 // return (rb->end + rb->size - rb->start) % rb->size; 126 } 127 128 static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) { 129 if (rb->start <= rb->end) 130 return rb->start + rb->size - rb->end - 1; 131 else 132 return rb->start - rb->end - 1; 133 // return (rb->start + rb->size - rb->end - 1) % rb->size; 134 } 135 136 /** 137 * Waits for a empty slot, that we can write to. 138 * @param rb The ringbuffer 139 */ 140 static inline void wait_for_empty(libtrace_ringbuffer_t *rb) { 146 141 /* Need an empty to start with */ 147 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) 148 assert(sem_wait(&rb->emptys) == 0); 149 else 142 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { 143 pthread_mutex_lock(&rb->empty_lock); 144 while (libtrace_ringbuffer_is_full(rb)) 145 pthread_cond_wait(&rb->empty_cond, &rb->empty_lock); 146 pthread_mutex_unlock(&rb->empty_lock); 147 } else { 150 148 while (libtrace_ringbuffer_is_full(rb)) 151 149 /* Yield our time, why?, we tried and failed to write an item … … 154 152 * burst to write without blocking */ 155 153 sched_yield();//_mm_pause(); 156 154 } 155 } 156 157 /** 158 * Waits for a full slot, that we read from. 159 * @param rb The ringbuffer 160 */ 161 static inline void wait_for_full(libtrace_ringbuffer_t *rb) { 162 /* Need an empty to start with */ 163 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { 164 pthread_mutex_lock(&rb->full_lock); 165 while (libtrace_ringbuffer_is_empty(rb)) 166 pthread_cond_wait(&rb->full_cond, &rb->full_lock); 167 pthread_mutex_unlock(&rb->full_lock); 168 } else { 169 while (libtrace_ringbuffer_is_empty(rb)) 170 /* Yield our time, why?, we tried and failed to write an item 171 * to the buffer - so we should give up our time in the hope 172 * that the reader thread can empty the buffer giving us a good 173 * burst to write without blocking */ 174 sched_yield();//_mm_pause(); 175 } 176 } 177 178 /** 179 * Notifies we have created a full slot, after a write. 180 * @param rb The ringbuffer 181 */ 182 static inline void notify_full(libtrace_ringbuffer_t *rb) { 183 /* Need an empty to start with */ 184 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { 185 pthread_mutex_lock(&rb->full_lock); 186 pthread_cond_broadcast(&rb->full_cond); 187 pthread_mutex_unlock(&rb->full_lock); 188 } 189 } 190 191 /** 192 * Notifies we have created an empty slot, after a read. 193 * @param rb The ringbuffer 194 */ 195 static inline void notify_empty(libtrace_ringbuffer_t *rb) { 196 /* Need an empty to start with */ 197 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { 198 pthread_mutex_lock(&rb->empty_lock); 199 pthread_cond_broadcast(&rb->empty_cond); 200 pthread_mutex_unlock(&rb->empty_lock); 201 } 202 } 203 204 /** 205 * Performs a blocking write to the buffer, upon return the value will be 206 * stored. This will not clobber old values. 207 * 208 * This assumes only one thread writing at once. Use 209 * libtrace_ringbuffer_swrite for a thread safe version. 
210 * 211 * @param rb a pointer to libtrace_ringbuffer structure 212 * @param value the value to store 213 */ 214 DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) { 215 /* Need an empty to start with */ 216 wait_for_empty(rb); 157 217 rb->elements[rb->end] = value; 158 #if USE_MODULUS159 218 rb->end = (rb->end + 1) % rb->size; 160 #else 161 rb->end = (rb->end + 1 < rb->size) ? rb->end + 1 : 0; 162 #endif 163 /* This check is bad we can easily lose our time slice, and the reader 164 * can catch up before it should, in this case spin locking is used */ 165 //if (libtrace_ringbuffer_is_empty(rb)) 166 // assert(0 == 1); 167 /* Now we've made another full */ 168 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) 169 assert(sem_post(&rb->fulls) == 0); 219 notify_full(rb); 220 } 221 222 /** 223 * Performs a blocking write to the buffer, upon return the value will be 224 * stored. This will not clobber old values. 225 * 226 * This assumes only one thread writing at once. Use 227 * libtrace_ringbuffer_swrite for a thread safe version. 228 * 229 * Packets are written out from start to end in order, if only some packets are 230 * written those at the end of the array will be still be unwritten. 231 * 232 * @param rb a pointer to libtrace_ringbuffer structure 233 * @param values A pointer to a memory address read in 234 * @param nb_buffer The maximum buffers to write i.e. the length of values 235 * @param min_nb_buffers The minimum number of buffers to write 236 * @param value the value to store 237 */ 238 DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) { 239 size_t nb_ready; 240 size_t i = 0; 241 242 assert(min_nb_buffers <= nb_buffers); 243 if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) 244 return 0; 245 246 do { 247 register size_t end; 248 wait_for_empty(rb); 249 nb_ready = libtrace_ringbuffer_nb_empty(rb); 250 nb_ready = MIN(nb_ready, nb_buffers-i); 251 nb_ready += i; 252 // TODO consider optimising into at most 2 memcpys?? 253 end = rb->end; 254 for (; i < nb_ready; i++) { 255 rb->elements[end] = values[i]; 256 end = (end + 1) % rb->size; 257 } 258 rb->end = end; 259 notify_full(rb); 260 } while (i < min_nb_buffers); 261 return i; 170 262 } 171 263 … … 198 290 199 291 /* We need a full slot */ 200 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) 201 assert(sem_wait(&rb->fulls) == 0); 202 else 203 while (libtrace_ringbuffer_is_empty(rb)) 204 /* Yield our time, why?, we tried and failed to read an item 205 * from the buffer - so we should give up our time in the hope 206 * that the writer thread can fill the buffer giving us a good 207 * burst to read without blocking etc */ 208 sched_yield();//_mm_pause(); 292 wait_for_full(rb); 293 value = rb->elements[rb->start]; 294 rb->start = (rb->start + 1) % rb->size; 295 /* Now that's an empty slot */ 296 notify_empty(rb); 297 return value; 298 } 299 300 /** 301 * Waits and reads from the supplied buffer, note this will block forever. 302 * Attempts to read the requested number of packets, however will return 303 * with only the number that are currently ready. 304 * 305 * Set min_nb_buffers to 0 to 'try' read packets. 306 * 307 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1] 308 * are valid. 309 * 310 * @param rb a pointer to libtrace_ringbuffer structure 311 * @param values A pointer to a memory address where the returned item would be placed 312 * @param nb_buffer The maximum buffers to read i.e. 
the length of values 313 * @param min_nb_buffers The minimum number of buffers to read 314 * @return The number of packets read 315 */ 316 DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) { 317 size_t nb_ready; 318 size_t i = 0; 209 319 210 value = rb->elements[rb->start]; 211 #if USE_MODULUS 212 rb->start = (rb->start + 1) % rb->size; 213 #else 214 rb->start = (rb->start + 1 < rb->size) ? rb->start + 1 : 0; 215 #endif 216 /* Now that's a empty slot */ 217 if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) 218 assert(sem_post(&rb->emptys) == 0); 219 return value; 220 } 320 assert(min_nb_buffers <= nb_buffers); 321 322 if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) 323 return 0; 324 325 do { 326 register size_t start; 327 /* We need a full slot */ 328 wait_for_full(rb); 329 330 nb_ready = libtrace_ringbuffer_nb_full(rb); 331 nb_ready = MIN(nb_ready, nb_buffers-i); 332 // Additional to the i we've already read 333 nb_ready += i; 334 start = rb->start; 335 for (; i < nb_ready; i++) { 336 values[i] = rb->elements[start]; 337 start = (start + 1) % rb->size; 338 } 339 rb->start = start; 340 /* Now that's an empty slot */ 341 notify_empty(rb); 342 } while (i < min_nb_buffers); 343 return i; 344 } 345 221 346 222 347 /** … … 242 367 libtrace_ringbuffer_write(rb, value); 243 368 UNLOCK(w); 369 } 370 371 /** 372 * A thread safe version of libtrace_ringbuffer_write_bulk 373 */ 374 DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) { 375 size_t ret; 376 #if USE_CHECK_EARLY 377 if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) // Check early 378 return 0; 379 #endif 380 LOCK(w); 381 ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers); 382 UNLOCK(w); 383 return ret; 244 384 } 245 385 … … 290 430 291 431 /** 432 * A thread safe version of libtrace_ringbuffer_read_bulk 433 */ 434 DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) { 435 size_t ret; 436 #if USE_CHECK_EARLY 437 if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) // Check early 438 return 0; 439 #endif 440 LOCK(r); 441 ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers); 442 UNLOCK(r); 443 return ret; 444 } 445 446 /** 292 447 * A thread safe version of libtrace_ringbuffer_try_write 293 448 */ … … 330 485 rb->elements = NULL; 331 486 } 487 488 -
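A worked example of the occupancy arithmetic added above (the concrete values are assumed for illustration, not taken from the changeset): with size = 8, start = 6 and end = 2,

    /* end < start, so:
     *   libtrace_ringbuffer_nb_full()  = end + size - start = 2 + 8 - 6 = 4
     * start > end, so:
     *   libtrace_ringbuffer_nb_empty() = start - end - 1    = 6 - 2 - 1 = 3
     */

Full plus empty is 7, i.e. size - 1: one slot is always left unused so that start == end can only mean "empty", which is exactly what libtrace_ringbuffer_is_full() tests with start == (end + 1) % size.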
lib/data-struct/ring_buffer.h
read9478 ra49a9eb 20 20 pthread_spinlock_t swlock; 21 21 pthread_spinlock_t srlock; 22 sem_t semrlock; 23 sem_t semwlock; 24 sem_t emptys; 25 sem_t fulls; 22 // We need to ensure that broadcasts dont get lost hence 23 // these locks below 24 // We avoid using semaphores since they don't allow 25 // multiple releases. 26 pthread_mutex_t empty_lock; 27 pthread_mutex_t full_lock; 28 pthread_cond_t empty_cond; // Signal when empties are ready 29 pthread_cond_t full_cond; // Signal when fulls are ready 26 30 // Aim to get this on a separate cache line to start - important if spinning 27 31 volatile size_t end; … … 46 50 DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value); 47 51 52 53 54 DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers); 55 DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers); 56 DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers); 57 DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers); 58 48 59 #endif -
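A minimal caller-side sketch of the bulk interface declared above. The buffer array and burst size are hypothetical; min_nb_buffers controls blocking, with 0 meaning "try" and a non-zero value meaning "block until at least that many items have moved":

    #include "data-struct/ring_buffer.h"

    #define BURST 10

    /* Write up to BURST items, blocking until at least one is queued.
     * Returns how many were written; items[ret..BURST-1] remain ours. */
    size_t producer_burst(libtrace_ringbuffer_t *rb, void *items[BURST])
    {
            return libtrace_ringbuffer_swrite_bulk(rb, items, BURST, 1);
    }

    /* Non-blocking read: with min_nb_buffers == 0 this returns 0 straight
     * away if the buffer is empty, otherwise items[0..ret-1] are valid. */
    size_t consumer_burst(libtrace_ringbuffer_t *rb, void *items[BURST])
    {
            return libtrace_ringbuffer_sread_bulk(rb, items, BURST, 0);
    }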
lib/data-struct/vector.c
rfac8c46 ra49a9eb 10 10 v->max_size = 128; // Pick a largish size to begin with 11 11 v->elements = malloc(v->max_size * v->element_size); 12 assert(pthread_mutex_init(&v->lock, NULL)== 0);12 ASSERT_RET(pthread_mutex_init(&v->lock, NULL), == 0); 13 13 } 14 14 15 15 DLLEXPORT void libtrace_vector_destroy(libtrace_vector_t *v) { 16 assert(pthread_mutex_destroy(&v->lock)== 0);16 ASSERT_RET(pthread_mutex_destroy(&v->lock), == 0); 17 17 free(v->elements); 18 18 // Be safe make sure we wont work any more … … 24 24 25 25 DLLEXPORT void libtrace_vector_push_back(libtrace_vector_t *v, void *d) { 26 assert(pthread_mutex_lock(&v->lock)== 0);26 ASSERT_RET(pthread_mutex_lock(&v->lock), == 0); 27 27 if (v->size >= v->max_size) { 28 28 /* Resize */ … … 33 33 memcpy(&v->elements[v->size*v->element_size], d, v->element_size); 34 34 v->size++; 35 assert(pthread_mutex_unlock(&v->lock)== 0);35 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 36 36 } 37 37 … … 41 41 42 42 DLLEXPORT int libtrace_vector_get(libtrace_vector_t *v, size_t location, void *d) { 43 assert(pthread_mutex_lock(&v->lock)== 0);43 ASSERT_RET(pthread_mutex_lock(&v->lock), == 0); 44 44 if (location >= v->size) { 45 assert(pthread_mutex_unlock(&v->lock)== 0);45 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 46 46 return 0; 47 47 } 48 48 memcpy(d, &v->elements[location*v->element_size], v->element_size); 49 assert(pthread_mutex_unlock(&v->lock)== 0);49 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 50 50 return 1; 51 51 } … … 53 53 DLLEXPORT int libtrace_vector_remove_front(libtrace_vector_t *v) { 54 54 size_t i; 55 assert(pthread_mutex_lock(&v->lock)== 0);55 ASSERT_RET(pthread_mutex_lock(&v->lock), == 0); 56 56 if (!v->size) { 57 assert(pthread_mutex_unlock(&v->lock)== 0);57 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 58 58 return 0; 59 59 } … … 62 62 for (i = 0; i < v->size * v->element_size; i++) 63 63 v->elements[i] = v->elements[i+v->element_size]; 64 assert(pthread_mutex_unlock(&v->lock)== 0);64 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 65 65 return 1; 66 66 } … … 82 82 if (src->size == 0) // Nothing to do if this is the case 83 83 return; 84 assert(pthread_mutex_lock(&dest->lock)== 0);85 assert(pthread_mutex_lock(&src->lock)== 0);84 ASSERT_RET(pthread_mutex_lock(&dest->lock), == 0); 85 ASSERT_RET(pthread_mutex_lock(&src->lock), == 0); 86 86 if (src->size == 0) // Double check now we've got the locks - Nothing to do if this is the case 87 87 goto unlock; … … 104 104 } 105 105 unlock: 106 assert(pthread_mutex_unlock(&src->lock)== 0);107 assert(pthread_mutex_unlock(&dest->lock)== 0);106 ASSERT_RET(pthread_mutex_unlock(&src->lock), == 0); 107 ASSERT_RET(pthread_mutex_unlock(&dest->lock), == 0); 108 108 } 109 109 … … 117 117 118 118 DLLEXPORT void libtrace_vector_empty(libtrace_vector_t *v) { 119 assert(pthread_mutex_lock(&v->lock)== 0);119 ASSERT_RET(pthread_mutex_lock(&v->lock), == 0); 120 120 v->size = 0; 121 assert(pthread_mutex_unlock(&v->lock)== 0);121 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 122 122 } 123 123 … … 126 126 { 127 127 size_t cur; 128 assert(pthread_mutex_lock(&v->lock)== 0);128 ASSERT_RET(pthread_mutex_lock(&v->lock), == 0); 129 129 for (cur = 0; cur < v->size; cur++) { 130 130 (*fn)(&v->elements[cur*v->element_size]); 131 131 } 132 assert(pthread_mutex_unlock(&v->lock)== 0);132 ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0); 133 133 } -
lib/libtrace.h.in
r17c5749 ra49a9eb 115 115 /** DAG driver version installed on the current system */ 116 116 #define DAG_DRIVER_V "@DAG_VERSION_NUM@" 117 118 /** 119 * A version of assert that always runs the first argument even 120 * when not debugging, however only asserts the condition if debugging 121 * Intended for use mainly with pthread locks etc. which have error 122 * returns but *should* never actually fail. 123 */ 124 #ifdef NDEBUG 125 #define ASSERT_RET(run, cond) run 126 #else 127 #define ASSERT_RET(run, cond) assert(run cond) 128 #endif 117 129 118 130 #ifdef __cplusplus … … 3188 3200 3189 3201 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer); 3190 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet);3191 3202 DLLEXPORT int trace_ppause(libtrace_t *libtrace); 3192 3203 DLLEXPORT int trace_pstop(libtrace_t *libtrace); -
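For reference, the two expansions of the ASSERT_RET macro added above, written out for a representative call site:

    /* Debug build (NDEBUG not defined):
     *     ASSERT_RET(pthread_mutex_lock(&lock), == 0);
     * expands to
     *     assert(pthread_mutex_lock(&lock) == 0);
     *
     * Release build (-DNDEBUG):
     *     ASSERT_RET(pthread_mutex_lock(&lock), == 0);
     * expands to
     *     pthread_mutex_lock(&lock);
     * so the call is still made and only the return-value check is dropped. */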
lib/libtrace_int.h
r50ce607 ra49a9eb 149 149 150 150 #include "data-struct/ring_buffer.h" 151 #include "data-struct/object_cache.h" 151 152 #include "data-struct/vector.h" 152 153 #include "data-struct/message_queue.h" … … 317 318 int packet_freelist_size; 318 319 /** The actual freelist */ 319 libtrace_ ringbuffer_t packet_freelist;320 libtrace_ocache_t packet_freelist; 320 321 /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */ 321 322 int perpkt_buffer_size; -
lib/trace.c
rc99b1e5 ra49a9eb 273 273 libtrace->perpkt_buffer_size = 0; 274 274 libtrace->expected_key = 0; 275 libtrace_zero_ ringbuffer(&libtrace->packet_freelist);275 libtrace_zero_ocache(&libtrace->packet_freelist); 276 276 libtrace_zero_thread(&libtrace->hasher_thread); 277 277 libtrace_zero_thread(&libtrace->reducer_thread); … … 395 395 libtrace->packet_freelist_size = 0; 396 396 libtrace->perpkt_buffer_size = 0; 397 libtrace_zero_ ringbuffer(&libtrace->packet_freelist);397 libtrace_zero_ocache(&libtrace->packet_freelist); 398 398 libtrace_zero_thread(&libtrace->hasher_thread); 399 399 libtrace_zero_thread(&libtrace->reducer_thread); … … 651 651 } 652 652 free(libtrace->first_packets.packets); 653 assert(pthread_spin_destroy(&libtrace->first_packets.lock)== 0);653 ASSERT_RET(pthread_spin_destroy(&libtrace->first_packets.lock), == 0); 654 654 } 655 655 … … 666 666 /* Empty any packet memory */ 667 667 if (libtrace->state != STATE_NEW) { 668 libtrace_packet_t * packet; 669 while (libtrace_ringbuffer_try_read(&libtrace->packet_freelist,(void **) &packet)) 670 trace_destroy_packet(packet); 671 672 libtrace_ringbuffer_destroy(&libtrace->packet_freelist); 668 // This has all of our packets 669 libtrace_ocache_destroy(&libtrace->packet_freelist); 673 670 674 671 for (i = 0; i < libtrace->perpkt_thread_count; ++i) { … … 721 718 } 722 719 723 DLLEXPORT libtrace_packet_t *trace_create_packet(void) 724 { 725 libtrace_packet_t *packet = 720 DLLEXPORT libtrace_packet_t *trace_create_packet(void) 721 { 722 libtrace_packet_t *packet = 726 723 (libtrace_packet_t*)calloc((size_t)1,sizeof(libtrace_packet_t)); 727 724 … … 1453 1450 #if HAVE_LLVM 1454 1451 if (!filter->jitfilter) { 1455 assert(pthread_mutex_lock(&mutex)== 0);1452 ASSERT_RET(pthread_mutex_lock(&mutex), == 0); 1456 1453 /* Again double check here like the bpf filter */ 1457 1454 if(filter->jitfilter) … … 1462 1459 * as such lock here anyways */ 1463 1460 filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len); 1464 assert(pthread_mutex_unlock(&mutex)== 0);1461 ASSERT_RET(pthread_mutex_unlock(&mutex), == 0); 1465 1462 } 1466 1463 #endif -
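The object cache that replaces the ring-buffer freelist here comes from data-struct/object_cache.c/h, which are among the two files added by this changeset but not shown on this page. Below is a lifecycle sketch using only the calls that appear elsewhere in the changeset; the per-thread cache size and total capacity are illustrative, and the return-value semantics of the alloc/free calls are inferred from their use in trace_parallel.c further down:

    #include "libtrace.h"
    #include "data-struct/object_cache.h"

    static void packet_pool_example(void)
    {
            libtrace_ocache_t pool;
            libtrace_packet_t *pkts[4];

            /* Backed by trace_create_packet()/trace_destroy_packet();
             * 64-entry thread-local cache, 256 objects overall (assumed). */
            libtrace_ocache_init(&pool,
                            (void *(*)()) trace_create_packet,
                            (void (*)(void *)) trace_destroy_packet,
                            64, 256, true);

            /* Request four packets; with min == max all four are returned. */
            libtrace_ocache_alloc(&pool, (void **) pkts, 4, 4);

            /* ... use the packets ... */

            /* Hand them back to the cache instead of destroying them. */
            libtrace_ocache_free(&pool, (void **) pkts, 4, 4);

            /* Tears down the pool, destroying anything it still holds,
             * which is why trace.c above no longer drains the freelist. */
            libtrace_ocache_destroy(&pool);
    }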
lib/trace_parallel.c
r049a700 ra49a9eb 103 103 #define VERBOSE_DEBBUGING 0 104 104 105 106 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets); 107 105 108 extern int libtrace_parallel; 106 109 … … 109 112 uint64_t wait_for_fill_complete_hits; 110 113 } contention_stats[1024]; 114 115 struct mem_stats { 116 struct memfail { 117 uint64_t cache_hit; 118 uint64_t ring_hit; 119 uint64_t miss; 120 uint64_t recycled; 121 } readbulk, read, write, writebulk; 122 }; 123 124 // Grrr gcc wants this spelt out 125 __thread struct mem_stats mem_hits = {{0},{0},{0},{0}}; 126 127 static void print_memory_stats() { 128 char t_name[50]; 129 uint64_t total; 130 pthread_getname_np(pthread_self(), t_name, sizeof(t_name)); 131 132 fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name); 133 134 total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss; 135 if (total) { 136 fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n", 137 mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled); 138 fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n", 139 total, (double) mem_hits.read.miss / (double) total * 100.0); 140 } 141 142 total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss; 143 if (total) { 144 fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n", 145 mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled); 146 147 148 fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n", 149 total, (double) mem_hits.readbulk.miss / (double) total * 100.0); 150 } 151 152 total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss; 153 if (total) { 154 fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n", 155 mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled); 156 157 fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n", 158 total, (double) mem_hits.write.miss / (double) total * 100.0); 159 } 160 161 total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss; 162 if (total) { 163 fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n", 164 mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled); 165 166 fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n", 167 total, (double) mem_hits.writebulk.miss / (double) total * 100.0); 168 } 169 170 } 111 171 112 172 /** … … 165 225 const enum trace_state new_state, const bool need_lock) 166 226 { 167 enum trace_state prev_state;227 UNUSED enum trace_state prev_state; 168 228 if (need_lock) 169 229 pthread_mutex_lock(&trace->libtrace_lock); … … 293 353 static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) { 294 354 trace_make_results_packets_safe(trace); 295 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);355 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 296 356 thread_change_state(trace, t, THREAD_PAUSED, false); 297 357 while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) { 298 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock)== 0);358 
ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0); 299 359 } 300 360 thread_change_state(trace, t, THREAD_RUNNING, false); 301 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); 302 } 361 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 362 } 363 364 #define PACKETQUEUES 10 303 365 304 366 /** … … 309 371 libtrace_thread_t * t; 310 372 libtrace_message_t message = {0}; 311 libtrace_packet_t *packet = NULL; 312 373 libtrace_packet_t *packets[PACKETQUEUES] = {NULL}; 374 size_t nb_packets; 375 size_t i; 313 376 314 377 // Force this thread to wait until trace_pstart has been completed 315 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);378 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 316 379 t = get_thread_table(trace); 317 380 assert(t); … … 320 383 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace)); 321 384 } 322 assert(pthread_mutex_unlock(&trace->libtrace_lock)== 0);385 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 323 386 324 387 /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */ … … 333 396 334 397 for (;;) { 335 int psize;336 398 337 399 if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) { … … 346 408 // The hasher has stopped by this point, so the queue shouldn't be filling 347 409 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 348 psize = trace_pread_packet(trace, t, &packet); 349 if (psize > 0) { 350 packet = (*trace->per_pkt)(trace, packet, NULL, t); 410 nb_packets = trace_pread_packet(trace, t, packets, 1); 411 if (nb_packets == 1) { 412 if (packets[0]->error > 0) 413 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t); 351 414 } else { 352 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", p size, libtrace_ringbuffer_is_empty(&t->rbuffer));415 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", packets[0]->error, libtrace_ringbuffer_is_empty(&t->rbuffer)); 353 416 } 354 417 } … … 369 432 370 433 if (trace->perpkt_thread_count == 1) { 371 if (!packet) { 372 if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet)) 373 packet = trace_create_packet(); 434 if (!packets[0]) { 435 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1); 374 436 } 375 assert(packet); 376 if ((psize = trace_read_packet(trace, packet)) <1) { 377 break; 437 assert(packets[0]); 438 packets[0]->error = trace_read_packet(trace, packets[0]); 439 nb_packets = 1; 440 } else { 441 nb_packets = trace_pread_packet(trace, t, packets, PACKETQUEUES); 442 } 443 // Loop through the packets we just read 444 for (i = 0; i < nb_packets; ++i) { 445 446 if (packets[i]->error > 0) { 447 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t); 448 } else if (packets[i]->error != -2) { 449 // An error this should be the last packet we read 450 size_t z; 451 for (z = i ; z < nb_packets; ++z) 452 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[i]->error); 453 assert (i == nb_packets-1); 454 goto stop; 378 455 } 379 } else { 380 psize = trace_pread_packet(trace, t, &packet); 381 } 382 383 if (psize > 0) { 384 packet = (*trace->per_pkt)(trace, packet, NULL, t); 385 continue; 386 } 387 388 if (psize == -2) 389 continue; // We have a message 390 391 if (psize < 1) { // consider sending a message 392 break; 393 } 394 456 // -2 is a message its not worth 
checking now just finish this lot and we'll check 457 // when we loop next 458 } 395 459 } 396 460 … … 404 468 (*trace->per_pkt)(trace, NULL, &message, t); 405 469 406 // Free our last packet 407 if (packet) 408 trace_destroy_packet(packet); 470 // Free any remaining packets 471 for (i = 0; i < PACKETQUEUES; i++) { 472 if (packets[i]) { 473 libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1); 474 packets[i] = NULL; 475 } 476 } 409 477 410 478 … … 416 484 trace_send_message_to_reducer(trace, &message); 417 485 418 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);486 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 419 487 if (trace->format->punregister_thread) { 420 488 trace->format->punregister_thread(trace, t); 421 489 } 422 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); 490 print_memory_stats(); 491 492 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 423 493 424 494 pthread_exit(NULL); … … 439 509 assert(trace_has_dedicated_hasher(trace)); 440 510 /* Wait until all threads are started and objects are initialised (ring buffers) */ 441 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);511 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 442 512 t = &trace->hasher_thread; 443 513 assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid)); … … 446 516 trace->format->pregister_thread(trace, t, true); 447 517 } 448 assert(pthread_mutex_unlock(&trace->libtrace_lock)== 0);518 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 449 519 int pkt_skipped = 0; 450 520 /* Read all packets in then hash and queue against the correct thread */ 451 521 while (1) { 452 522 int thread; 453 if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))454 packet = trace_create_packet();523 if (!pkt_skipped) 524 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1); 455 525 assert(packet); 456 526 … … 462 532 switch(message.code) { 463 533 case MESSAGE_DO_PAUSE: 464 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);534 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 465 535 thread_change_state(trace, t, THREAD_PAUSED, false); 466 536 pthread_cond_broadcast(&trace->perpkt_cond); 467 537 while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) { 468 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock)== 0);538 ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0); 469 539 } 470 540 thread_change_state(trace, t, THREAD_RUNNING, false); 471 541 pthread_cond_broadcast(&trace->perpkt_cond); 472 assert(pthread_mutex_unlock(&trace->libtrace_lock)== 0);542 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 473 543 break; 474 544 case MESSAGE_DO_STOP: … … 506 576 bcast = packet; 507 577 } else { 508 bcast = trace_create_packet();578 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1); 509 579 bcast->error = packet->error; 510 580 } 511 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);581 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 512 582 if (trace->perpkt_threads[i].state != THREAD_FINISHED) { 513 assert(pthread_mutex_unlock(&trace->libtrace_lock)== 0);583 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 514 584 // Unlock early otherwise we could deadlock 515 585 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast); 516 586 } else { 517 assert(pthread_mutex_unlock(&trace->libtrace_lock)== 0);587 
ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 518 588 } 519 589 } … … 526 596 message.additional.uint64 = 0; 527 597 trace_send_message_to_reducer(trace, &message); 528 assert(pthread_mutex_lock(&trace->libtrace_lock)== 0);598 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 529 599 if (trace->format->punregister_thread) { 530 600 trace->format->punregister_thread(trace, t); 531 601 } 532 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); 602 print_memory_stats(); 603 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 533 604 534 605 // TODO remove from TTABLE t sometime … … 553 624 } 554 625 626 /** 627 * @brief Move NULLs to the end of an array. 628 * @param values 629 * @param len 630 * @return The location the first NULL, aka the number of non NULL elements 631 */ 632 static inline size_t move_nulls_back(void *arr[], size_t len) { 633 size_t fr=0, en = len-1; 634 // Shift all non NULL elements to the front of the array, and NULLs to the 635 // end, traverses every element at most once 636 for (;fr < en; ++fr) { 637 if (arr[fr] == NULL) { 638 for (;en > fr; --en) { 639 if(arr[en]) { 640 arr[fr] = arr[en]; 641 arr[en] = NULL; 642 break; 643 } 644 } 645 } 646 } 647 // This is the index of the first NULL 648 en = MIN(fr, en); 649 // Or the end of the array if this special case 650 if (arr[en]) 651 en++; 652 return en; 653 } 654 655 /** returns the number of packets successfully allocated in the final array 656 these will all be at the front of the array */ 657 inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) { 658 size_t nb; 659 nb = move_nulls_back((void **) packets, nb_packets); 660 mem_hits.read.recycled += nb; 661 nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb); 662 assert(nb_packets == nb); 663 return nb; 664 } 665 666 667 inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) { 668 size_t nb; 669 nb = move_nulls_back((void **) packets, nb_packets); 670 mem_hits.write.recycled += nb_packets - nb; 671 nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb); 672 memset(packets, 0, nb); // XXX make better, maybe do this in ocache?? 673 return nb; 674 } 675 555 676 /* Our simplest case when a thread becomes ready it can obtain an exclusive 556 * lock to read a packet from the underlying trace. 557 */ 558 inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet) 559 { 560 // We need this to fill the 'first' packet table 561 if (!*packet) { 562 if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet)) 563 *packet = trace_create_packet(); 564 } 565 assert(*packet); 566 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 567 /* Read a packet */ 568 (*packet)->error = trace_read_packet(libtrace, *packet); 569 // Doing this inside the lock ensures the first packet is always 570 // recorded first 571 if ((*packet)->error > 0) 572 store_first_packet(libtrace, *packet, t); 573 574 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); 575 return (*packet)->error; 677 * lock to read packets from the underlying trace. 
678 */ 679 inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets) 680 { 681 size_t i = 0; 682 683 nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets); 684 685 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 686 /* Read nb_packets */ 687 for (i = 0; i < nb_packets; ++i) { 688 packets[i]->error = trace_read_packet(libtrace, packets[i]); 689 // Doing this inside the lock ensures the first packet is always 690 // recorded first 691 if (packets[i]->error <= 0) { 692 ++i; 693 break; 694 } 695 } 696 if (packets[0]->error > 0) { 697 store_first_packet(libtrace, packets[0], t); 698 } 699 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 700 return i; 576 701 } 577 702 … … 581 706 * 2. Move that into the packet provided (packet) 582 707 */ 583 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet) 584 { 585 if (*packet) // Recycle the old get the new 586 if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet)) 587 trace_destroy_packet(*packet); 588 *packet = libtrace_ringbuffer_read(&t->rbuffer); 589 590 if (*packet) { 708 inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) 709 { 710 size_t i; 711 712 // Always grab at least one 713 if (packets[0]) // Recycle the old get the new 714 libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1); 715 packets[0] = libtrace_ringbuffer_read(&t->rbuffer); 716 717 718 if (packets[0] == NULL) { 719 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1); 720 packets[0]->error = -2; 721 } 722 723 if (packets[0]->error < 0) 724 return 1; 725 726 for (i = 1; i < nb_packets; i++) { 727 if (packets[i]) // Recycle the old get the new 728 libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1); 729 if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) { 730 packets[i] = NULL; 731 break; 732 } 733 // Message wating 734 if (packets[i] == NULL) { 735 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1); 736 packets[i]->error = -2; 737 ++i; 738 break; 739 } 740 } 741 742 return i; 743 /*if (*packet) { 591 744 return (*packet)->error; 592 745 } else { … … 597 750 fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n"); 598 751 return -2; 599 } 752 }*/ 600 753 } 601 754 … … 613 766 614 767 if (*packet) // Recycle the old get the new 615 if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet)) 616 trace_destroy_packet(*packet); 768 libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1); 617 769 *packet = retrived_packet; 618 770 *ret = (*packet)->error; … … 635 787 do { 636 788 // Wait for a thread to end 637 assert(pthread_mutex_lock(&libtrace->libtrace_lock)== 0);789 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 638 790 639 791 // Check before 640 792 if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) { 641 793 complete = true; 642 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);794 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 643 795 continue; 644 796 } 645 797 646 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock)== 0);798 
ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0); 647 799 648 800 // Check after 649 801 if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) { 650 802 complete = true; 651 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);803 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 652 804 continue; 653 805 } 654 806 655 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);807 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 656 808 657 809 // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue … … 696 848 return ret; 697 849 // Can still block here if another thread is writing to a full queue 698 assert(pthread_mutex_lock(&libtrace->libtrace_lock)== 0);850 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 699 851 700 852 // Its impossible for our own queue to overfill, because no one can write 701 853 // when we are in the lock 702 854 if(try_waiting_queue(libtrace, t, packet, &ret)) { 703 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);855 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 704 856 return ret; 705 857 } … … 708 860 if (libtrace->perpkt_queue_full) { 709 861 contention_stats[t->perpkt_num].wait_for_fill_complete_hits++; 710 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);862 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 711 863 continue; 712 864 } 713 865 714 if (!*packet) { 715 if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet)) 716 *packet = trace_create_packet(); 717 } 866 if (!*packet) 867 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1); 718 868 assert(*packet); 719 869 720 870 // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock) 721 871 if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) { 722 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);872 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 723 873 if (libtrace_halt) 724 874 return 0; … … 731 881 if (thread == t->perpkt_num) { 732 882 // If it's this thread we must be in order because we checked the buffer once we got the lock 733 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);883 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 734 884 return (*packet)->error; 735 885 } … … 738 888 while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) { 739 889 libtrace->perpkt_queue_full = true; 740 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);890 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 741 891 contention_stats[t->perpkt_num].full_queue_hits++; 742 assert(pthread_mutex_lock(&libtrace->libtrace_lock)== 0);892 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 743 893 } 744 894 *packet = NULL; … … 748 898 assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner"); 749 899 } 750 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);900 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 751 901 } 752 902 } … … 779 929 // We limit the number of packets we get to the size of the sliding window 780 930 // such that it is impossible for any given thread to fail to store a packet 781 
assert(sem_wait(&libtrace->sem)== 0);931 ASSERT_RET(sem_wait(&libtrace->sem), == 0); 782 932 /*~~~~Single threaded read of a packet~~~~*/ 783 assert(pthread_mutex_lock(&libtrace->libtrace_lock)== 0);933 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 784 934 785 935 /* Re-check our queue things we might have data waiting */ 786 936 if(try_waiting_queue(libtrace, t, packet, &ret)) { 787 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);788 assert(sem_post(&libtrace->sem)== 0);937 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 938 ASSERT_RET(sem_post(&libtrace->sem), == 0); 789 939 return ret; 790 940 } … … 792 942 // TODO put on *proper* condition variable 793 943 if (libtrace->perpkt_queue_full) { 794 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);795 assert(sem_post(&libtrace->sem)== 0);944 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 945 ASSERT_RET(sem_post(&libtrace->sem), == 0); 796 946 contention_stats[t->perpkt_num].wait_for_fill_complete_hits++; 797 947 continue; 798 948 } 799 949 800 if (!*packet) { 801 if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet)) 802 *packet = trace_create_packet(); 803 } 950 if (!*packet) 951 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1); 804 952 assert(*packet); 805 953 806 954 if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) { 807 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);808 assert(sem_post(&libtrace->sem)== 0);955 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 956 ASSERT_RET(sem_post(&libtrace->sem), == 0); 809 957 // Finish this thread ensuring that any data written later by another thread is retrieved also 810 958 if (libtrace_halt) … … 813 961 return trace_finish_perpkt(libtrace, packet, t); 814 962 } 815 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);963 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 816 964 817 965 /* ~~~~Multiple threads can run the hasher~~~~ */ … … 819 967 820 968 /* Yes this is correct opposite read lock for a write operation */ 821 assert(pthread_rwlock_rdlock(&libtrace->window_lock)== 0);969 ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0); 822 970 if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet)) 823 971 assert(!"Semaphore should stop us from ever overfilling the sliding window"); 824 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);972 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 825 973 *packet = NULL; 826 974 827 975 // Always try read any data from the sliding window 828 976 while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) { 829 assert(pthread_rwlock_wrlock(&libtrace->window_lock)== 0);977 ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0); 830 978 if (libtrace->perpkt_queue_full) { 831 979 // I might be the holdup in which case if I can read my queue I should do that and return 832 980 if(try_waiting_queue(libtrace, t, packet, &ret)) { 833 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);981 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 834 982 return ret; 835 983 } 836 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);984 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 837 985 continue; 838 986 } … … 851 999 // We must be able to write this now 100% without 
fail 852 1000 libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet); 853 assert(sem_post(&libtrace->sem)== 0);854 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);1001 ASSERT_RET(sem_post(&libtrace->sem), == 0); 1002 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 855 1003 return ret; 856 1004 } else { … … 860 1008 // Not us we have to give the other threads a chance to write there packets then 861 1009 libtrace->perpkt_queue_full = true; 862 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);1010 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 863 1011 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets 864 assert(sem_post(&libtrace->sem)== 0);1012 ASSERT_RET(sem_post(&libtrace->sem), == 0); 865 1013 866 1014 contention_stats[t->perpkt_num].full_queue_hits++; 867 assert(pthread_rwlock_wrlock(&libtrace->window_lock)== 0);1015 ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0); 868 1016 // Grab these back 869 1017 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets 870 assert(sem_wait(&libtrace->sem)== 0);1018 ASSERT_RET(sem_wait(&libtrace->sem), == 0); 871 1019 libtrace->perpkt_queue_full = false; 872 1020 } 873 assert(sem_post(&libtrace->sem)== 0);1021 ASSERT_RET(sem_post(&libtrace->sem), == 0); 874 1022 *packet = NULL; 875 1023 } else { … … 879 1027 } 880 1028 } 881 assert(pthread_rwlock_unlock(&libtrace->window_lock)== 0);1029 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0); 882 1030 } 883 1031 // Now we go back to checking our queue anyways … … 902 1050 gettimeofday(&tv, NULL); 903 1051 dup = trace_copy_packet(packet); 904 assert(pthread_spin_lock(&libtrace->first_packets.lock)== 0);1052 ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0); 905 1053 libtrace->first_packets.packets[t->perpkt_num].packet = dup; 906 1054 //printf("Stored first packet time=%f\n", trace_get_seconds(dup)); … … 918 1066 libtrace->first_packets.first = t->perpkt_num; 919 1067 } 920 assert(pthread_spin_unlock(&libtrace->first_packets.lock)== 0);1068 ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0); 921 1069 libtrace_message_t mesg = {0}; 922 1070 mesg.code = MESSAGE_FIRST_PACKET; … … 936 1084 { 937 1085 int ret = 0; 938 assert(pthread_spin_lock(&libtrace->first_packets.lock)== 0);1086 ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0); 939 1087 if (libtrace->first_packets.count) { 940 1088 *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet; … … 956 1104 *tv = NULL; 957 1105 } 958 assert(pthread_spin_unlock(&libtrace->first_packets.lock)== 0);1106 ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0); 959 1107 return ret; 960 1108 } … … 1110 1258 1111 1259 /** 1112 * Read a packet from the parallel trace 1113 */ 1114 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet) 1115 { 1116 int ret; 1117 1118 // Cleanup the packet passed back 1119 if (*packet) 1120 trace_fin_packet(*packet); 1260 * Read packets from the parallel trace 1261 * @return the number of packets read, null packets indicate messages. Check packet->error before 1262 * assuming a packet is valid. 
1263 */ 1264 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets) 1265 { 1266 size_t ret; 1267 size_t i; 1268 assert(nb_packets); 1269 1270 for (i = 0; i < nb_packets; i++) { 1271 // Cleanup the packet passed back 1272 if (packets[i]) 1273 trace_fin_packet(packets[i]); 1274 } 1121 1275 1122 1276 if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1123 if (!*packet) 1124 *packet = trace_create_packet(); 1125 ret = trace_pread_packet_wrapper(libtrace, t, *packet); 1277 if (!packets[0]) 1278 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1); 1279 packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets); 1280 ret = 1; 1126 1281 } else if (trace_has_dedicated_hasher(libtrace)) { 1127 ret = trace_pread_packet_hasher_thread(libtrace, t, packet );1282 ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets); 1128 1283 } else if (!trace_has_dedicated_hasher(libtrace)) { 1129 1284 /* We don't care about which core a packet goes to */ 1130 ret = trace_pread_packet_first_in_first_served(libtrace, t, packet );1285 ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets); 1131 1286 } /* else { 1132 1287 ret = trace_pread_packet_hash_locked(libtrace, packet); … … 1135 1290 // Formats can also optionally do this internally to ensure the first 1136 1291 // packet is always reported correctly 1137 if (ret > 0) { 1138 store_first_packet(libtrace, *packet, t); 1292 assert(ret); 1293 assert(ret <= nb_packets); 1294 if (packets[0]->error > 0) { 1295 store_first_packet(libtrace, packets[0], t); 1139 1296 if (libtrace->tracetime) 1140 delay_tracetime(libtrace, *packet, t);1297 delay_tracetime(libtrace, packets[0], t); 1141 1298 } 1142 1299 … … 1149 1306 static inline int trace_start_perpkt_threads (libtrace_t *libtrace) { 1150 1307 int i; 1151 1308 char name[16]; 1152 1309 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1153 1310 libtrace_thread_t *t = &libtrace->perpkt_threads[i]; 1154 assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0); 1311 ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0); 1312 snprintf(name, 16, "perpkt-%d", i); 1313 pthread_setname_np(t->tid, name); 1155 1314 } 1156 1315 return libtrace->perpkt_thread_count; … … 1168 1327 { 1169 1328 int i; 1329 char name[16]; 1170 1330 sigset_t sig_before, sig_block_all; 1171 1331 assert(libtrace); … … 1177 1337 if (libtrace->state != STATE_NEW) { 1178 1338 int err = 0; 1179 assert(pthread_mutex_lock(&libtrace->libtrace_lock)== 0);1339 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1180 1340 if (libtrace->state != STATE_PAUSED) { 1181 1341 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, 1182 1342 "The trace(%s) has already been started and is not paused!!", libtrace->uridata); 1183 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);1343 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1184 1344 return -1; 1185 1345 } … … 1207 1367 libtrace_change_state(libtrace, STATE_RUNNING, false); 1208 1368 } 1209 assert(pthread_mutex_unlock(&libtrace->libtrace_lock)== 0);1369 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1210 1370 return err; 1211 1371 } … … 1219 1379 libtrace->reducer = reducer; 1220 1380 1221 assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL)== 0);1222 assert(pthread_cond_init(&libtrace->perpkt_cond, NULL)== 0);1223 
1219 1379 	libtrace->reducer = reducer;
1220 1380 
1221 	assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
1222 	assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
1223 	assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
1381 	ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
1382 	ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
1383 	ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
1224 1384 	// Grab the lock
1225 	assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1385 	ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1226 1386 
1227 1387 	// Set default buffer sizes
… …
1250 1410 	sigemptyset(&sig_block_all);
1251 1411 
1252 	assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
1412 	ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
1253 1413 
1254 1414 	// If we are using a hasher start it
… …
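Editorial note: trace_pstart adjusts the signal mask with pthread_sigmask(SIG_SETMASK, ..., &sig_before) before spawning the hasher, per-packet and keepalive threads, and restores the saved mask afterwards; per the comment in the diff, the point is that a thread created by pthread_create inherits its creator's mask, so the mask in force at creation time is what the workers run with. The sketch below shows the general block-around-create pattern; it uses sigfillset so the worker provably never takes process signals, whereas the hunk above uses sigemptyset, so treat it as an illustration of the mechanism rather than a copy of libtrace's code.

    #include <pthread.h>
    #include <signal.h>

    static void *worker(void *arg) {
        (void) arg;
        /* This thread started with all signals blocked, so it never runs the
         * process's signal handlers; the creating thread keeps that job. */
        return NULL;
    }

    int spawn_with_signals_blocked(pthread_t *tid) {
        sigset_t block_all, before;
        int ret;

        sigfillset(&block_all);
        /* Change the calling thread's mask; the new thread inherits it. */
        pthread_sigmask(SIG_SETMASK, &block_all, &before);

        ret = pthread_create(tid, NULL, worker, NULL);

        /* Put the caller's original mask back; only this thread sees signals again. */
        pthread_sigmask(SIG_SETMASK, &before, NULL);
        return ret;
    }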
1261 1421 	t->state = THREAD_RUNNING;
1262 1422 	libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
1263 	assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
1423 	ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
1424 	snprintf(name, sizeof(name), "hasher-thread");
1425 	pthread_setname_np(t->tid, name);
1264 1426 	} else {
1265 1427 	libtrace->hasher_thread.type = THREAD_EMPTY;
1266 1428 	}
1267 	libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
1429 	//libtrace_ocache_init(&libtrace->packet_freelist, trace_create_packet, trace_destroy_packet, 64, libtrace->packet_freelist_size * 4, true);
1430 	libtrace_ocache_init(&libtrace->packet_freelist,
1431 	(void* (*)()) trace_create_packet,
1432 	(void (*)(void *))trace_destroy_packet,
1433 	64,
1434 	libtrace->packet_freelist_size * 4,
1435 	true);
1268 1436 	//libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
1269 	assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
1437 	ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
1270 1438 	// This will be applied to every new thread that starts, i.e. they will block all signals
1271 1439 	// Lets start a fixed number of reading threads
… …
1281 1449 	libtrace->first_packets.first = 0;
1282 1450 	libtrace->first_packets.count = 0;
1283 	assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
1451 	ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
1284 1452 	libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct __packet_storage_magic_type));
1285 1453 
… …
1297 1465 	t->perpkt_num = i;
1298 1466 	if (libtrace->hasher)
1299 	libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
1467 	libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
1300 1468 	// Depending on the mode vector or deque might be chosen
1301 1469 	libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
… …
1305 1473 	t->tmp_data = NULL;
1306 1474 	t->recorded_first = false;
1307 	assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
1475 	ASSERT_RET(pthread_spin_init(&t->tmp_spinlock, 0), == 0);
1308 1476 	t->tracetime_offset_usec = 0;;
1309 1477 	}
… …
1326 1494 	libtrace->keepalive_thread.state = THREAD_RUNNING;
1327 1495 	libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
1328 	assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
1496 	ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
1329 1497 	}
1330 1498 
… …
1335 1503 
1336 1504 	// Revert back - Allow signals again
1337 	assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
1338 	assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1505 	ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
1506 	ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1339 1507 
1340 1508 	if (threads_started < 0)
… …
1368 1536 	t = get_thread_table(libtrace);
1369 1537 	// Check state from within the lock if we are going to change it
1370 	assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1538 	ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1371 1539 	if (!libtrace->started || libtrace->state != STATE_RUNNING) {
1372 1540 	fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
1373 1541 	trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
1374 	assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1542 	ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1375 1543 	return -1;
1376 1544 	}
1377 1545 
1378 1546 	libtrace_change_state(libtrace, STATE_PAUSING, false);
1379 	assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1547 	ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1380 1548 
1381 1549 	// Special case handle the hasher thread case
… …
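Editorial note: the packet freelist changes above from a blocking ring buffer to an object cache (data-struct/object_cache.c, one of the two files added in this changeset). Going by the calls visible in the diff, libtrace_ocache_init takes a constructor and destructor plus sizing/growth arguments, and libtrace_ocache_alloc / libtrace_ocache_free move batches of pointers in and out of the cache. None of this is documented in the hunks shown here, so the sketch below simply mirrors the call shapes from the diff; the type name libtrace_ocache_t and the meanings given for the numeric and boolean arguments are inferences, not confirmed API facts.

    /* Sketch only: assumes libtrace's internal object-cache header is available
     * and that the arguments mean what the comments say they do. */
    #include <stdbool.h>
    #include <stddef.h>

    static void example_freelist(size_t packet_freelist_size) {
            libtrace_ocache_t freelist;              /* assumed type name */
            libtrace_packet_t *pkt = NULL;

            libtrace_ocache_init(&freelist,
                            (void *(*)()) trace_create_packet,        /* constructor */
                            (void (*)(void *)) trace_destroy_packet,  /* destructor */
                            64,                                       /* assumed: per-thread cache size */
                            packet_freelist_size * 4,                 /* assumed: total object bound */
                            true);                                    /* assumed: allow the cache to grow */

            libtrace_ocache_alloc(&freelist, (void **) &pkt, 1, 1);   /* take 1, wait for at least 1 */
            /* ... use pkt as a scratch packet ... */
            libtrace_ocache_free(&freelist, (void **) &pkt, 1, 1);    /* hand it back for reuse */
    }

The attraction over the old try_swrite/try_sread freelist is that alloc never fails outright: when the cache is empty it can fall back to the constructor instead of forcing callers to special-case allocation, which is exactly what trace_result_packet and trace_free_result_packet below now rely on.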
1386 1554 	trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
1387 1555 	// Wait for it to pause
1388 	assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1556 	ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1389 1557 	while (libtrace->hasher_thread.state == THREAD_RUNNING) {
1390 	assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1391 	}
1392 	assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1558 	ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1559 	}
1560 	ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1393 1561 	}
1394 1562 
… …
1425 1593 
1426 1594 	// Wait for all threads to pause
1427 	assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
1595 	ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
1428 1596 	while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
1429 	assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
1430 	}
1431 	assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
1597 	ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
1598 	}
1599 	ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
1432 1600 
1433 1601 	fprintf(stderr, "Threads have paused\n");
… …
1559 1727 	for (i=0; i< libtrace->perpkt_thread_count; i++) {
1560 1728 	//printf("Waiting to join with perpkt #%d\n", i);
1561 	assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
1729 	ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
1562 1730 	//printf("Joined with perpkt #%d\n", i);
1563 1731 	// So we must do our best effort to empty the queue - so
… …
1607 1775 
1608 1776 	libtrace_change_state(libtrace, STATE_JOINED, true);
1777 	print_memory_stats();
1609 1778 	}
1610 1779 
… …
1977 2146 	DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
1978 2147 	libtrace_packet_t* result;
1979 	if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
1980 	result = trace_create_packet();
2148 	libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
1981 2149 	assert(result);
1982 2150 	swap_packets(result, packet); // Move the current packet into our copy
… …
1984 2152 	}
1985 2153 
1986 	DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t *packet) {
2154 	DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
1987 2155 	// Try write back the packet
1988 2156 	assert(packet);
1989 2157 	// Always release any resources this might be holding such as a slot in a ringbuffer
1990 2158 	trace_fin_packet(packet);
1991 	if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
1992 	/* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
1993 	//assert(1 == 90);
1994 	trace_destroy_packet(packet);
1995 	}
2159 	libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
1996 2160 	}
1997 2161 
-
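Editorial note: both the hasher shutdown and the "Wait for all threads to pause" loop above follow the standard condition-variable idiom: take the mutex, re-check the predicate in a while loop around pthread_cond_wait(), and only release the mutex once the predicate holds, since waiters can wake spuriously and the shared state is only meaningful under the lock. A self-contained sketch of that idiom, with placeholder names rather than libtrace's:

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int running_threads = 0;          /* protected by lock */

    /* Called by each worker as it pauses. */
    static void worker_paused(void) {
            pthread_mutex_lock(&lock);
            running_threads--;
            pthread_cond_broadcast(&cond);   /* wake anyone waiting on the count */
            pthread_mutex_unlock(&lock);
    }

    /* Called by the thread requesting the pause. */
    static void wait_for_pause(void) {
            pthread_mutex_lock(&lock);
            while (running_threads > 0)      /* loop guards against spurious wakeups */
                    pthread_cond_wait(&cond, &lock);
            pthread_mutex_unlock(&lock);
    }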
test/test-datastruct-ringbuffer.c
r19135af ra49a9eb
26 26 	}
27 27 
28 	static void * producer_bulk(void * a) {
29 	libtrace_ringbuffer_t * rb = (libtrace_ringbuffer_t *) a;
30 	char * i;
31 	for (i = NULL; i < TEST_SIZE; i++) {
32 	assert(libtrace_ringbuffer_write_bulk(rb, (void **) &i, 1, 1) == 1);
33 	}
34 	return 0;
35 	}
36 
37 	static void * consumer_bulk(void * a) {
38 	libtrace_ringbuffer_t * rb = (libtrace_ringbuffer_t *) a;
39 	char *i;
40 	void *value;
41 	for (i = NULL; i < TEST_SIZE; i++) {
42 	assert (libtrace_ringbuffer_read_bulk(rb, &value, 1, 1) == 1);
43 	assert(value == i);
44 	}
45 	return 0;
46 	}
47 
48 
28 49 	/**
29 50 	 * Tests the ringbuffer data structure, first this establishes that single
… …
39 60 
40 61 	libtrace_ringbuffer_init(&rb_block, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_BLOCKING);
41 	libtrace_ringbuffer_init(&rb_polling, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_BLOCKING);
62 	libtrace_ringbuffer_init(&rb_polling, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_POLLING);
42 63 	assert(libtrace_ringbuffer_is_empty(&rb_block));
43 64 	assert(libtrace_ringbuffer_is_empty(&rb_polling));
… …
105 126 	assert(libtrace_ringbuffer_is_empty(&rb_polling));
106 127 
128 	pthread_create(&t[0], NULL, &producer_bulk, (void *) &rb_block);
129 	pthread_create(&t[1], NULL, &consumer_bulk, (void *) &rb_block);
130 	pthread_join(t[0], NULL);
131 	pthread_join(t[1], NULL);
132 	assert(libtrace_ringbuffer_is_empty(&rb_block));
133 
134 	pthread_create(&t[0], NULL, &producer_bulk, (void *) &rb_polling);
135 	pthread_create(&t[1], NULL, &consumer_bulk, (void *) &rb_polling);
136 	pthread_join(t[0], NULL);
137 	pthread_join(t[1], NULL);
138 	assert(libtrace_ringbuffer_is_empty(&rb_polling));
139 
107 140 	return 0;
108 141 	}
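Editorial note: the new producer_bulk/consumer_bulk tests exercise libtrace_ringbuffer_write_bulk() and libtrace_ringbuffer_read_bulk(), and the second buffer is now genuinely initialised in LIBTRACE_RINGBUFFER_POLLING mode (previously both buffers used BLOCKING, so the polling path was never covered). Judging from the test, the bulk calls take an array of slots, a count, and a minimum that must complete before returning, which lets a caller move several items per synchronisation. A hedged sketch of how a producer might batch writes under that reading of the API; the batch size and the "minimum to complete" interpretation are assumptions drawn from the test above, not documented behaviour.

    /* Sketch only: batches writes through the bulk API as the new tests use it.
     * Assumed argument order: (ring, values, how_many, minimum_to_complete). */
    #include <stddef.h>

    #define BATCH 8

    static void produce_batched(libtrace_ringbuffer_t *rb, void *items[], size_t count) {
            size_t done = 0;
            while (done < count) {
                    size_t n = count - done > BATCH ? BATCH : count - done;
                    /* Require at least one slot so the call cannot return empty-handed,
                     * and let it take up to a full batch when space is available. */
                    done += libtrace_ringbuffer_write_bulk(rb, &items[done], n, 1);
            }
    }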