- Timestamp: 04/26/14 23:23:17 (8 years ago)
- Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children: 5ce14a5
- Parents: 60e8e86
- Location: lib
- Files: 6 edited
Legend: unchanged lines are shown without a prefix, removed lines are prefixed with "-", added lines with "+", and "..." marks unchanged code omitted between hunks.
lib/format_dpdk.c
(r29bbef0 → r17a3dff)

      return 0;
  }
- int mapper_start(void *data); // This actually a void*

  /* Attach memory to the port and start the port or restart the ports.
...
  // Can use remote launch for all
  /*RTE_LCORE_FOREACH_SLAVE(i) {
-     rte_eal_remote_launch(mapper_start, (void *)libtrace, i);
+     rte_eal_remote_launch(perpkt_threads_entry, (void *)libtrace, i);
  }*/

...
  char err[500];
  int enabled_lcore_count = 0, i=0;
- int tot = libtrace->mapper_thread_count;
+ int tot = libtrace->perpkt_thread_count;
  err[0] = 0;

- libtrace->mapper_thread_count;
+ libtrace->perpkt_thread_count;

  for (i = 0; i < RTE_MAX_LCORE; i++)
...
  }

- tot = MIN(libtrace->mapper_thread_count, enabled_lcore_count);
+ tot = MIN(libtrace->perpkt_thread_count, enabled_lcore_count);
  tot = MIN(tot, 8);
- printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->mapper_thread_count, enabled_lcore_count, rte_lcore_count());
+ printf("Running pstart DPDK %d %d %d %d\n", tot, libtrace->perpkt_thread_count, enabled_lcore_count, rte_lcore_count());

  if (dpdk_start_port_queues(libtrace, FORMAT(libtrace), err, sizeof(err), tot) != 0) {
lib/format_linux.c
(r29bbef0 → r17a3dff)

  {
  int i;
- int tot = libtrace->mapper_thread_count;
+ int tot = libtrace->perpkt_thread_count;
  printf("CAlling native pause packet\n");

...
  static int linuxnative_pstart_input(libtrace_t *libtrace) {
  int i = 0;
- int tot = libtrace->mapper_thread_count;
+ int tot = libtrace->perpkt_thread_count;
  int iserror = 0;
  // We store this here otherwise it will be leaked if the memory doesn't know
...
  // Whats going on this might not work 100%
  // We assume all sockets have been closed ;)
- printf("Pause and then start called again lets hope that mapper_thread_count hasn't changed\n");
+ printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
lib/libtrace.h.in
(rabda273 → r17a3dff)

  DLLEXPORT void trace_destroy_result(libtrace_result_t ** result);

- // Ways to access Global and TLS
+ // Ways to access Global and TLS storage that we provide the user
  DLLEXPORT void * trace_get_global(libtrace_t *trace);
  DLLEXPORT void * trace_set_global(libtrace_t *trace, void * data);
...
  DLLEXPORT void trace_packet_set_hash(libtrace_packet_t * packet, uint64_t hash);

+ DLLEXPORT int retrive_first_packet(libtrace_t *libtrace, libtrace_packet_t **packet, struct timeval **tv);

  typedef enum {
...
   * A unblockable warning message will be printed to stderr in this case.
   */
- TRACE_OPTION_SET_MAPPER_BUFFER_SIZE,
+ TRACE_OPTION_SET_PERPKT_BUFFER_SIZE,

  /**
-  * Libtrace set mapper thread count
+  * Libtrace set perpkt thread count
   */
- TRACE_OPTION_SET_MAPPER_THREAD_COUNT,
+ TRACE_OPTION_SET_PERPKT_THREAD_COUNT,

  /**
...
  MESSAGE_STOPPED,
  MESSAGE_FIRST_PACKET,
- MESSAGE_MAPPER_ENDED,
- MESSAGE_MAPPER_RESUMED,
- MESSAGE_MAPPER_PAUSED,
- MESSAGE_MAPPER_EOF,
+ MESSAGE_PERPKT_ENDED,
+ MESSAGE_PERPKT_RESUMED,
+ MESSAGE_PERPKT_PAUSED,
+ MESSAGE_PERPKT_EOF,
  MESSAGE_POST_REDUCE,
  MESSAGE_POST_RANGE,
...
  };

+ enum hasher_types {
+     HASHER_BALANCE, /** Balance load across CPUs best as possible this is basically to say don't care about hash, but this might still might be implemented using a hash or round robin etc.. */
+     HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional for TCP flows (IP src dest,TCP port src dest), non TCP
+                              Will be sent to the same place, with the exception of UDP which may or may not be sent to separate cores */
+     HASHER_UNIDIRECTIONAL, /** Use a hash which is uni-directional across TCP flow */
+     HASHER_CUSTOM, /** Always use the user supplied hasher */
+     HASHER_HARDWARE, /** Set by the format if the hashing is going to be done in hardware */
+ };
+
  DLLEXPORT int trace_parallel_config(libtrace_t *libtrace, trace_parallel_option_t option, void *value);
  DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
  DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet);
+ enum hasher_types;
+ DLLEXPORT int trace_set_hasher(libtrace_t *trace, enum hasher_types type, fn_hasher hasher, void *data);

  #ifdef __cplusplus
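As an aside for readers of this changeset, here is a minimal sketch of how the hasher API added above might be called from an application built against this revision. The pcap URI is purely illustrative, error handling is reduced to a single check, and the NULL hasher/data arguments simply mean no custom fn_hasher callback is supplied:

#include "libtrace.h"

int main(void) {
    /* Hypothetical input URI; any supported capture format would do. */
    libtrace_t *trace = trace_create("pcapfile:trace.pcap.gz");
    if (trace_is_err(trace))
        return 1;

    /* Keep both directions of each TCP flow on the same perpkt thread.
     * With no callback or user data, the library (or, for HASHER_HARDWARE,
     * the capture device) chooses the actual hash implementation. */
    trace_set_hasher(trace, HASHER_BIDIRECTIONAL, NULL, NULL);

    /* ... set thread counts via trace_parallel_config() and call
     * trace_pstart() with per-packet and reducer callbacks here ... */

    trace_destroy(trace);
    return 0;
}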
lib/libtrace_int.h
(rd6a56b6 → r17a3dff)

  THREAD_EMPTY,
  THREAD_HASHER,
- THREAD_MAPPER,
+ THREAD_PERPKT,
  THREAD_REDUCER
  };
...
  THREAD_FINISHED,
  THREAD_PAUSED
- };
-
- enum hasher_types {
-     HASHER_BALANCE, /** Balance load across CPUs best as possible this is basically to say don't care about hash, but this might still might be implemented using a hash or round robin etc.. */
-     HASHER_BIDIRECTIONAL, /** Use a hash which is uni-directional for TCP flows (IP src dest,TCP port src dest), non TCP
-                              Will be sent to the same place, with the exception of UDP which may or may not be sent to separate cores */
-     HASHER_UNIDIRECTIONAL, /** Use a hash which is uni-directional across TCP flow */
-     HASHER_CUSTOM, /** Always use the user supplied hasher */
-     HASHER_HARDWARE, /** Set by the format if the hashing is going to be done in hardware */
  };
...
  #define REDUCE_STEPPING 0x10

- #define MAPPER_USE_SLIDING_WINDOW 0x20
+ #define PERPKT_USE_SLIDING_WINDOW 0x20

  /**
...
  void* user_data; // TLS for the user to use
  pthread_t tid;
- int map_num; // A number from 0-X that represents this mapper number
+ int perpkt_num; // A number from 0-X that represents this perpkt threads number
  // in the table, intended to quickly identify this thread
- // -1 represents NA (such as in the case this is not a mapper thread)
+ // -1 represents NA (such as the case this is not a perpkt thread)
  libtrace_ringbuffer_t rbuffer; // Input
  libtrace_vector_t vector; // Output
...
  struct first_packets {
  pthread_spinlock_t lock;
- size_t count; // If == mappers we have all
+ size_t count; // If == perpkt_thread_count threads we have all
  size_t first; // Valid if count != 0
  struct __packet_storage_magic_type {
...
  /** Use to control pausing threads and finishing threads etc always used with libtrace_lock */
  pthread_cond_t perpkt_cond;
- /** Set to the number of mapper threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */
- int mappers_finishing;
- /** A count of mappers that are pausing */
- int perpkt_pausing;
+ /** Set to the number of perpkt threads that are finishing (or have finished), or to -1 once all have been joined, 0 implies all are running */
+ int perpkts_finishing;
+ /** A count of perpkt threads that are pausing */
+ int perpkts_pausing;

  /** For the sliding window hasher implementation */
...
  /** Set once trace_join has been called */
  bool joined;
- /** Set to indicate a mappers queue is full and such the writing mapper cannot proceed */
- bool mapper_queue_full;
+ /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
+ bool perpkt_queue_full;
  /** Global storage for this trace, shared among all the threads */
  void* global_blob;
...
  /** The actual freelist */
  libtrace_ringbuffer_t packet_freelist;
- /** The number of packets that can queue per mapper thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
- int mapper_buffer_size;
+ /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
+ int perpkt_buffer_size;
  /** The reducer flags */
  int reducer_flags;
...
  libtrace_thread_t hasher_thread;
  libtrace_thread_t reducer_thread;
- int mapper_thread_count;
- libtrace_thread_t * mapper_threads; // All our mapper threads
+ int perpkt_thread_count;
+ libtrace_thread_t * perpkt_threads; // All our perpkt threads
  libtrace_slidingwindow_t sliding_window;
  sem_t sem;
...
  inline void libtrace_zero_thread(libtrace_thread_t * t);
  inline void store_first_packet(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t);
+ libtrace_thread_t * get_thread_table(libtrace_t *libtrace);
+ int get_thread_table_num(libtrace_t *libtrace);
+

  /** A libtrace output trace
lib/trace.c
(r60e8e86 → r17a3dff)

  /* Set once pstart is called used for backwards compatibility reasons */
- extern int libtrace_parallel = 0;
+ int libtrace_parallel = 0;

  /* strncpy is not assured to copy the final \0, so we
...
  // libtrace->libtrace_lock
  // libtrace->perpkt_cond;
- libtrace->perpkt_pausing = 0;
- libtrace->mapper_queue_full = false;
- libtrace->mappers_finishing = -1;
+ libtrace->perpkts_pausing = 0;
+ libtrace->perpkt_queue_full = false;
+ libtrace->perpkts_finishing = -1;
  libtrace->reducer_flags = 0;
  libtrace->joined = false;
...
  libtrace->hasher = NULL;
  libtrace->packet_freelist_size = 0;
- libtrace->mapper_buffer_size = 0;
+ libtrace->perpkt_buffer_size = 0;
  libtrace->expected_key = 0;
  libtrace_zero_ringbuffer(&libtrace->packet_freelist);
...
  libtrace_zero_slidingwindow(&libtrace->sliding_window);
  libtrace->reducer_thread.type = THREAD_EMPTY;
- libtrace->mapper_thread_count = 0;
- libtrace->mapper_threads = NULL;
+ libtrace->perpkt_thread_count = 0;
+ libtrace->perpkt_threads = NULL;

  /* Parse the URI to determine what sort of trace we are dealing with */
...
  // libtrace->libtrace_lock
  // libtrace->perpkt_cond;
- libtrace->perpkt_pausing = 0;
- libtrace->mapper_queue_full = false;
- libtrace->mappers_finishing = -1;
+ libtrace->perpkts_pausing = 0;
+ libtrace->perpkt_queue_full = false;
+ libtrace->perpkts_finishing = -1;
  libtrace->reducer_flags = 0;
  libtrace->joined = false;
...
  libtrace->expected_key = 0;
  libtrace->packet_freelist_size = 0;
- libtrace->mapper_buffer_size = 0;
+ libtrace->perpkt_buffer_size = 0;
  libtrace_zero_ringbuffer(&libtrace->packet_freelist);
  libtrace_zero_thread(&libtrace->hasher_thread);
...
  libtrace_zero_slidingwindow(&libtrace->sliding_window);
  libtrace->reducer_thread.type = THREAD_EMPTY;
- libtrace->mapper_thread_count = 0;
- libtrace->mapper_threads = NULL;
+ libtrace->perpkt_thread_count = 0;
+ libtrace->perpkt_threads = NULL;

  for(tmp=formats_list;tmp;tmp=tmp->next) {
...
  libtrace_ringbuffer_destroy(&libtrace->packet_freelist);

- for (i = 0; i < libtrace->mapper_thread_count; ++i) {
-     assert (libtrace_vector_get_size(&libtrace->mapper_threads[i].vector) == 0);
-     libtrace_vector_destroy(&libtrace->mapper_threads[i].vector);
- }
- free(libtrace->mapper_threads);
- libtrace->mapper_threads = NULL;
- libtrace->mapper_thread_count = 0;
+ for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
+     assert (libtrace_vector_get_size(&libtrace->perpkt_threads[i].vector) == 0);
+     libtrace_vector_destroy(&libtrace->perpkt_threads[i].vector);
+ }
+ free(libtrace->perpkt_threads);
+ libtrace->perpkt_threads = NULL;
+ libtrace->perpkt_thread_count = 0;

  if (libtrace->event.packet) {
lib/trace_parallel.c
(r60e8e86 → r17a3dff)

  int i;
  struct multithreading_stats totals = {0};
- for (i = 0; i < libtrace->mapper_thread_count ; i++) {
-     printf("\nStats for mapper thread#%d\n", i);
+ for (i = 0; i < libtrace->perpkt_thread_count ; i++) {
+     printf("\nStats for perpkt thread#%d\n", i);
      printf("\tfull_queue_hits: %"PRIu64"\n", contention_stats[i].full_queue_hits);
      totals.full_queue_hits += contention_stats[i].full_queue_hits;
...
      totals.wait_for_fill_complete_hits += contention_stats[i].wait_for_fill_complete_hits;
  }
- printf("\nTotals for mapper threads\n");
+ printf("\nTotals for perpkt threads\n");
  printf("\tfull_queue_hits: %"PRIu64"\n", totals.full_queue_hits);
  printf("\twait_for_fill_complete_hits: %"PRIu64"\n", totals.wait_for_fill_complete_hits);
...
  libtrace_zero_deque(&t->deque);
  t->recorded_first = false;
- t->map_num = -1;
+ t->perpkt_num = -1;
...
  pthread_t tid = pthread_self();

- for (;i<libtrace->mapper_thread_count ;++i) {
-     if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
-         return &libtrace->mapper_threads[i];
+ for (;i<libtrace->perpkt_thread_count ;++i) {
+     if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
+         return &libtrace->perpkt_threads[i];
  }
  return NULL;
  }

- int get_thread_table_num(libtrace_t *libtrace);
- DLLEXPORT int get_thread_table_num(libtrace_t *libtrace) {
+ int get_thread_table_num(libtrace_t *libtrace) {
  int i = 0;
  pthread_t tid = pthread_self();
- for (;i<libtrace->mapper_thread_count; ++i) {
-     if (pthread_equal(tid, libtrace->mapper_threads[i].tid))
+ for (;i<libtrace->perpkt_thread_count; ++i) {
+     if (pthread_equal(tid, libtrace->perpkt_threads[i].tid))
          return i;
  }
...
  printf("Pausing thread #%d\n", get_thread_table_num(trace));
  assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
- trace->perpkt_pausing++;
+ trace->perpkts_pausing++;
  pthread_cond_broadcast(&trace->perpkt_cond);
  while (!trace->started) {
      assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
  }
- trace->perpkt_pausing--;
+ trace->perpkts_pausing--;
  pthread_cond_broadcast(&trace->perpkt_cond);
  assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
...
- void* mapper_start(void *data) {
+ /**
+  * The is the entry point for our packet processing threads.
+  */
+ static void* perpkt_threads_entry(void *data) {
  libtrace_t *trace = (libtrace_t *)data;
  libtrace_thread_t * t;
...
  t = get_thread_table(trace);
  assert(t);
- //printf("Yay Started Mapper thread #%d\n", (int) get_thread_table_num(trace));
+ //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));
  assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
...
  }

- if (trace->mapper_thread_count == 1) {
+ if (trace->perpkt_thread_count == 1) {
      if (!packet) {
          if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
...
  // Notify only after we've defiantly set the state to finished
- message.code = MESSAGE_MAPPER_ENDED;
+ message.code = MESSAGE_PERPKT_ENDED;
  message.additional = NULL;
  trace_send_message_to_reducer(trace, &message);
...
  /* We are guaranteed to have a hash function i.e. != NULL */
  trace_packet_set_hash(packet, (*trace->hasher)(packet, trace->hasher_data));
- thread = trace_packet_get_hash(packet) % trace->mapper_thread_count;
+ thread = trace_packet_get_hash(packet) % trace->perpkt_thread_count;
  /* Blocking write to the correct queue - I'm the only writer */
- if (trace->mapper_threads[thread].state != THREAD_FINISHED) {
-     libtrace_ringbuffer_write(&trace->mapper_threads[thread].rbuffer, packet);
+ if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
+     libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
      pkt_skipped = 0;
  } else {
...
  /* Broadcast our last failed read to all threads */
- for (i = 0; i < trace->mapper_thread_count; i++) {
+ for (i = 0; i < trace->perpkt_thread_count; i++) {
      libtrace_packet_t * bcast;
      printf("Broadcasting error/EOF now the trace is over\n");
-     if (i == trace->mapper_thread_count - 1) {
+     if (i == trace->perpkt_thread_count - 1) {
          bcast = packet;
      } else {
...
      assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
-     if (trace->mapper_threads[i].state != THREAD_FINISHED) {
+     if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
          assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
          // Unlock early otherwise we could deadlock
-         libtrace_ringbuffer_write(&trace->mapper_threads[i].rbuffer, NULL);
+         libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);
      } else {
          assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
...
  // Notify only after we've defiantly set the state to finished
  libtrace_message_t message;
- message.code = MESSAGE_MAPPER_ENDED;
+ message.code = MESSAGE_PERPKT_ENDED;
  message.additional = NULL;
  trace_send_message_to_reducer(trace, &message);
...
  {
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t* t = &libtrace->mapper_threads[this_thread];
+ libtrace_thread_t* t = &libtrace->perpkt_threads[this_thread];

  if (*packet) // Recycle the old get the new
...
   * before returning EOF/error.
   */
- inline static int trace_handle_finishing_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
+ inline static int trace_handle_finishing_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
  {
  /* We are waiting for the condition that another thread ends to check
...
  // Check before
- if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
+ if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
      complete = true;
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
...
  // Check after
- if (libtrace->mappers_finishing == libtrace->mapper_thread_count) {
+ if (libtrace->perpkts_finishing == libtrace->perpkt_thread_count) {
      complete = true;
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
...
   * Expects the libtrace_lock to not be held
   */
- inline static int trace_finish_mapper(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
+ inline static int trace_finish_perpkt(libtrace_t *libtrace, libtrace_packet_t **packet, libtrace_thread_t * t)
  {
  assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
  t->state = THREAD_FINISHING;
- libtrace->mappers_finishing++;
+ libtrace->perpkts_finishing++;
  pthread_cond_broadcast(&libtrace->perpkt_cond);
  assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
- return trace_handle_finishing_mapper(libtrace, packet, t);
+ return trace_handle_finishing_perpkt(libtrace, packet, t);
  }
...
  {
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
- int thread, ret, psize;
+ libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
+ int thread, ret/*, psize*/;

  while (1) {
...
  // Another thread cannot write a packet because a queue has filled up. Is it ours?
- if (libtrace->mapper_queue_full) {
+ if (libtrace->perpkt_queue_full) {
      contention_stats[this_thread].wait_for_fill_complete_hits++;
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
...
  trace_packet_set_hash(*packet, (*libtrace->hasher)(*packet, libtrace->hasher_data));
- thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
+ thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
  if (thread == this_thread) {
      // If it's this thread we must be in order because we checked the buffer once we got the lock
...
  }

- if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
-     while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
-         libtrace->mapper_queue_full = true;
+ if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
+     while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
+         libtrace->perpkt_queue_full = true;
          assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
          contention_stats[this_thread].full_queue_hits++;
...
      }
      *packet = NULL;
-     libtrace->mapper_queue_full = false;
+     libtrace->perpkt_queue_full = false;
  } else {
      /* We can get here if the user closes the thread before natural completion/or error */
...
  {
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
- int ret, i, thread, psize;
+ libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
+ int ret, i, thread/*, psize*/;

  if (t->state == THREAD_FINISHING)
-     return trace_handle_finishing_mapper(libtrace, packet, t);
+     return trace_handle_finishing_perpkt(libtrace, packet, t);

  while (1) {
...
  // TODO put on *proper* condition variable
- if (libtrace->mapper_queue_full) {
+ if (libtrace->perpkt_queue_full) {
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
      assert(sem_post(&libtrace->sem) == 0);
...
      return 0;
  else
-     return trace_finish_mapper(libtrace, packet, t);
+     return trace_finish_perpkt(libtrace, packet, t);
  }
  assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
...
  while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
      assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
-     if (libtrace->mapper_queue_full) {
+     if (libtrace->perpkt_queue_full) {
          // I might be the holdup in which case if I can read my queue I should do that and return
          if(try_waiting_queue(libtrace, t, packet, &ret)) {
...
  // Read greedily as many as we can
  while (libtrace_slidingwindow_try_read(&libtrace->sliding_window, (void **) packet, NULL)) {
-     thread = trace_packet_get_hash(*packet) % libtrace->mapper_thread_count;
-     if (libtrace->mapper_threads[thread].state != THREAD_FINISHED) {
-         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->mapper_threads[thread].rbuffer, *packet)) {
+     thread = trace_packet_get_hash(*packet) % libtrace->perpkt_thread_count;
+     if (libtrace->perpkt_threads[thread].state != THREAD_FINISHED) {
+         while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
              if (this_thread == thread)
              {
...
              if(try_waiting_queue(libtrace, t, packet, &ret)) {
                  // We must be able to write this now 100% without fail
-                 libtrace_ringbuffer_write(&libtrace->mapper_threads[thread].rbuffer, *packet);
+                 libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
                  assert(sem_post(&libtrace->sem) == 0);
                  assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
...
              }
              // Not us we have to give the other threads a chance to write there packets then
-             libtrace->mapper_queue_full = true;
+             libtrace->perpkt_queue_full = true;
              assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
-             for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
+             for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
                  assert(sem_post(&libtrace->sem) == 0);
...
              assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
              // Grab these back
-             for (i = 0; i < libtrace->mapper_thread_count-1; i++) // Release all other threads to read there packets
+             for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
                  assert(sem_wait(&libtrace->sem) == 0);
-             libtrace->mapper_queue_full = false;
+             libtrace->perpkt_queue_full = false;
          }
          assert(sem_post(&libtrace->sem) == 0);
...
  dup = trace_copy_packet(packet);
  assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
- libtrace->first_packets.packets[t->map_num].packet = dup;
+ libtrace->first_packets.packets[t->perpkt_num].packet = dup;
  //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
- memcpy(&libtrace->first_packets.packets[t->map_num].tv, &tv, sizeof(tv));
+ memcpy(&libtrace->first_packets.packets[t->perpkt_num].tv, &tv, sizeof(tv));
  // Now update the first
  libtrace->first_packets.count++;
  if (libtrace->first_packets.count == 1) {
      // We the first entry hence also the first known packet
-     libtrace->first_packets.first = t->map_num;
+     libtrace->first_packets.first = t->perpkt_num;
  } else {
      // Check if we are newer than the previous 'first' packet
...
      if (trace_get_seconds(dup) <
          trace_get_seconds(libtrace->first_packets.packets[first].packet))
-         libtrace->first_packets.first = t->map_num;
+         libtrace->first_packets.first = t->perpkt_num;
  }
  assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
...
  /**
-  * Returns 1 if its certain that the first packet is truly the first packet
+  * Returns 1 if it's certain that the first packet is truly the first packet
   * rather than a best guess based upon threads that have published so far.
   * Otherwise 0 is returned.
...
  *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
  *tv = &libtrace->first_packets.packets[libtrace->first_packets.first].tv;
- if (libtrace->first_packets.count == libtrace->mapper_thread_count) {
+ if (libtrace->first_packets.count == libtrace->perpkt_thread_count) {
      ret = 1;
  } else {
...
  } else if (trace_has_dedicated_hasher(libtrace)) {
      ret = trace_pread_packet_hasher_thread(libtrace, packet);
- } else if (libtrace->reducer_flags & MAPPER_USE_SLIDING_WINDOW) {
+ } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) {
      ret = trace_pread_packet_sliding_window(libtrace, packet);
  } else {
...
  int i;

- for (i = 0; i < libtrace->mapper_thread_count; i++) {
-     libtrace_thread_t *t = &libtrace->mapper_threads[i];
-     assert(pthread_create(&t->tid, NULL, mapper_start, (void *) libtrace) == 0);
- }
- return libtrace->mapper_thread_count;
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
+     libtrace_thread_t *t = &libtrace->perpkt_threads[i];
+     assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
+ }
+ return libtrace->perpkt_thread_count;
...
  if (trace_is_err(libtrace))
      return -1;;
- if (libtrace->perpkt_pausing != 0) {
+ if (libtrace->perpkts_pausing != 0) {
      printf("Restarting trace\n");
      libtrace->format->pstart_input(libtrace);
...
  libtrace->per_pkt = per_pkt;
  libtrace->reducer = reducer;
- libtrace->mappers_finishing = 0;
+ libtrace->perpkts_finishing = 0;
  // libtrace->hasher = &rand_hash; /* Hasher now set via option */
...
  // Set default buffer sizes
- if (libtrace->mapper_buffer_size <= 0)
-     libtrace->mapper_buffer_size = 1000;
-
- if (libtrace->mapper_thread_count <= 0)
-     libtrace->mapper_thread_count = 2; // XXX scale to system
+ if (libtrace->perpkt_buffer_size <= 0)
+     libtrace->perpkt_buffer_size = 1000;
+
+ if (libtrace->perpkt_thread_count <= 0)
+     libtrace->perpkt_thread_count = 2; // XXX scale to system

  if(libtrace->packet_freelist_size <= 0)
-     libtrace->packet_freelist_size = (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count;
+     libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;

  if(libtrace->packet_freelist_size <
-     (libtrace->mapper_buffer_size + 1) * libtrace->mapper_thread_count)
+     (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
      fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
...
  libtrace->first_packets.count = 0;
  assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
- libtrace->first_packets.packets = calloc(libtrace->mapper_thread_count, sizeof(struct __packet_storage_magic_type));
-
-
- /* Start all of our mapper threads */
- libtrace->mapper_threads = calloc(sizeof(libtrace_thread_t), libtrace->mapper_thread_count);
- for (i = 0; i < libtrace->mapper_thread_count; i++) {
-     libtrace_thread_t *t = &libtrace->mapper_threads[i];
+ libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct __packet_storage_magic_type));
+
+
+ /* Start all of our perpkt threads */
+ libtrace->perpkt_threads = calloc(sizeof(libtrace_thread_t), libtrace->perpkt_thread_count);
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
+     libtrace_thread_t *t = &libtrace->perpkt_threads[i];
      t->trace = libtrace;
      t->ret = NULL;
-     t->type = THREAD_MAPPER;
+     t->type = THREAD_PERPKT;
      t->state = THREAD_RUNNING;
      t->user_data = NULL;
      // t->tid DONE on create
-     t->map_num = i;
+     t->perpkt_num = i;
      if (libtrace->hasher)
-         libtrace_ringbuffer_init(&t->rbuffer, libtrace->mapper_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
+         libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
      // Depending on the mode vector or deque might be chosen
      libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
...
  int threads_started = 0;
  /* Setup the trace and start our threads */
- if (libtrace->mapper_thread_count > 1 && libtrace->format->pstart_input) {
+ if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {
      printf("This format has direct support for p's\n");
      threads_started = libtrace->format->pstart_input(libtrace);
...
  // TODO fix these leaks etc
- if (libtrace->mapper_thread_count != threads_started)
-     printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->mapper_thread_count);
+ if (libtrace->perpkt_thread_count != threads_started)
+     printf("Warning started threads not equal requested s=%d r=%d", threads_started, libtrace->perpkt_thread_count);
...
   * 2. All perpkt threads are paused waiting on a condition var
   * 3. Then call ppause on the underlying format if found
-  * 4. Return with perpkt_pausing set to mapper_count (Used when restarting so we reuse the threads)
+  * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads)
   *
   * Once done you should be a able to modify the trace setup and call pstart again
...
  printf("Sending messages \n");
- // Stop threads, skip this one if its a mapper
- for (i = 0; i < libtrace->mapper_thread_count; i++) {
-     if (&libtrace->mapper_threads[i] != t) {
+ // Stop threads, skip this one if its a perpkt
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
+     if (&libtrace->perpkt_threads[i] != t) {
          libtrace_message_t message;
          message.code = MESSAGE_PAUSE;
          message.additional = NULL;
-         trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
+         trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
      }
  }
...
  if (t) {
-     // A mapper is doing the pausing interesting fake a extra thread paused
+     // A perpkt is doing the pausing interesting fake a extra thread paused
      // We rely on the user to not return before starting the trace again
      assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
-     libtrace->perpkt_pausing++;
+     libtrace->perpkts_pausing++;
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
  }
...
  // Wait for all threads to pause
  assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
- while (libtrace->mapper_thread_count != libtrace->perpkt_pausing) {
+ while (libtrace->perpkt_thread_count != libtrace->perpkts_pausing) {
      assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
  }
...
  // Now send a message asking the threads to stop
  // This will be retrieved before trying to read another packet
- for (i = 0; i < libtrace->mapper_thread_count; i++) {
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
      libtrace_message_t message;
      message.code = MESSAGE_STOP;
      message.additional = NULL;
-     trace_send_message_to_thread(libtrace, &libtrace->mapper_threads[i], &message);
+     trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message);
  }
...
  int i;

- /* Firstly wait for the mapper threads to finish, since these are
+ /* Firstly wait for the perpkt threads to finish, since these are
   * user controlled */
- for (i=0; i< libtrace->mapper_thread_count; i++) {
-     //printf("Waiting to join with mapper #%d\n", i);
-     assert(pthread_join(libtrace->mapper_threads[i].tid, NULL) == 0);
-     //printf("Joined with mapper #%d\n", i);
+ for (i=0; i< libtrace->perpkt_thread_count; i++) {
+     //printf("Waiting to join with perpkt #%d\n", i);
+     assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
+     //printf("Joined with perpkt #%d\n", i);
      // So we must do our best effort to empty the queue - so
      // the producer (or any other threads) don't block.
...
      // Mark that we are no longer accepting packets
      assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
-     libtrace->mapper_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
+     libtrace->perpkt_threads[i].state = THREAD_FINISHED; // Important we are finished before we empty the buffer
      assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
-     while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
-         if (packet) // This could be NULL iff the mapper finishes early
+     while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
+         if (packet) // This could be NULL iff the perpkt finishes early
              trace_destroy_packet(packet);
  }
...
  // Now that everything is finished nothing can be touching our
  // buffers so clean them up
- for (i = 0; i < libtrace->mapper_thread_count; i++) {
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
      // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
      // if they lost timeslice before-during a write
      libtrace_packet_t * packet;
-     while(libtrace_ringbuffer_try_read(&libtrace->mapper_threads[i].rbuffer, (void **) &packet))
+     while(libtrace_ringbuffer_try_read(&libtrace->perpkt_threads[i].rbuffer, (void **) &packet))
          trace_destroy_packet(packet);
      if (libtrace->hasher) {
-         assert(libtrace_ringbuffer_is_empty(&libtrace->mapper_threads[i].rbuffer));
-         libtrace_ringbuffer_destroy(&libtrace->mapper_threads[i].rbuffer);
+         assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));
+         libtrace_ringbuffer_destroy(&libtrace->perpkt_threads[i].rbuffer);
      }
      // Cannot destroy vector yet, this happens with trace_destroy
...
  {
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
+ libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];

  assert (pthread_spin_lock(&t->tmp_spinlock) == 0);
...
  {
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
+ libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
  if (t->tmp_key != key) {
      if (t->tmp_data) {
...
  // Who am I???
  int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
- libtrace_thread_t * t = &libtrace->mapper_threads[this_thread];
+ libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
  // Now put it into my table
  static __thread int count = 0;
...
  libtrace_vector_empty(results);
  // Check all of the temp queues
- for (i = 0; i < libtrace->mapper_thread_count; ++i) {
+ for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
      libtrace_result_t r = {0,0};
-     assert (pthread_spin_lock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
-     if (libtrace->mapper_threads[i].tmp_key == key) {
-         libtrace_result_set_key_value(&r, key, libtrace->mapper_threads[i].tmp_data);
-         libtrace->mapper_threads[i].tmp_data = NULL;
+     assert (pthread_spin_lock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
+     if (libtrace->perpkt_threads[i].tmp_key == key) {
+         libtrace_result_set_key_value(&r, key, libtrace->perpkt_threads[i].tmp_data);
+         libtrace->perpkt_threads[i].tmp_data = NULL;
          printf("Found in temp queue\n");
      }
-     assert (pthread_spin_unlock(&libtrace->mapper_threads[i].tmp_spinlock) == 0);
+     assert (pthread_spin_unlock(&libtrace->perpkt_threads[i].tmp_spinlock) == 0);
      if (libtrace_result_get_value(&r)) {
          // Got a result still in temporary
...
      } else {
          // This might be waiting on the actual queue
-         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
+         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
          if (libtrace_deque_peek_front(v, (void *) &r) &&
              libtrace_result_get_value(&r)) {
-             assert (libtrace_deque_pop_front(&libtrace->mapper_threads[i].deque, (void *) &r) == 1);
+             assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[i].deque, (void *) &r) == 1);
              printf("Found in real queue\n");
              libtrace_vector_push_back(results, &r);
...
  if (flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
      int live_count = 0;
-     bool live[libtrace->mapper_thread_count]; // Set if a trace is alive
-     uint64_t key[libtrace->mapper_thread_count]; // Cached keys
+     bool live[libtrace->perpkt_thread_count]; // Set if a trace is alive
+     uint64_t key[libtrace->perpkt_thread_count]; // Cached keys
      uint64_t min_key = UINT64_MAX; // XXX use max int here stdlimit.h?
      int min_queue = -1;

      /* Loop through check all are alive (have data) and find the smallest */
-     for (i = 0; i < libtrace->mapper_thread_count; ++i) {
-         libtrace_queue_t *v = &libtrace->mapper_threads[i].deque;
+     for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
+         libtrace_queue_t *v = &libtrace->perpkt_threads[i].deque;
          if (libtrace_deque_get_size(v) != 0) {
              libtrace_result_t r;
...
      /* Now remove the smallest and loop - special case if all threads have joined we always flush whats left */
-     while ((live_count == libtrace->mapper_thread_count) || (live_count &&
+     while ((live_count == libtrace->perpkt_thread_count) || (live_count &&
              ((flags & REDUCE_SEQUENTIAL && min_key == libtrace->expected_key) ||
              libtrace->joined))) {
          libtrace_result_t r;

-         assert (libtrace_deque_pop_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r) == 1);
+         assert (libtrace_deque_pop_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r) == 1);
          libtrace_vector_push_back(results, &r);
...
          // Now update the one we just removed
-         if (libtrace_deque_get_size(&libtrace->mapper_threads[min_queue].deque) )
+         if (libtrace_deque_get_size(&libtrace->perpkt_threads[min_queue].deque) )
          {
-             libtrace_deque_peek_front(&libtrace->mapper_threads[min_queue].deque, (void *) &r);
+             libtrace_deque_peek_front(&libtrace->perpkt_threads[min_queue].deque, (void *) &r);
              key[min_queue] = libtrace_result_get_key(&r);
              if (key[min_queue] <= min_key) {
...
                  min_key = key[min_queue]; // Update our minimum
                  // Check all find the smallest again - all are alive
-                 for (i = 0; i < libtrace->mapper_thread_count; ++i) {
+                 for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                      if (live[i] && min_key > key[i]) {
                          min_key = key[i];
...
              min_key = UINT64_MAX; // Update our minimum
              // Check all find the smallest again - all are alive
-             for (i = 0; i < libtrace->mapper_thread_count; ++i) {
+             for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
                  // Still not 100% TODO (what if order is wrong or not increasing)
                  if (live[i] && min_key >= key[i]) {
...
          }
      }
  } else { // Queues are not in order - return all results in the queue
-     for (i = 0; i < libtrace->mapper_thread_count; i++) {
-         libtrace_vector_append(results, &libtrace->mapper_threads[i].vector);
+     for (i = 0; i < libtrace->perpkt_thread_count; i++) {
+         libtrace_vector_append(results, &libtrace->perpkt_threads[i].vector);
      }
      if (flags & REDUCE_SORT) {
...
  // TODO I don't like using this so much
  //assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
- for (i = 0; i < libtrace->mapper_thread_count; i++) {
-     if (libtrace->mapper_threads[i].state == THREAD_RUNNING)
+ for (i = 0; i < libtrace->perpkt_thread_count; i++) {
+     if (libtrace->perpkt_threads[i].state == THREAD_RUNNING)
          b++;
  }
...
  case TRACE_OPTION_SET_HASHER:
      return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
- case TRACE_OPTION_SET_MAPPER_BUFFER_SIZE:
-     libtrace->mapper_buffer_size = *((int *) value);
+ case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
+     libtrace->perpkt_buffer_size = *((int *) value);
      return 1;
  case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
      libtrace->packet_freelist_size = *((int *) value);
      return 1;
- case TRACE_OPTION_SET_MAPPER_THREAD_COUNT:
-     libtrace->mapper_thread_count = *((int *) value);
+ case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
+     libtrace->perpkt_thread_count = *((int *) value);
      return 1;
  case TRACE_DROP_OUT_OF_ORDER:
...
  case TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER:
      if (*((int *) value))
-         libtrace->reducer_flags |= MAPPER_USE_SLIDING_WINDOW;
+         libtrace->reducer_flags |= PERPKT_USE_SLIDING_WINDOW;
      else
-         libtrace->reducer_flags &= ~MAPPER_USE_SLIDING_WINDOW;
+         libtrace->reducer_flags &= ~PERPKT_USE_SLIDING_WINDOW;
      return 1;
  case TRACE_OPTION_TRACETIME:
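Similarly, a small sketch (not part of the changeset) of how the renamed configuration options handled in the switch above might be set before starting a parallel trace; the switch cases read each option through an int pointer, so plain ints are passed by address:

#include "libtrace.h"

static void configure_parallel(libtrace_t *trace) {
    int threads = 4;   /* number of perpkt threads to create */
    int buffer = 1000; /* packets that may queue per perpkt thread */

    /* Both options expect a pointer to an int, matching the
     * *((int *) value) reads in trace_parallel_config() above. */
    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &threads);
    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &buffer);
}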