Changeset fed9152
- Timestamp:
- 01/20/15 10:52:15 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 1b59edf, cb39d35
- Parents:
- 18bf317 (diff), 04bf7c5 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/data-struct/object_cache.c
r6e41e73 r04bf7c5 175 175 * reads will block (free never should).Otherwise packets can be freely 176 176 * allocated upon requested and are free'd if there is not enough space for them. 177 * @return Returns The number of packets outstanding, or extra object recevied 178 * Ideally this should be zero (0) otherwise some form of memory leak 179 * is likely present. 177 * @return If successful returns 0 otherwise -1. 180 178 */ 181 DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void *), 182 size_t thread_cache_size, size_t buffer_size, bool limit_size) { 179 DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), 180 void (*free)(void *), 181 size_t thread_cache_size, 182 size_t buffer_size, bool limit_size) { 183 183 184 184 assert(buffer_size); 185 185 assert(alloc); 186 186 assert(free); 187 libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING); 187 if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) { 188 return -1; 189 } 188 190 oc->alloc = alloc; 189 191 oc->free = free; … … 193 195 oc->max_nb_thread_list = 0x10; 194 196 oc->thread_list = calloc(0x10, sizeof(void*)); 197 if (oc->thread_list == NULL) { 198 libtrace_ringbuffer_destroy(&oc->rb); 199 return -1; 200 } 195 201 pthread_spin_init(&oc->spin, 0); 196 202 if (limit_size) … … 198 204 else 199 205 oc->max_allocations = 0; 206 return 0; 200 207 } 201 208 -
lib/data-struct/object_cache.h
r6e41e73 r04bf7c5 20 20 } libtrace_ocache_t; 21 21 22 DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void*),22 DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void*), 23 23 size_t thread_cache_size, size_t buffer_size, bool limit_size); 24 24 DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc); -
lib/data-struct/ring_buffer.c
ra49a9eb r04bf7c5 47 47 * becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING. 48 48 * NOTE: this mainly applies to the blocking functions 49 */ 50 DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) { 49 * @return If successful returns 0 otherwise -1 upon failure. 50 */ 51 DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) { 51 52 size = size + 1; 52 assert (size > 1); 53 rb->size = size; // Only this -1 actually usable :) 53 if (!(size > 1)) 54 return -1; 55 rb->size = size; 54 56 rb->start = 0; 55 57 rb->end = 0; 56 58 rb->elements = calloc(rb->size, sizeof(void*)); 57 assert(rb->elements); 59 if (!rb->elements) 60 return -1; 58 61 rb->mode = mode; 59 62 if (mode == LIBTRACE_RINGBUFFER_BLOCKING) { 60 /* The signaling part - i.e. release when data 's ready to read */63 /* The signaling part - i.e. release when data is ready to read */ 61 64 pthread_cond_init(&rb->full_cond, NULL); 62 65 pthread_cond_init(&rb->empty_cond, NULL); … … 75 78 ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0); 76 79 #endif 80 return 0; 77 81 } 78 82 -
lib/data-struct/ring_buffer.h
ra49a9eb r04bf7c5 32 32 } libtrace_ringbuffer_t; 33 33 34 DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode);34 DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode); 35 35 DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb); 36 36 DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb); -
lib/libtrace_int.h
rd7fd648 r04bf7c5 292 292 int perpkt_thread_states[THREAD_STATE_MAX]; 293 293 294 /** For the sliding window hasher implementation */295 pthread_rwlock_t window_lock;296 294 /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */ 297 295 bool perpkt_queue_full; … … 315 313 int perpkt_thread_count; 316 314 libtrace_thread_t * perpkt_threads; // All our perpkt threads 317 libtrace_slidingwindow_t sliding_window;318 sem_t sem;319 315 // Used to keep track of the first packet seen on each thread 320 316 struct first_packets first_packets; … … 881 877 struct libtrace_info_t info; 882 878 883 /** Starts or unpauses an input trace in parallel mode - note that 879 /** 880 * Starts or unpauses an input trace in parallel mode - note that 884 881 * this function is often the one that opens the file or device for 885 882 * reading. 886 883 * 887 884 * @param libtrace The input trace to be started or unpaused 888 * @return If successful the number of threads started, 0 indicates 889 * no threads started and this should be done automatically. 890 * Otherwise in event of an error -1 is returned. 885 * @return 0 upon success. 886 * Otherwise in event of an error -1 is returned. 891 887 * 892 888 */ -
lib/trace.c
rbdc8b36 r04bf7c5 108 108 static struct libtrace_format_t *formats_list = NULL; 109 109 110 int libtrace_halt = 0;110 volatile int libtrace_halt = 0; 111 111 112 112 /* Set once pstart is called used for backwards compatibility reasons */ … … 261 261 262 262 /* Parallel inits */ 263 // libtrace->libtrace_lock264 // libtrace->perpkt_cond;263 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); 264 ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0); 265 265 libtrace->state = STATE_NEW; 266 266 libtrace->perpkt_queue_full = false; … … 273 273 libtrace_zero_thread(&libtrace->reporter_thread); 274 274 libtrace_zero_thread(&libtrace->keepalive_thread); 275 libtrace_zero_slidingwindow(&libtrace->sliding_window);276 275 libtrace->reporter_thread.type = THREAD_EMPTY; 277 276 libtrace->perpkt_thread_count = 0; … … 381 380 382 381 /* Parallel inits */ 383 // libtrace->libtrace_lock384 // libtrace->perpkt_cond;382 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); 383 ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0); 385 384 libtrace->state = STATE_NEW; // TODO MAYBE DEAD 386 385 libtrace->perpkt_queue_full = false; … … 393 392 libtrace_zero_thread(&libtrace->reporter_thread); 394 393 libtrace_zero_thread(&libtrace->keepalive_thread); 395 libtrace_zero_slidingwindow(&libtrace->sliding_window);396 394 libtrace->reporter_thread.type = THREAD_EMPTY; 397 395 libtrace->perpkt_thread_count = 0; … … 634 632 */ 635 633 DLLEXPORT void trace_destroy(libtrace_t *libtrace) { 636 634 int i; 637 635 assert(libtrace); 638 636 639 /* destroy any packet that are still around */ 637 ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0); 638 ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0); 639 640 /* destroy any packets that are still around */ 640 641 if (libtrace->state != STATE_NEW && libtrace->first_packets.packets) { 641 642 for (i = 0; i < libtrace->perpkt_thread_count; ++i) { … … 687 688 
DLLEXPORT void trace_destroy_dead(libtrace_t *libtrace) { 688 689 assert(libtrace); 690 691 ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0); 692 ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0); 689 693 690 694 /* Don't call pause_input or fin_input, because we should never have -
lib/trace_parallel.c
rd51fd26 r04bf7c5 170 170 171 171 /** 172 * True if the trace has dedicated hasher thread otherwise false ,173 * to be used after the trace is running172 * True if the trace has dedicated hasher thread otherwise false. 173 * This can be used once the hasher thread has been started. 174 174 */ 175 175 static inline int trace_has_dedicated_hasher(libtrace_t * libtrace) 176 176 { 177 assert(libtrace->state != STATE_NEW);178 177 return libtrace->hasher_thread.type == THREAD_HASHER; 179 178 } … … 230 229 prev_state, t->state); 231 230 231 pthread_cond_broadcast(&trace->perpkt_cond); 232 232 if (need_lock) 233 233 pthread_mutex_unlock(&trace->libtrace_lock); 234 pthread_cond_broadcast(&trace->perpkt_cond);235 234 } 236 235 … … 257 256 get_trace_state_name(trace->state)); 258 257 258 pthread_cond_broadcast(&trace->perpkt_cond); 259 259 if (need_lock) 260 260 pthread_mutex_unlock(&trace->libtrace_lock); 261 pthread_cond_broadcast(&trace->perpkt_cond);262 261 } 263 262 … … 293 292 294 293 void libtrace_zero_thread(libtrace_thread_t * t) { 294 t->accepted_packets = 0; 295 t->recorded_first = false; 296 t->tracetime_offset_usec = 0; 297 t->user_data = 0; 298 t->format_data = 0; 299 libtrace_zero_ringbuffer(&t->rbuffer); 295 300 t->trace = NULL; 296 301 t->ret = NULL; 297 302 t->type = THREAD_EMPTY; 298 libtrace_zero_ringbuffer(&t->rbuffer);299 t->recorded_first = false;300 303 t->perpkt_num = -1; 301 t->accepted_packets = 0;302 304 } 303 305 … … 414 416 size_t z; 415 417 // We could have an eof or error and a message such as pause 416 for (z = i ; z < nb_packets; ++z) {417 fprintf(stderr, "i=%d nb_packet =%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);418 for (z = i + 1 ; z < nb_packets; ++z) { 419 fprintf(stderr, "i=%d nb_packets=%d err=%d, seq=%d\n", (int) z, (int) nb_packets, packets[z]->error, (int) packets[z]->order); 418 420 assert (packets[z]->error <= 0); 419 421 } … … 461 463 int ret; 462 464 463 /* Fill it with empty packets */ 464 memset(&packets, 0, 
sizeof(void*) * trace->config.burst_size); 465 libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets, trace->config.burst_size, trace->config.burst_size); 466 467 // Force this thread to wait until trace_pstart has been completed 465 /* Wait until trace_pstart has been completed */ 468 466 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 469 467 t = get_thread_table(trace); 470 468 assert(t); 469 if (trace->state == STATE_ERROR) { 470 thread_change_state(trace, t, THREAD_FINISHED, false); 471 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 472 pthread_exit(NULL); 473 } 471 474 //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace)); 472 475 if (trace->format->pregister_thread) { … … 474 477 } 475 478 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 479 480 /* Fill our buffer with empty packets */ 481 memset(&packets, 0, sizeof(void*) * trace->config.burst_size); 482 libtrace_ocache_alloc(&trace->packet_freelist, (void **) packets, 483 trace->config.burst_size, 484 trace->config.burst_size); 476 485 477 486 /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */ … … 541 550 else if (ret != nb_packets) { 542 551 // Refill the empty packets 543 printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets);552 //printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets); 544 553 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret); 545 554 } … … 607 616 t = &trace->hasher_thread; 608 617 assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid)); 618 if (trace->state == STATE_ERROR) { 619 thread_change_state(trace, t, THREAD_FINISHED, false); 620 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 621 pthread_exit(NULL); 622 } 623 609 624 printf("Hasher Thread started\n"); 610 625 if (trace->format->pregister_thread) { … … 1004 1019 1005 1020 /** 1006 * This case is much like the dedicated 
hasher, except that we will become1007 * hasher if we don't have a packet waiting.1008 *1009 * TODO: You can lose the tail of a trace if the final thread1010 * fills its own queue and therefore breaks early and doesn't empty the sliding window.1011 *1012 * TODO: Can block on zero copy formats such as ring: and dpdk: if the1013 * queue sizes in total are larger than the ring size.1014 *1015 * 1. We read a packet from our buffer1016 * 2. Move that into the packet provided (packet)1017 */1018 inline static int trace_pread_packet_sliding_window(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)1019 {1020 int ret, i, thread/*, psize*/;1021 1022 if (t->state == THREAD_FINISHING)1023 return trace_handle_finishing_perpkt(libtrace, packet, t);1024 1025 while (1) {1026 // Check if we have packets ready1027 if(try_waiting_queue(libtrace, t, packet, &ret))1028 return ret;1029 1030 // We limit the number of packets we get to the size of the sliding window1031 // such that it is impossible for any given thread to fail to store a packet1032 ASSERT_RET(sem_wait(&libtrace->sem), == 0);1033 /*~~~~Single threaded read of a packet~~~~*/1034 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);1035 1036 /* Re-check our queue things we might have data waiting */1037 if(try_waiting_queue(libtrace, t, packet, &ret)) {1038 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);1039 ASSERT_RET(sem_post(&libtrace->sem), == 0);1040 return ret;1041 }1042 1043 // TODO put on *proper* condition variable1044 if (libtrace->perpkt_queue_full) {1045 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);1046 ASSERT_RET(sem_post(&libtrace->sem), == 0);1047 contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;1048 continue;1049 }1050 1051 if (!*packet)1052 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);1053 assert(*packet);1054 1055 if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 
/*&& psize != LIBTRACE_MESSAGE_WAITING*/) {1056 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);1057 ASSERT_RET(sem_post(&libtrace->sem), == 0);1058 // Finish this thread ensuring that any data written later by another thread is retrieved also1059 if (libtrace_halt)1060 return 0;1061 else1062 return trace_finish_perpkt(libtrace, packet, t);1063 }1064 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);1065 1066 /* ~~~~Multiple threads can run the hasher~~~~ */1067 trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));1068 1069 /* Yes this is correct opposite read lock for a write operation */1070 ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);1071 if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))1072 assert(!"Semaphore should stop us from ever overfilling the sliding window");1073 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1074 *packet = NULL;1075 1076 // Always try read any data from the sliding window1077 while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {1078 ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);1079 if (libtrace->perpkt_queue_full) {1080 // I might be the holdup in which case if I can read my queue I should do that and return1081 if(try_waiting_queue(libtrace, t, packet, &ret)) {1082 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1083 return ret;1084 }1085 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1086 continue;1087 }1088 // Read greedily as many as we can1089 while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {1090 thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;1091 if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {1092 while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {1093 if (t->perpkt_num == 
thread)1094 {1095 // TODO think about this case more because we have to stop early if this were to happen on the last read1096 // before EOF/error we might not have emptied the sliding window1097 printf("!~!~!~!~!~!~In this Code~!~!~!~!\n");1098 // Its our queue we must have a packet to read out1099 if(try_waiting_queue(libtrace, t, packet, &ret)) {1100 // We must be able to write this now 100% without fail1101 libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);1102 ASSERT_RET(sem_post(&libtrace->sem), == 0);1103 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1104 return ret;1105 } else {1106 assert(!"Our queue is full but I cannot read from it??");1107 }1108 }1109 // Not us we have to give the other threads a chance to write there packets then1110 libtrace->perpkt_queue_full = true;1111 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1112 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets1113 ASSERT_RET(sem_post(&libtrace->sem), == 0);1114 1115 contention_stats[t->perpkt_num].full_queue_hits++;1116 ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);1117 // Grab these back1118 for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets1119 ASSERT_RET(sem_wait(&libtrace->sem), == 0);1120 libtrace->perpkt_queue_full = false;1121 }1122 ASSERT_RET(sem_post(&libtrace->sem), == 0);1123 *packet = NULL;1124 } else {1125 // Cannot write to a queue if no ones waiting (I think this is unreachable)1126 // in the general case (unless the user ends early without proper clean up).1127 assert (!"unreachable code??");1128 }1129 }1130 ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);1131 }1132 // Now we go back to checking our queue anyways1133 }1134 }1135 1136 1137 /**1138 1021 * For the first packet of each queue we keep a copy and note the system 1139 1022 * time it was received at. 
… … 1229 1112 libtrace_thread_t *t = &trace->reporter_thread; 1230 1113 libtrace_vector_t results; 1114 1115 fprintf(stderr, "Reporter thread starting\n"); 1116 1117 /* Wait until all threads are started */ 1118 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 1119 if (trace->state == STATE_ERROR) { 1120 thread_change_state(trace, t, THREAD_FINISHED, false); 1121 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 1122 pthread_exit(NULL); 1123 } 1124 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 1125 1231 1126 libtrace_vector_init(&results, sizeof(libtrace_result_t)); 1232 fprintf(stderr, "Reporter thread starting\n");1233 1127 1234 1128 message.code = MESSAGE_STARTING; … … 1287 1181 uint64_t next_release; 1288 1182 fprintf(stderr, "keepalive thread is starting\n"); 1183 1184 /* Wait until all threads are started */ 1185 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 1186 if (trace->state == STATE_ERROR) { 1187 thread_change_state(trace, &trace->keepalive_thread, THREAD_FINISHED, false); 1188 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 1189 pthread_exit(NULL); 1190 } 1191 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 1289 1192 1290 1193 gettimeofday(&prev, NULL); … … 1458 1361 * assuming a packet is valid. 
1459 1362 */ 1460 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets) 1363 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, 1364 libtrace_packet_t *packets[], size_t nb_packets) 1461 1365 { 1462 1366 size_t ret; … … 1493 1397 } 1494 1398 1495 /* Starts perpkt threads 1496 * @return threads_started 1497 */ 1498 static inline int trace_start_perpkt_threads (libtrace_t *libtrace) { 1499 int i; 1500 char name[16]; 1501 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1502 libtrace_thread_t *t = &libtrace->perpkt_threads[i]; 1503 ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0); 1504 snprintf(name, 16, "perpkt-%d", i); 1505 pthread_setname_np(t->tid, name); 1506 } 1507 return libtrace->perpkt_thread_count; 1508 } 1509 1510 /* Start an input trace in a parallel fashion, or restart a paused trace. 1511 * 1512 * NOTE: libtrace lock is held for the majority of this function 1513 * 1514 * @param libtrace the input trace to start 1515 * @param global_blob some global data you can share with the new perpkt threads 1516 * @returns 0 on success 1517 */ 1518 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter) 1519 { 1520 int i; 1521 char name[16]; 1522 sigset_t sig_before, sig_block_all; 1523 assert(libtrace); 1524 if (trace_is_err(libtrace)) { 1399 /* Restarts a parallel trace, this is called from trace_pstart. 1400 * The libtrace lock is held upon calling this function. 1401 * Typically with a parallel trace the threads are not 1402 * killed rather. 
1403 */ 1404 static int trace_prestart(libtrace_t * libtrace, void *global_blob, 1405 fn_per_pkt per_pkt, fn_reporter reporter) { 1406 int err = 0; 1407 if (libtrace->state != STATE_PAUSED) { 1408 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, 1409 "trace(%s) is not currently paused", 1410 libtrace->uridata); 1525 1411 return -1; 1526 1412 } 1527 1413 1528 // NOTE: Until the trace is started we wont have a libtrace_lock initialised 1529 if (libtrace->state != STATE_NEW) { 1530 int err = 0; 1531 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1532 if (libtrace->state != STATE_PAUSED) { 1533 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, 1534 "The trace(%s) has already been started and is not paused!!", libtrace->uridata); 1535 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1536 return -1; 1537 } 1538 1539 // Update the per_pkt function, or reuse the old one 1540 if (per_pkt) 1541 libtrace->per_pkt = per_pkt; 1542 1543 if (reporter) 1544 libtrace->reporter = reporter; 1545 1546 assert(libtrace_parallel); 1547 assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]); 1548 assert(libtrace->per_pkt); 1549 1550 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1551 fprintf(stderr, "Restarting trace pstart_input()\n"); 1552 err = libtrace->format->pstart_input(libtrace); 1553 } else { 1554 if (libtrace->format->start_input) { 1555 fprintf(stderr, "Restarting trace start_input()\n"); 1556 err = libtrace->format->start_input(libtrace); 1557 } 1558 } 1559 1560 if (err == 0) { 1561 libtrace->started = true; 1562 libtrace_change_state(libtrace, STATE_RUNNING, false); 1563 } 1564 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1565 return err; 1566 } 1567 1568 assert(libtrace->state == STATE_NEW); 1569 libtrace_parallel = 1; 1570 1571 // Store the user defined things against the trace 1572 libtrace->global_blob = global_blob; 1573 libtrace->per_pkt = per_pkt; 
1574 libtrace->reporter = reporter; 1575 1576 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); 1577 ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0); 1578 ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0); 1579 // Grab the lock 1580 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1581 1582 // Set default buffer sizes 1414 /* Update functions if requested */ 1415 if (per_pkt) 1416 libtrace->per_pkt = per_pkt; 1417 if (reporter) 1418 libtrace->reporter = reporter; 1419 if(global_blob) 1420 libtrace->global_blob = global_blob; 1421 1422 assert(libtrace_parallel); 1423 assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]); 1424 assert(libtrace->per_pkt); 1425 1426 if (libtrace->perpkt_thread_count > 1 && 1427 trace_supports_parallel(libtrace) && 1428 !trace_has_dedicated_hasher(libtrace)) { 1429 fprintf(stderr, "Restarting trace pstart_input()\n"); 1430 err = libtrace->format->pstart_input(libtrace); 1431 } else { 1432 if (libtrace->format->start_input) { 1433 fprintf(stderr, "Restarting trace start_input()\n"); 1434 err = libtrace->format->start_input(libtrace); 1435 } 1436 } 1437 1438 if (err == 0) { 1439 libtrace->started = true; 1440 libtrace_change_state(libtrace, STATE_RUNNING, false); 1441 } 1442 return err; 1443 } 1444 1445 /** 1446 * Verifies the configuration and sets default values for any values not 1447 * specified by the user. 
1448 * @return 1449 */ 1450 static void verify_configuration(libtrace_t *libtrace) { 1451 1583 1452 if (libtrace->config.hasher_queue_size <= 0) 1584 1453 libtrace->config.hasher_queue_size = 1000; … … 1607 1476 fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n"); 1608 1477 1609 libtrace->started = true; // Before we start the threads otherwise we could have issues 1610 libtrace_change_state(libtrace, STATE_RUNNING, false); 1611 /* Disable signals - Pthread signal handling */ 1612 1478 if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL) 1479 libtrace->combiner = combiner_unordered; 1480 } 1481 1482 /** 1483 * Starts a libtrace_thread, including allocating memory for messaging. 1484 * Threads are expected to wait until the libtrace look is released. 1485 * Hence why we don't init structures until later. 1486 * 1487 * @param trace The trace the thread is associated with 1488 * @param t The thread that is filled when the thread is started 1489 * @param type The type of thread 1490 * @param start_routine The entry location of the thread 1491 * @param perpkt_num The perpkt thread number (should be set -1 if not perpkt) 1492 * @param name For debugging purposes set the threads name (Optional) 1493 * 1494 * @return 0 on success or -1 upon error in which case the libtrace error is set. 1495 * In this situation the thread structure is zeroed. 
1496 */ 1497 static int trace_start_thread(libtrace_t *trace, 1498 libtrace_thread_t *t, 1499 enum thread_types type, 1500 void *(*start_routine) (void *), 1501 int perpkt_num, 1502 const char *name) { 1503 int ret; 1504 assert(t->type == THREAD_EMPTY); 1505 t->trace = trace; 1506 t->ret = NULL; 1507 t->user_data = NULL; 1508 t->type = type; 1509 t->state = THREAD_RUNNING; 1510 ret = pthread_create(&t->tid, NULL, start_routine, (void *) trace); 1511 if (ret != 0) { 1512 libtrace_zero_thread(t); 1513 trace_set_err(trace, ret, "Failed to create a thread"); 1514 return -1; 1515 } 1516 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t)); 1517 if (trace_has_dedicated_hasher(trace) && type == THREAD_PERPKT) { 1518 libtrace_ringbuffer_init(&t->rbuffer, 1519 trace->config.hasher_queue_size, 1520 trace->config.hasher_polling? 1521 LIBTRACE_RINGBUFFER_POLLING: 1522 LIBTRACE_RINGBUFFER_BLOCKING); 1523 } 1524 if(name) 1525 pthread_setname_np(t->tid, name); 1526 t->perpkt_num = perpkt_num; 1527 return 0; 1528 } 1529 1530 /* Start an input trace in the parallel libtrace framework. 1531 * This can also be used to restart an existing parallel. 
1532 * 1533 * NOTE: libtrace lock is held for the majority of this function 1534 * 1535 * @param libtrace the input trace to start 1536 * @param global_blob some global data you can share with the new perpkt threads 1537 * @returns 0 on success, otherwise -1 to indicate an error has occured 1538 */ 1539 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 1540 fn_per_pkt per_pkt, fn_reporter reporter) { 1541 int i; 1542 int ret = -1; 1543 char name[16]; 1544 sigset_t sig_before, sig_block_all; 1545 assert(libtrace); 1546 1547 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1548 if (trace_is_err(libtrace)) { 1549 goto cleanup_none; 1550 } 1551 1552 if (libtrace->state == STATE_PAUSED) { 1553 ret = trace_prestart(libtrace, global_blob, per_pkt, reporter); 1554 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1555 return ret; 1556 } 1557 1558 if (libtrace->state != STATE_NEW) { 1559 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "trace_pstart " 1560 "should be called on a NEW or PAUSED trace but " 1561 "instead was called from %s", 1562 get_trace_state_name(libtrace->state)); 1563 goto cleanup_none; 1564 } 1565 1566 /* Store the user defined things against the trace */ 1567 libtrace->global_blob = global_blob; 1568 libtrace->per_pkt = per_pkt; 1569 libtrace->reporter = reporter; 1570 /* And zero other fields */ 1571 for (i = 0; i < THREAD_STATE_MAX; ++i) { 1572 libtrace->perpkt_thread_states[i] = 0; 1573 } 1574 libtrace->first_packets.first = 0; 1575 libtrace->first_packets.count = 0; 1576 libtrace->first_packets.packets = NULL; 1577 libtrace->perpkt_threads = NULL; 1578 /* Set a global which says we are using a parallel trace. 
This is 1579 * for backwards compatability due to changes when destroying packets */ 1580 libtrace_parallel = 1; 1581 1582 verify_configuration(libtrace); 1583 1584 /* Try start the format */ 1585 if (libtrace->perpkt_thread_count > 1 && 1586 trace_supports_parallel(libtrace) && 1587 !trace_has_dedicated_hasher(libtrace)) { 1588 printf("This format has direct support for p's\n"); 1589 ret = libtrace->format->pstart_input(libtrace); 1590 } else { 1591 if (libtrace->format->start_input) { 1592 ret = libtrace->format->start_input(libtrace); 1593 } 1594 } 1595 1596 if (ret != 0) { 1597 goto cleanup_none; 1598 } 1599 1600 /* --- Start all the threads we need --- */ 1601 /* Disable signals because it is inherited by the threads we start */ 1613 1602 sigemptyset(&sig_block_all); 1614 1615 1603 ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0); 1616 1604 1617 // If we are using a hasher start it 1618 // If single threaded we don't need a hasher 1619 if (libtrace->perpkt_thread_count > 1 && libtrace->hasher && libtrace->hasher_type != HASHER_HARDWARE) { 1620 libtrace_thread_t *t = &libtrace->hasher_thread; 1621 t->trace = libtrace; 1622 t->ret = NULL; 1623 t->type = THREAD_HASHER; 1624 t->state = THREAD_RUNNING; 1625 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t)); 1626 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0); 1627 snprintf(name, sizeof(name), "hasher-thread"); 1628 pthread_setname_np(t->tid, name); 1605 /* If we need a hasher thread start it 1606 * Special Case: If single threaded we don't need a hasher 1607 */ 1608 if (libtrace->perpkt_thread_count > 1 && libtrace->hasher 1609 && libtrace->hasher_type != HASHER_HARDWARE) { 1610 ret = trace_start_thread(libtrace, &libtrace->hasher_thread, 1611 THREAD_HASHER, hasher_entry, -1, 1612 "hasher-thread"); 1613 if (ret != 0) { 1614 trace_set_err(libtrace, errno, "trace_pstart " 1615 "failed to start a hasher thread."); 1616 goto 
cleanup_started; 1617 } 1629 1618 } else { 1630 1619 libtrace->hasher_thread.type = THREAD_EMPTY; 1631 1620 } 1632 1621 1633 libtrace_ocache_init(&libtrace->packet_freelist, 1634 (void* (*)()) trace_create_packet, 1635 (void (*)(void *))trace_destroy_packet, 1636 libtrace->config.packet_thread_cache_size, 1637 libtrace->config.packet_cache_size * 4, 1638 libtrace->config.fixed_packet_count); 1639 // Unused slidingwindow code 1640 //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0); 1641 //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0); 1642 1643 // This will be applied to every new thread that starts, i.e. they will block all signals 1644 // Lets start a fixed number of reading threads 1645 1646 /* Ready some storages */ 1647 libtrace->first_packets.first = 0; 1648 libtrace->first_packets.count = 0; 1622 /* Start up our perpkt threads */ 1623 libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), 1624 libtrace->perpkt_thread_count); 1625 if (!libtrace->perpkt_threads) { 1626 trace_set_err(libtrace, errno, "trace_pstart " 1627 "failed to allocate memory."); 1628 goto cleanup_threads; 1629 } 1630 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1631 snprintf(name, sizeof(name), "perpkt-%d", i); 1632 libtrace_zero_thread(&libtrace->perpkt_threads[i]); 1633 ret = trace_start_thread(libtrace, &libtrace->perpkt_threads[i], 1634 THREAD_PERPKT, perpkt_threads_entry, i, 1635 name); 1636 if (ret != 0) { 1637 trace_set_err(libtrace, errno, "trace_pstart " 1638 "failed to start a perpkt thread."); 1639 goto cleanup_threads; 1640 } 1641 } 1642 1643 /* Start the reporter thread */ 1644 if (reporter) { 1645 if (libtrace->combiner.initialise) 1646 libtrace->combiner.initialise(libtrace, &libtrace->combiner); 1647 ret = trace_start_thread(libtrace, &libtrace->reporter_thread, 1648 THREAD_REPORTER, reporter_entry, -1, 1649 "reporter_thread"); 1650 if (ret != 0) { 1651 trace_set_err(libtrace, errno, 
"trace_pstart " 1652 "failed to start reporter thread."); 1653 goto cleanup_threads; 1654 } 1655 } 1656 1657 /* Start the keepalive thread */ 1658 if (libtrace->config.tick_interval > 0) { 1659 ret = trace_start_thread(libtrace, &libtrace->keepalive_thread, 1660 THREAD_KEEPALIVE, keepalive_entry, -1, 1661 "keepalive_thread"); 1662 if (ret != 0) { 1663 trace_set_err(libtrace, errno, "trace_pstart " 1664 "failed to start keepalive thread."); 1665 goto cleanup_threads; 1666 } 1667 } 1668 1669 /* Init other data structures */ 1670 libtrace->perpkt_thread_states[THREAD_RUNNING] = libtrace->perpkt_thread_count; 1649 1671 ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0); 1650 libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct __packet_storage_magic_type)); 1651 1652 1653 /* Ready all of our perpkt threads - they are started later */ 1654 libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count); 1655 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1656 libtrace_thread_t *t = &libtrace->perpkt_threads[i]; 1657 t->trace = libtrace; 1658 t->ret = NULL; 1659 t->type = THREAD_PERPKT; 1660 t->state = THREAD_RUNNING; 1661 t->user_data = NULL; 1662 // t->tid DONE on create 1663 t->perpkt_num = i; 1664 if (libtrace->hasher) 1665 libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, 1666 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0); 1667 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t)); 1668 t->recorded_first = false; 1669 t->tracetime_offset_usec = 0;; 1670 } 1671 1672 int threads_started = 0; 1673 /* Setup the trace and start our threads */ 1674 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1675 printf("This format has direct support for p's\n"); 1676 threads_started = libtrace->format->pstart_input(libtrace); 1672 libtrace->first_packets.packets = 
calloc(libtrace->perpkt_thread_count, 1673 sizeof(struct __packet_storage_magic_type)); 1674 if (libtrace->first_packets.packets == NULL) { 1675 trace_set_err(libtrace, errno, "trace_pstart " 1676 "failed to allocate memory."); 1677 goto cleanup_threads; 1678 } 1679 1680 /*trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart " 1681 "failed to allocate ocache."); 1682 goto cleanup_threads;*/ 1683 1684 if (libtrace_ocache_init(&libtrace->packet_freelist, 1685 (void* (*)()) trace_create_packet, 1686 (void (*)(void *))trace_destroy_packet, 1687 libtrace->config.packet_thread_cache_size, 1688 libtrace->config.packet_cache_size * 4, 1689 libtrace->config.fixed_packet_count) != 0) { 1690 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart " 1691 "failed to allocate ocache."); 1692 goto cleanup_threads; 1693 } 1694 1695 /* Threads don't start */ 1696 libtrace->started = true; 1697 libtrace_change_state(libtrace, STATE_RUNNING, false); 1698 1699 ret = 0; 1700 goto success; 1701 cleanup_threads: 1702 if (libtrace->first_packets.packets) { 1703 free(libtrace->first_packets.packets); 1704 libtrace->first_packets.packets = NULL; 1705 } 1706 libtrace_change_state(libtrace, STATE_ERROR, false); 1707 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1708 if (libtrace->hasher_thread.type == THREAD_HASHER) { 1709 pthread_join(libtrace->hasher_thread.tid, NULL); 1710 libtrace_zero_thread(&libtrace->hasher_thread); 1711 } 1712 1713 if (libtrace->perpkt_threads) { 1714 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1715 if (libtrace->perpkt_threads[i].type == THREAD_PERPKT) { 1716 pthread_join(libtrace->perpkt_threads[i].tid, NULL); 1717 libtrace_zero_thread(&libtrace->perpkt_threads[i]); 1718 } else break; 1719 } 1720 free(libtrace->perpkt_threads); 1721 libtrace->perpkt_threads = NULL; 1722 } 1723 1724 if (libtrace->reporter_thread.type == THREAD_REPORTER) { 1725 pthread_join(libtrace->reporter_thread.tid, NULL); 1726 
libtrace_zero_thread(&libtrace->reporter_thread); 1727 } 1728 1729 if (libtrace->keepalive_thread.type == THREAD_KEEPALIVE) { 1730 pthread_join(libtrace->keepalive_thread.tid, NULL); 1731 libtrace_zero_thread(&libtrace->keepalive_thread); 1732 } 1733 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1734 libtrace_change_state(libtrace, STATE_NEW, false); 1735 assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0); 1736 libtrace->perpkt_thread_states[THREAD_FINISHED] = 0; 1737 cleanup_started: 1738 if (trace_supports_parallel(libtrace) && 1739 !trace_has_dedicated_hasher(libtrace) 1740 && libtrace->perpkt_thread_count > 1) { 1741 if (libtrace->format->ppause_input) 1742 libtrace->format->ppause_input(libtrace); 1677 1743 } else { 1678 if (libtrace->format->start_input) { 1679 threads_started=libtrace->format->start_input(libtrace); 1680 } 1681 } 1682 if (threads_started == 0) 1683 threads_started = trace_start_perpkt_threads(libtrace); 1684 1685 // No combiner set, use a default to reduce the chance of this breaking 1686 if (libtrace->combiner.initialise == NULL && libtrace->combiner.publish == NULL) 1687 libtrace->combiner = combiner_unordered; 1688 1689 if (libtrace->combiner.initialise) 1690 libtrace->combiner.initialise(libtrace, &libtrace->combiner); 1691 1692 libtrace->reporter_thread.type = THREAD_REPORTER; 1693 libtrace->reporter_thread.state = THREAD_RUNNING; 1694 libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t)); 1695 if (reporter) { 1696 // Got a real reporter 1697 ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0); 1698 } else { 1699 // Main thread is reporter 1700 libtrace->reporter_thread.tid = pthread_self(); 1701 } 1702 1703 if (libtrace->config.tick_interval > 0) { 1704 libtrace->keepalive_thread.type = THREAD_KEEPALIVE; 1705 libtrace->keepalive_thread.state = THREAD_RUNNING; 1706 
libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t)); 1707 ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0); 1708 } 1709 1710 for (i = 0; i < THREAD_STATE_MAX; ++i) { 1711 libtrace->perpkt_thread_states[i] = 0; 1712 } 1713 libtrace->perpkt_thread_states[THREAD_RUNNING] = threads_started; 1714 1715 // Revert back - Allow signals again 1744 if (libtrace->format->pause_input) 1745 libtrace->format->pause_input(libtrace); 1746 } 1747 ret = -1; 1748 success: 1716 1749 ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0); 1750 cleanup_none: 1717 1751 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1718 1719 if (threads_started < 0) 1720 // Error 1721 return threads_started; 1722 1723 // TODO fix these leaks etc 1724 if (libtrace->perpkt_thread_count != threads_started) 1725 fprintf(stderr, "Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count); 1726 1727 1728 return 0; 1752 return ret; 1729 1753 } 1730 1754 -
tools/traceanon/ipenc.c
ra8f2692 r925ae60 19 19 #endif 20 20 21 static enum enc_type_t enc_type = ENC_NONE;21 static __thread enum enc_type_t enc_type = ENC_NONE; 22 22 23 23 static uint32_t masks[33] = { … … 31 31 }; 32 32 33 static uint32_t prefix;34 static uint32_t netmask;33 static __thread uint32_t prefix; 34 static __thread uint32_t netmask; 35 35 static void init_prefix(const char *key) 36 36 { -
tools/traceanon/rijndael.c
rc0a5a50 r925ae60 50 50 #include <string.h> 51 51 52 static State m_state; 53 static Mode m_mode; 54 static Direction m_direction; 55 static UINT8 m_initVector[MAX_IV_SIZE]; 56 static UINT32 m_uRounds; 57 static UINT8 m_expandedKey[_MAX_ROUNDS+1][4][4]; 58 52 static __thread State m_state; 53 static __thread Mode m_mode; 54 static __thread Direction m_direction; 55 static __thread UINT8 m_initVector[MAX_IV_SIZE]; 56 static __thread UINT32 m_uRounds; 57 static __thread UINT8 m_expandedKey[_MAX_ROUNDS+1][4][4]; 59 58 60 59 static UINT8 S[256]=
Note: See TracChangeset
for help on using the changeset viewer.