Changeset 0862c20
- Timestamp:
- 08/12/14 15:22:24 (7 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 957a72a
- Parents:
- 7e4e9b8 (diff), f051c1b (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Files:
-
- 1 added
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/libtrace.h.in
ra49a9eb rf051c1b 244 244 uint64_t key; 245 245 void * value; 246 int is_packet;246 int type; 247 247 } libtrace_result_t; 248 #define RESULT_NORMAL 0 249 #define RESULT_PACKET 1 250 #define RESULT_TICK 2 248 251 249 252 typedef struct libtrace_thread_t libtrace_thread_t; … … 3196 3199 3197 3200 typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread); 3198 typedef void* (*fn_re ducer)(libtrace_t* trace, void* global_blob);3201 typedef void* (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m); 3199 3202 typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data); 3200 3203 3201 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_re ducer reducer);3204 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter); 3202 3205 DLLEXPORT int trace_ppause(libtrace_t *libtrace); 3203 3206 DLLEXPORT int trace_pstop(libtrace_t *libtrace); … … 3219 3222 3220 3223 3221 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value); 3222 DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet); 3224 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type); 3223 3225 typedef struct libtrace_vector libtrace_vector_t; 3224 3226 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results); 3225 3227 3226 DLLEXPORT int trace_post_re duce(libtrace_t *libtrace);3228 DLLEXPORT int trace_post_reporter(libtrace_t *libtrace); 3227 3229 DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace); 3228 3230 DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message); 3229 3231 DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message); 3230 DLLEXPORT int trace_send_message_to_re ducer(libtrace_t * libtrace, libtrace_message_t * message);3232 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message); 3231 3233 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message); 3232 3234 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message); … … 3246 3248 */ 3247 3249 TRACE_OPTION_SET_HASHER, 3248 3249 /**3250 * See diagrams, this sets the maximum size of freelist used to3251 * maintain packets and there memory buffers.3252 * NOTE setting this to less than recommend could cause deadlock a3253 * trace that manages its own packets.3254 * A unblockable error message will be printed.3255 */3256 TRACE_OPTION_SET_PACKET_FREELIST_SIZE,3257 3258 /**3259 * See diagrams, this sets the maximum size of buffers used between3260 * the single hasher thread and the buffer.3261 * NOTE setting this to less than recommend could cause deadlock a3262 * trace that manages its own packets.3263 * A unblockable warning message will be printed to stderr in this case.3264 */3265 TRACE_OPTION_SET_PERPKT_BUFFER_SIZE,3266 3250 3267 3251 /** … … 3282 3266 * Libtrace ordered results, results in each queue are ordered by key 3283 3267 * however my not be sequential, a typically case is packet timestamps 3284 * the re ducer will receive packets in order - note threasholds3268 * the reporter will receive packets in order - note threasholds 3285 3269 * will be used such that a empty queue wont break things 3286 3270 */ … … 3308 3292 3309 3293 enum libtrace_messages { 3310 MESSAGE_STARTED, 3294 MESSAGE_STARTING, 3295 MESSAGE_RESUMING, 3296 MESSAGE_STOPPING, 3297 MESSAGE_PAUSING, 3311 3298 MESSAGE_DO_PAUSE, 3312 MESSAGE_PAUSING,3313 MESSAGE_PAUSED,3314 3299 MESSAGE_DO_STOP, 3315 MESSAGE_STOPPED,3316 3300 MESSAGE_FIRST_PACKET, 3317 3301 MESSAGE_PERPKT_ENDED, … … 3319 3303 MESSAGE_PERPKT_PAUSED, 3320 3304 MESSAGE_PERPKT_EOF, 3321 MESSAGE_POST_RE DUCE,3305 MESSAGE_POST_REPORTER, 3322 3306 MESSAGE_POST_RANGE, 3323 3307 MESSAGE_TICK, … … 3386 3370 DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace); 3387 3371 3372 /** 3373 * Tuning the parallel sizes 3374 */ 3375 struct user_configuration { 3376 // Packet memory cache settings (ocache_init) total 3377 /** 3378 * See diagrams, this sets the maximum size of freelist used to 3379 * maintain packets and their memory buffers. 3380 * NOTE setting this to less than recommend could cause deadlock a 3381 * trace that manages its own packets. 3382 * A unblockable error message will be printed. 3383 */ 3384 size_t packet_global_cache_size; 3385 // Per thread 3386 size_t packet_thread_cache_size; 3387 // Packet count limited 3388 bool fixed_packet_count; 3389 // Bursts/Batches of packets this size are combined, used in single thread mode 3390 size_t burst_size; 3391 // Each perpkt thread has a queue leading into the reporter 3392 //size_t reporter_queue_size; 3393 /** The tick interval - in milliseconds (0) */ 3394 size_t tick_interval; 3395 // The tick interval for file based traces, in number of packets TODO implement this 3396 size_t tick_count; 3397 3398 // The number of per packet threads requested 3399 size_t perpkt_threads; 3400 3401 /** 3402 * See diagrams, this sets the maximum size of buffers used between 3403 * the single hasher thread and the buffer. 3404 * NOTE setting this to less than recommend could cause deadlock a 3405 * trace that manages its own packets. 3406 * A unblockable warning message will be printed to stderr in this case. 3407 */ 3408 /** The number of packets that can queue per thread from hasher thread */ 3409 size_t hasher_queue_size; 3410 3411 // Reporter threashold before results are sent 3412 size_t reporter_thold; 3413 }; 3414 3415 #define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration)); 3416 3388 3417 #ifdef __cplusplus 3389 3418 } /* extern "C" */ -
lib/libtrace_int.h
rbe3f75b rf051c1b 177 177 THREAD_HASHER, 178 178 THREAD_PERPKT, 179 THREAD_RE DUCER,179 THREAD_REPORTER, 180 180 THREAD_KEEPALIVE 181 181 }; … … 307 307 /** For the sliding window hasher implementation */ 308 308 pthread_rwlock_t window_lock; 309 /** Set once trace_join has been called */310 bool joined;311 309 /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */ 312 310 bool perpkt_queue_full; 313 311 /** Global storage for this trace, shared among all the threads */ 314 312 void* global_blob; 315 /** Requested size of the pkt buffer (currently only used if using dedicated hasher thread) */316 int packet_freelist_size;317 313 /** The actual freelist */ 318 314 libtrace_ocache_t packet_freelist; 319 /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */ 320 int perpkt_buffer_size; 321 /** The reducer flags */ 322 int reducer_flags; 323 /** The tick interval - in milliseconds (0 or -ve==disabled) */ 324 int tick_interval; 315 /** The reporter flags */ 316 int reporter_flags; 325 317 /** Used to track the next expected key */ 326 318 uint64_t expected_key; 327 319 /** User defined per_pkt function called when a pkt is ready */ 328 320 fn_per_pkt per_pkt; 329 /** User defined re ducer function entry point XXX not hooked up */330 fn_re ducer reducer;321 /** User defined reporter function entry point XXX not hooked up */ 322 fn_reporter reporter; 331 323 /** The hasher function */ 332 324 enum hasher_types hasher_type; … … 336 328 337 329 libtrace_thread_t hasher_thread; 338 libtrace_thread_t re ducer_thread;330 libtrace_thread_t reporter_thread; 339 331 libtrace_thread_t keepalive_thread; 340 332 int perpkt_thread_count; … … 351 343 */ 352 344 uint64_t dropped_packets; 353 uint64_t received_packets; 345 uint64_t received_packets; 346 struct user_configuration config; 354 347 }; 355 348 -
lib/trace.c
rf9a70ca rf051c1b 265 265 libtrace->state = STATE_NEW; 266 266 libtrace->perpkt_queue_full = false; 267 libtrace->re ducer_flags = 0;267 libtrace->reporter_flags = 0; 268 268 libtrace->global_blob = NULL; 269 269 libtrace->per_pkt = NULL; 270 libtrace->re ducer = NULL;270 libtrace->reporter = NULL; 271 271 libtrace->hasher = NULL; 272 libtrace->packet_freelist_size = 0;273 libtrace->perpkt_buffer_size = 0;274 272 libtrace->expected_key = 0; 275 273 libtrace_zero_ocache(&libtrace->packet_freelist); 276 274 libtrace_zero_thread(&libtrace->hasher_thread); 277 libtrace_zero_thread(&libtrace->re ducer_thread);275 libtrace_zero_thread(&libtrace->reporter_thread); 278 276 libtrace_zero_thread(&libtrace->keepalive_thread); 279 277 libtrace_zero_slidingwindow(&libtrace->sliding_window); 280 libtrace->re ducer_thread.type = THREAD_EMPTY;278 libtrace->reporter_thread.type = THREAD_EMPTY; 281 279 libtrace->perpkt_thread_count = 0; 282 280 libtrace->perpkt_threads = NULL; 283 281 libtrace->tracetime = 0; 284 libtrace->tick_interval = 0;285 282 libtrace->first_packets.first = 0; 286 283 libtrace->first_packets.count = 0; 287 284 libtrace->first_packets.packets = NULL; 288 285 libtrace->dropped_packets = UINT64_MAX; 286 ZERO_USER_CONFIG(libtrace->config); 289 287 290 288 /* Parse the URI to determine what sort of trace we are dealing with */ … … 388 386 libtrace->state = STATE_NEW; // TODO MAYBE DEAD 389 387 libtrace->perpkt_queue_full = false; 390 libtrace->re ducer_flags = 0;388 libtrace->reporter_flags = 0; 391 389 libtrace->global_blob = NULL; 392 390 libtrace->per_pkt = NULL; 393 libtrace->re ducer = NULL;391 libtrace->reporter = NULL; 394 392 libtrace->hasher = NULL; 395 393 libtrace->expected_key = 0; 396 libtrace->packet_freelist_size = 0;397 libtrace->perpkt_buffer_size = 0;398 394 libtrace_zero_ocache(&libtrace->packet_freelist); 399 395 libtrace_zero_thread(&libtrace->hasher_thread); 400 libtrace_zero_thread(&libtrace->re ducer_thread);396 libtrace_zero_thread(&libtrace->reporter_thread); 401 397 libtrace_zero_thread(&libtrace->keepalive_thread); 402 398 libtrace_zero_slidingwindow(&libtrace->sliding_window); 403 libtrace->re ducer_thread.type = THREAD_EMPTY;399 libtrace->reporter_thread.type = THREAD_EMPTY; 404 400 libtrace->perpkt_thread_count = 0; 405 401 libtrace->perpkt_threads = NULL; 406 402 libtrace->tracetime = 0; 407 libtrace->tick_interval = 0;403 ZERO_USER_CONFIG(libtrace->config); 408 404 409 405 for(tmp=formats_list;tmp;tmp=tmp->next) { -
lib/trace_parallel.c
rbe3f75b rf051c1b 101 101 102 102 103 #define VERBOSE_DEBBUGING 0103 #define VERBOSE_DEBBUGING 1 104 104 105 105 … … 181 181 182 182 /** 183 * True if the trace has dedicated hasher thread otherwise false, 184 * to be used after the trace is running 185 */ 186 static inline int trace_has_dedicated_reporter(libtrace_t * libtrace) 187 { 188 assert(libtrace->state != STATE_NEW); 189 return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter; 190 } 191 192 /** 183 193 * Changes a thread's state and broadcasts the condition variable. This 184 194 * should always be done when the lock is held. … … 206 216 207 217 #if VERBOSE_DEBBUGING 208 fprintf(stderr, "Thread %d State changed from %d to %d\n",t->tid,209 t->state, prev_state);218 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid, 219 prev_state, t->state); 210 220 #endif 211 221 if (need_lock) … … 232 242 #if VERBOSE_DEBBUGING 233 243 fprintf(stderr, "Trace(%s) state changed from %s to %s\n", 234 trace->uridata, get_trace_state_name( trace->state),235 get_trace_state_name( prev_state));244 trace->uridata, get_trace_state_name(prev_state), 245 get_trace_state_name(trace->state)); 236 246 #endif 237 247 if (need_lock) … … 309 319 if (!(ret = get_thread_table(libtrace))) { 310 320 pthread_t tid = pthread_self(); 311 // Check if we are re ducer or something else312 if (pthread_equal(tid, libtrace->re ducer_thread.tid))313 ret = &libtrace->re ducer_thread;321 // Check if we are reporter or something else 322 if (pthread_equal(tid, libtrace->reporter_thread.tid)) 323 ret = &libtrace->reporter_thread; 314 324 else if (pthread_equal(tid, libtrace->hasher_thread.tid)) 315 325 ret = &libtrace->hasher_thread; … … 324 334 { 325 335 libtrace_result_t *res = (libtrace_result_t *)data; 326 if (res-> is_packet) {336 if (res->type == RESULT_PACKET) { 327 337 // Duplicate the packet in standard malloc'd memory and free the 328 // original 338 // original, This is a 1:1 exchange so is ocache count remains unchanged. 329 339 libtrace_packet_t *oldpkt, *dup; 330 340 oldpkt = (libtrace_packet_t *) res->value; … … 342 352 static void trace_make_results_packets_safe(libtrace_t *trace) { 343 353 libtrace_thread_t *t = get_thread_descriptor(trace); 344 if (trace->re ducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))354 if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) 345 355 libtrace_deque_apply_function(&t->deque, &do_copy_result_packet); 346 356 else … … 353 363 */ 354 364 static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) { 355 trace_make_results_packets_safe(trace); 365 if (t->type == THREAD_PERPKT) 366 trace_make_results_packets_safe(trace); 356 367 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 357 368 thread_change_state(trace, t, THREAD_PAUSED, false); … … 363 374 } 364 375 365 #define PACKETQUEUES 10366 376 367 377 /** … … 372 382 libtrace_thread_t * t; 373 383 libtrace_message_t message = {0}; 374 libtrace_packet_t *packets[ PACKETQUEUES] = {NULL};384 libtrace_packet_t *packets[trace->config.burst_size]; 375 385 size_t nb_packets; 376 386 size_t i; 377 387 388 memset(&packets, 0, sizeof(void*) * trace->config.burst_size); 378 389 // Force this thread to wait until trace_pstart has been completed 379 390 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); … … 389 400 // Send a message to say we've started 390 401 391 message.code = MESSAGE_STARTED; 402 // Let the per_packet function know we have started 403 message.code = MESSAGE_STARTING; 392 404 message.sender = t; 393 394 // Let the per_packet function know we have started405 (*trace->per_pkt)(trace, NULL, &message, t); 406 message.code = MESSAGE_RESUMING; 395 407 (*trace->per_pkt)(trace, NULL, &message, t); 396 408 … … 403 415 // Send message to say we are pausing, TODO consider sender 404 416 message.code = MESSAGE_PAUSING; 417 message.sender = t; 405 418 (*trace->per_pkt)(trace, NULL, &message, t); 406 419 // If a hasher thread is running empty input queues so we don't loose data … … 409 422 // The hasher has stopped by this point, so the queue shouldn't be filling 410 423 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 411 nb_packets = trace_pread_packet(trace, t, packets, 1); 412 if (nb_packets == 1) { 413 if (packets[0]->error > 0) 414 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t); 415 } else { 416 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", packets[0]->error, libtrace_ringbuffer_is_empty(&t->rbuffer)); 424 ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1); 425 if (packets[0]->error > 0) 426 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t); 427 else if (packets[0]->error != -2) { 428 // EOF or error, either way we'll stop 429 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 430 ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1); 431 // No packets after this should have any data in them 432 assert(packets[0]->error <= 0); 433 } 434 goto stop; 417 435 } 418 436 } 419 437 } 420 // Send a paused message as a final chance to memory copy any packets421 message.code = MESSAGE_PAUSED;422 (*trace->per_pkt)(trace, NULL, &message, t);423 438 // Now we do the actual pause, this returns when we are done 424 439 trace_thread_pause(trace, t); 440 message.code = MESSAGE_RESUMING; 441 (*trace->per_pkt)(trace, NULL, &message, t); 425 442 // Check for new messages as soon as we return 426 443 continue; … … 440 457 nb_packets = 1; 441 458 } else { 442 nb_packets = trace_pread_packet(trace, t, packets, PACKETQUEUES);459 nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size); 443 460 } 444 461 // Loop through the packets we just read … … 450 467 // An error this should be the last packet we read 451 468 size_t z; 452 for (z = i ; z < nb_packets; ++z) 453 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[i]->error); 454 assert (i == nb_packets-1); 469 // We could have an eof or error and a message such as pause 470 for (z = i ; z < nb_packets; ++z) { 471 fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error); 472 assert (packets[z]->error < 1); 473 } 455 474 goto stop; 456 475 } … … 463 482 stop: 464 483 /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */ 484 465 485 // Let the per_packet function know we have stopped 466 message.code = MESSAGE_STOPPED; 467 message.sender = NULL; 486 message.code = MESSAGE_PAUSING; 487 message.sender = t; 488 (*trace->per_pkt)(trace, NULL, &message, t); 489 message.code = MESSAGE_STOPPING; 468 490 message.additional.uint64 = 0; 469 491 (*trace->per_pkt)(trace, NULL, &message, t); 470 492 471 493 // Free any remaining packets 472 for (i = 0; i < PACKETQUEUES; i++) {494 for (i = 0; i < trace->config.burst_size; i++) { 473 495 if (packets[i]) { 474 496 libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1); … … 483 505 message.code = MESSAGE_PERPKT_ENDED; 484 506 message.additional.uint64 = 0; 485 trace_send_message_to_re ducer(trace, &message);507 trace_send_message_to_reporter(trace, &message); 486 508 487 509 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); … … 566 588 pkt_skipped = 0; 567 589 } else { 590 assert(!"Dropping a packet!!"); 568 591 pkt_skipped = 1; // Reuse that packet no one read it 569 592 } … … 573 596 for (i = 0; i < trace->perpkt_thread_count; i++) { 574 597 libtrace_packet_t * bcast; 575 printf("Broadcasting error/EOF now the trace is over\n");598 fprintf(stderr, "Broadcasting error/EOF now the trace is over\n"); 576 599 if (i == trace->perpkt_thread_count - 1) { 577 600 bcast = packet; … … 582 605 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 583 606 if (trace->perpkt_threads[i].state != THREAD_FINISHED) { 584 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);585 607 // Unlock early otherwise we could deadlock 586 608 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast); 609 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 587 610 } else { 611 fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i); 588 612 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 589 613 } … … 596 620 message.code = MESSAGE_PERPKT_ENDED; 597 621 message.additional.uint64 = 0; 598 trace_send_message_to_re ducer(trace, &message);622 trace_send_message_to_reporter(trace, &message); 599 623 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); 600 624 if (trace->format->punregister_thread) { … … 1070 1094 libtrace_message_t mesg = {0}; 1071 1095 mesg.code = MESSAGE_FIRST_PACKET; 1072 trace_send_message_to_re ducer(libtrace, &mesg);1096 trace_send_message_to_reporter(libtrace, &mesg); 1073 1097 t->recorded_first = true; 1074 1098 } … … 1124 1148 1125 1149 /** Similar to delay_tracetime but send messages to all threads periodically */ 1150 static void* reporter_entry(void *data) { 1151 libtrace_message_t message = {0}; 1152 libtrace_t *trace = (libtrace_t *)data; 1153 libtrace_thread_t *t = &trace->reporter_thread; 1154 size_t res_size; 1155 libtrace_vector_t results; 1156 libtrace_vector_init(&results, sizeof(libtrace_result_t)); 1157 fprintf(stderr, "Reporter thread starting\n"); 1158 libtrace_result_t result; 1159 size_t i; 1160 1161 message.code = MESSAGE_STARTING; 1162 message.sender = t; 1163 (*trace->reporter)(trace, NULL, &message); 1164 message.code = MESSAGE_RESUMING; 1165 (*trace->reporter)(trace, NULL, &message); 1166 1167 while (!trace_finished(trace)) { 1168 1169 //while ( != LIBTRACE_MQ_FAILED) { } 1170 libtrace_message_queue_get(&t->messages, &message); 1171 1172 switch (message.code) { 1173 // Check for results 1174 case MESSAGE_POST_REPORTER: 1175 res_size = trace_get_results(trace, &results); 1176 for (i = 0; i < res_size; i++) { 1177 ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1); 1178 (*trace->reporter)(trace, &result, NULL); 1179 } 1180 break; 1181 case MESSAGE_DO_PAUSE: 1182 message.code = MESSAGE_PAUSING; 1183 message.sender = t; 1184 (*trace->reporter)(trace, NULL, &message); 1185 trace_thread_pause(trace, t); 1186 message.code = MESSAGE_RESUMING; 1187 (*trace->reporter)(trace, NULL, &message); 1188 break; 1189 default: 1190 (*trace->reporter)(trace, NULL, &message); 1191 } 1192 } 1193 1194 // Flush out whats left now all our threads have finished 1195 res_size = trace_get_results(trace, &results); 1196 for (i = 0; i < res_size; i++) { 1197 ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1); 1198 (*trace->reporter)(trace, &result, NULL); 1199 } 1200 1201 // GOODBYE 1202 message.code = MESSAGE_PAUSING; 1203 message.sender = t; 1204 (*trace->reporter)(trace, NULL, &message); 1205 message.code = MESSAGE_STOPPING; 1206 (*trace->reporter)(trace, NULL, &message); 1207 1208 thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true); 1209 print_memory_stats(); 1210 return NULL; 1211 } 1212 1213 /** Similar to delay_tracetime but send messages to all threads periodically */ 1126 1214 static void* keepalive_entry(void *data) { 1127 1215 struct timeval prev, next; … … 1135 1223 while (trace->state != STATE_FINSHED) { 1136 1224 fd_set rfds; 1137 next_release = tv_to_usec(&prev) + (trace-> tick_interval * 1000);1225 next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000); 1138 1226 gettimeofday(&next, NULL); 1139 1227 if (next_release > tv_to_usec(&next)) { … … 1329 1417 * @returns 0 on success 1330 1418 */ 1331 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_re ducer reducer)1419 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter) 1332 1420 { 1333 1421 int i; … … 1353 1441 if (per_pkt) 1354 1442 libtrace->per_pkt = per_pkt; 1443 1444 if (reporter) 1445 libtrace->reporter = reporter; 1355 1446 1356 1447 assert(libtrace_parallel); … … 1382 1473 libtrace->global_blob = global_blob; 1383 1474 libtrace->per_pkt = per_pkt; 1384 libtrace->re ducer = reducer;1475 libtrace->reporter = reporter; 1385 1476 1386 1477 ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0); … … 1391 1482 1392 1483 // Set default buffer sizes 1393 if (libtrace-> perpkt_buffer_size <= 0)1394 libtrace-> perpkt_buffer_size = 1000;1484 if (libtrace->config.hasher_queue_size <= 0) 1485 libtrace->config.hasher_queue_size = 1000; 1395 1486 1396 1487 if (libtrace->perpkt_thread_count <= 0) { … … 1402 1493 } 1403 1494 1404 if(libtrace->packet_freelist_size <= 0) 1405 libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count; 1406 1407 if(libtrace->packet_freelist_size < 1408 (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count) 1495 if (libtrace->config.reporter_thold <= 0) 1496 libtrace->config.reporter_thold = 100; 1497 if (libtrace->config.burst_size <= 0) 1498 libtrace->config.burst_size = 10; 1499 if (libtrace->config.packet_thread_cache_size <= 0) 1500 libtrace->config.packet_thread_cache_size = 20; 1501 if (libtrace->config.packet_global_cache_size <= 0) 1502 libtrace->config.packet_global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count; 1503 1504 if (libtrace->config.packet_global_cache_size < 1505 (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count) 1409 1506 fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n"); 1410 1507 … … 1432 1529 libtrace->hasher_thread.type = THREAD_EMPTY; 1433 1530 } 1434 //libtrace_ocache_init(&libtrace->packet_freelist, trace_create_packet, trace_destroy_packet, 64, libtrace->packet_freelist_size * 4, true); 1531 1435 1532 libtrace_ocache_init(&libtrace->packet_freelist, 1436 1533 (void* (*)()) trace_create_packet, 1437 1534 (void (*)(void *))trace_destroy_packet, 1438 64, 1439 libtrace->packet_freelist_size * 4, 1440 true); 1535 libtrace->config.packet_thread_cache_size, 1536 libtrace->config.packet_global_cache_size * 4, 1537 libtrace->config.fixed_packet_count); 1538 // Unused slidingwindow code 1441 1539 //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0); 1442 ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0); 1540 //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0); 1541 1443 1542 // This will be applied to every new thread that starts, i.e. they will block all signals 1444 1543 // Lets start a fixed number of reading threads 1445 1446 // For now we never have a dedicated thread for the reducer1447 // i.e. This main thread is used as the reducer1448 libtrace->reducer_thread.tid = pthread_self();1449 libtrace->reducer_thread.type = THREAD_REDUCER;1450 libtrace->reducer_thread.state = THREAD_RUNNING;1451 libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));1452 1544 1453 1545 /* Ready some storages */ … … 1470 1562 t->perpkt_num = i; 1471 1563 if (libtrace->hasher) 1472 libtrace_ringbuffer_init(&t->rbuffer, libtrace-> perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);1564 libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING); 1473 1565 // Depending on the mode vector or deque might be chosen 1474 1566 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t)); … … 1492 1584 threads_started = trace_start_perpkt_threads(libtrace); 1493 1585 1494 if (libtrace->tick_interval > 0) { 1586 libtrace->reporter_thread.type = THREAD_REPORTER; 1587 libtrace->reporter_thread.state = THREAD_RUNNING; 1588 libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t)); 1589 if (reporter) { 1590 // Got a real reporter 1591 ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0); 1592 } else { 1593 // Main thread is reporter 1594 libtrace->reporter_thread.tid = pthread_self(); 1595 } 1596 1597 if (libtrace->config.tick_interval > 0) { 1495 1598 libtrace->keepalive_thread.type = THREAD_KEEPALIVE; 1496 1599 libtrace->keepalive_thread.state = THREAD_RUNNING; … … 1580 1683 fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n"); 1581 1684 } 1685 } 1686 1687 // Deal with the reporter 1688 if (trace_has_dedicated_reporter(libtrace)) { 1689 fprintf(stderr, "Reporter thread running we deal with this special!\n"); 1690 libtrace_message_t message = {0}; 1691 message.code = MESSAGE_DO_PAUSE; 1692 trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message); 1693 // Wait for it to pause 1694 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1695 while (libtrace->reporter_thread.state == THREAD_RUNNING) { 1696 ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0); 1697 } 1698 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1582 1699 } 1583 1700 … … 1760 1877 // buffers so clean them up 1761 1878 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1762 // Its possible 1 packet got added by the re ducer (or 1 per any other thread) since we cleaned up1879 // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up 1763 1880 // if they lost timeslice before-during a write 1764 1881 libtrace_packet_t * packet; … … 1773 1890 // TODO consider perpkt threads marking trace as finished before join is called 1774 1891 libtrace_change_state(libtrace, STATE_FINSHED, true); 1892 1893 if (trace_has_dedicated_reporter(libtrace)) { 1894 fprintf(stderr, "Waiting to join with the reporter\n"); 1895 pthread_join(libtrace->reporter_thread.tid, NULL); 1896 fprintf(stderr, "Joined with the reporter\n"); 1897 assert(libtrace->reporter_thread.state == THREAD_FINISHED); 1898 } 1775 1899 1776 1900 // Wait for the tick (keepalive) thread if it has been started … … 1812 1936 * Return backlog indicator 1813 1937 */ 1814 DLLEXPORT int trace_post_re duce(libtrace_t *libtrace)1938 DLLEXPORT int trace_post_reporter(libtrace_t *libtrace) 1815 1939 { 1816 1940 libtrace_message_t message = {0}; 1817 message.code = MESSAGE_POST_RE DUCE;1941 message.code = MESSAGE_POST_REPORTER; 1818 1942 message.sender = get_thread_descriptor(libtrace); 1819 return libtrace_message_queue_put(&libtrace->re ducer_thread.messages, (void *) &message);1943 return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message); 1820 1944 } 1821 1945 … … 1823 1947 * Return backlog indicator 1824 1948 */ 1825 DLLEXPORT int trace_send_message_to_re ducer(libtrace_t * libtrace, libtrace_message_t * message)1826 { 1827 //printf("Sending message code=%d to re ducer\n", message->code);1949 DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message) 1950 { 1951 //printf("Sending message code=%d to reporter\n", message->code); 1828 1952 message->sender = get_thread_descriptor(libtrace); 1829 return libtrace_message_queue_put(&libtrace->re ducer_thread.messages, message);1953 return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message); 1830 1954 } 1831 1955 … … 1835 1959 DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message) 1836 1960 { 1837 //printf("Sending message code=%d to re ducer\n", message->code);1961 //printf("Sending message code=%d to reporter\n", message->code); 1838 1962 message->sender = get_thread_descriptor(libtrace); 1839 1963 return libtrace_message_queue_put(&t->messages, message); … … 1847 1971 libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message); 1848 1972 } 1849 //printf("Sending message code=%d to re ducer\n", message->code);1973 //printf("Sending message code=%d to reporter\n", message->code); 1850 1974 return 0; 1851 1975 } … … 1909 2033 /** 1910 2034 * Publish to the reduce queue, return 1911 */ 1912 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) { 2035 * Should only be called by a perpkt thread, i.e. from a perpkt handler 2036 */ 2037 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) { 1913 2038 libtrace_result_t res; 1914 res.is_packet = 0;1915 // Who am I???1916 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?1917 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];1918 // Now put it into my table1919 2039 UNUSED static __thread int count = 0; 1920 2040 res.type = type; 1921 2041 1922 2042 libtrace_result_set_key_value(&res, key, value); … … 1930 2050 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque)); 1931 2051 count = (count+1)%1000;*/ 1932 if (libtrace->re ducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {1933 if (libtrace_deque_get_size(&t->deque) >= 800) {1934 trace_post_re duce(libtrace);2052 if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) { 2053 if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) { 2054 trace_post_reporter(libtrace); 1935 2055 } 1936 2056 //while (libtrace_deque_get_size(&t->deque) >= 1000) … … 1941 2061 // sched_yield(); 1942 2062 1943 if (libtrace_vector_get_size(&t->vector) >= 800) {1944 trace_post_re duce(libtrace);2063 if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) { 2064 trace_post_reporter(libtrace); 1945 2065 } 1946 2066 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :) 1947 2067 } 1948 2068 } 1949 1950 DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {1951 libtrace_result_t res;1952 // Who am I???1953 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?1954 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];1955 // Now put it into my table1956 UNUSED static __thread int count = 0;1957 1958 res.is_packet = 1;1959 libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);1960 /*1961 if (count == 1)1962 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));1963 count = (count+1) %1000;1964 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)1965 */1966 /*if (count == 1)1967 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));1968 count = (count+1)%1000;*/1969 if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {1970 if (libtrace_deque_get_size(&t->deque) >= 800) {1971 trace_post_reduce(libtrace);1972 }1973 //while (libtrace_deque_get_size(&t->deque) >= 1000)1974 // sched_yield();1975 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)1976 } else {1977 //while (libtrace_vector_get_size(&t->vector) >= 1000)1978 // sched_yield();1979 1980 if (libtrace_vector_get_size(&t->vector) >= 800) {1981 trace_post_reduce(libtrace);1982 }1983 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)1984 }1985 }1986 1987 2069 1988 2070 static int compareres(const void* p1, const void* p2) … … 1998 2080 DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) { 1999 2081 int i; 2000 int flags = libtrace->re ducer_flags; // Hint these aren't a changing2082 int flags = libtrace->reporter_flags; // Hint these aren't a changing 2001 2083 2002 2084 libtrace_vector_empty(results); … … 2105 2187 DLLEXPORT int trace_finished(libtrace_t * libtrace) { 2106 2188 // TODO I don't like using this so much, we could use state!!! 2107 return !(libtrace->perpkt_thread_states[THREAD_RUNNING] || libtrace->perpkt_thread_states[THREAD_FINISHING]);2189 return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count; 2108 2190 } 2109 2191 … … 2113 2195 switch (option) { 2114 2196 case TRACE_OPTION_TICK_INTERVAL: 2115 libtrace-> tick_interval = *((int *) value);2197 libtrace->config.tick_interval = *((int *) value); 2116 2198 return 1; 2117 2199 case TRACE_OPTION_SET_HASHER: 2118 2200 return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL); 2119 case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:2120 libtrace->perpkt_buffer_size = *((int *) value);2121 return 1;2122 case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:2123 libtrace->packet_freelist_size = *((int *) value);2124 return 1;2125 2201 case TRACE_OPTION_SET_PERPKT_THREAD_COUNT: 2126 2202 libtrace->perpkt_thread_count = *((int *) value); … … 2128 2204 case TRACE_DROP_OUT_OF_ORDER: 2129 2205 if (*((int *) value)) 2130 libtrace->re ducer_flags |= REDUCE_DROP_OOO;2206 libtrace->reporter_flags |= REDUCE_DROP_OOO; 2131 2207 else 2132 libtrace->re ducer_flags &= ~REDUCE_DROP_OOO;2208 libtrace->reporter_flags &= ~REDUCE_DROP_OOO; 2133 2209 return 1; 2134 2210 case TRACE_OPTION_SEQUENTIAL: 2135 2211 if (*((int *) value)) 2136 libtrace->re ducer_flags |= REDUCE_SEQUENTIAL;2212 libtrace->reporter_flags |= REDUCE_SEQUENTIAL; 2137 2213 else 2138 libtrace->re ducer_flags &= ~REDUCE_SEQUENTIAL;2214 libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL; 2139 2215 return 1; 2140 2216 case TRACE_OPTION_ORDERED: 2141 2217 if (*((int *) value)) 2142 libtrace->re ducer_flags |= REDUCE_ORDERED;2218 libtrace->reporter_flags |= REDUCE_ORDERED; 2143 2219 else 2144 libtrace->re ducer_flags &= ~REDUCE_ORDERED;2220 libtrace->reporter_flags &= ~REDUCE_ORDERED; 2145 2221 return 1; 2146 2222 case TRACE_OPTION_TRACETIME: -
test/Makefile
r17c5749 rf051c1b 15 15 BINS_PARALLEL = test-format-parallel test-format-parallel-hasher \ 16 16 test-format-parallel-singlethreaded test-format-parallel-stressthreads \ 17 test-format-parallel-singlethreaded-hasher 17 test-format-parallel-singlethreaded-hasher test-format-parallel-reporter 18 18 19 19 BINS = test-pcap-bpf test-event test-time test-dir test-wireless test-errors \ -
test/do-tests-parallel.sh
r54834a1 rf051c1b 57 57 do_test ./test-format-parallel-stressthreads erf 58 58 59 echo \* Read stress testing with 100 threads 60 do_test ./test-format-parallel-reporter erf 61 59 62 echo 60 63 echo "Tests passed: $OK" -
test/test-format-parallel-hasher.c
r59ef093 rf051c1b 95 95 bool seen_start_message; 96 96 bool seen_stop_message; 97 bool seen_ paused_message;97 bool seen_resumed_message; 98 98 bool seen_pausing_message; 99 99 int count; … … 125 125 } 126 126 else switch (mesg->code) { 127 case MESSAGE_START ED:127 case MESSAGE_STARTING: 128 128 assert(tls == NULL); 129 129 tls = calloc(sizeof(struct TLS), 1); … … 132 132 tls->seen_start_message = true; 133 133 break; 134 case MESSAGE_STOPP ED:134 case MESSAGE_STOPPING: 135 135 assert(tls->seen_start_message); 136 136 assert(tls != NULL); … … 139 139 140 140 // All threads publish to verify the thread count 141 trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);142 trace_post_re duce(trace);141 trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL); 142 trace_post_reporter(trace); 143 143 free(tls); 144 144 break; … … 152 152 tls->seen_pausing_message = true; 153 153 break; 154 case MESSAGE_ PAUSED:155 assert(tls->seen_pausing_message );156 tls->seen_ paused_message = true;154 case MESSAGE_RESUMING: 155 assert(tls->seen_pausing_message || tls->seen_start_message); 156 tls->seen_resumed_message = true; 157 157 break; 158 158 } … … 197 197 trace_pstart(trace, NULL, per_packet, NULL); 198 198 iferr(trace,tracename); 199 200 199 /* Make sure traces survive a pause and restart */ 201 200 trace_ppause(trace); -
test/test-format-parallel-singlethreaded-hasher.c
r59ef093 rf051c1b 95 95 bool seen_start_message; 96 96 bool seen_stop_message; 97 bool seen_ paused_message;97 bool seen_resuming_message; 98 98 bool seen_pausing_message; 99 99 int count; … … 125 125 } 126 126 else switch (mesg->code) { 127 case MESSAGE_START ED:127 case MESSAGE_STARTING: 128 128 assert(tls == NULL); 129 129 tls = calloc(sizeof(struct TLS), 1); … … 132 132 tls->seen_start_message = true; 133 133 break; 134 case MESSAGE_STOPP ED:134 case MESSAGE_STOPPING: 135 135 assert(tls->seen_start_message); 136 136 assert(tls != NULL); … … 139 139 140 140 // All threads publish to verify the thread count 141 trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);142 trace_post_re duce(trace);141 trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL); 142 trace_post_reporter(trace); 143 143 free(tls); 144 144 break; … … 152 152 tls->seen_pausing_message = true; 153 153 break; 154 case MESSAGE_ PAUSED:155 assert(tls->seen_pausing_message );156 tls->seen_ paused_message = true;154 case MESSAGE_RESUMING: 155 assert(tls->seen_pausing_message || tls->seen_start_message); 156 tls->seen_resuming_message = true; 157 157 break; 158 158 } -
test/test-format-parallel-singlethreaded.c
r59ef093 rf051c1b 95 95 bool seen_start_message; 96 96 bool seen_stop_message; 97 bool seen_ paused_message;97 bool seen_resuming_message; 98 98 bool seen_pausing_message; 99 99 int count; … … 125 125 } 126 126 else switch (mesg->code) { 127 case MESSAGE_START ED:127 case MESSAGE_STARTING: 128 128 assert(tls == NULL); 129 129 tls = calloc(sizeof(struct TLS), 1); … … 132 132 tls->seen_start_message = true; 133 133 break; 134 case MESSAGE_STOPP ED:134 case MESSAGE_STOPPING: 135 135 assert(tls->seen_start_message); 136 136 assert(tls != NULL); … … 139 139 140 140 // All threads publish to verify the thread count 141 trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);142 trace_post_re duce(trace);141 trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL); 142 trace_post_reporter(trace); 143 143 free(tls); 144 144 break; … … 152 152 tls->seen_pausing_message = true; 153 153 break; 154 case MESSAGE_ PAUSED:155 assert(tls->seen_pausing_message );156 tls->seen_ paused_message = true;154 case MESSAGE_RESUMING: 155 assert(tls->seen_pausing_message || tls->seen_start_message); 156 tls->seen_resuming_message = true; 157 157 break; 158 158 } -
test/test-format-parallel-stressthreads.c
r59ef093 rf051c1b 95 95 bool seen_start_message; 96 96 bool seen_stop_message; 97 bool seen_ paused_message;97 bool seen_resuming_message; 98 98 bool seen_pausing_message; 99 99 int count; … … 125 125 } 126 126 else switch (mesg->code) { 127 case MESSAGE_START ED:127 case MESSAGE_STARTING: 128 128 assert(tls == NULL); 129 129 tls = calloc(sizeof(struct TLS), 1); … … 132 132 tls->seen_start_message = true; 133 133 break; 134 case MESSAGE_STOPP ED:134 case MESSAGE_STOPPING: 135 135 assert(tls->seen_start_message); 136 136 assert(tls != NULL); … … 139 139 140 140 // All threads publish to verify the thread count 141 trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);142 trace_post_re duce(trace);141 trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL); 142 trace_post_reporter(trace); 143 143 free(tls); 144 144 break; … … 152 152 tls->seen_pausing_message = true; 153 153 break; 154 case MESSAGE_ PAUSED:155 assert(tls->seen_pausing_message );156 tls->seen_ paused_message = true;154 case MESSAGE_RESUMING: 155 assert(tls->seen_pausing_message || tls->seen_start_message); 156 tls->seen_resuming_message = true; 157 157 break; 158 158 } -
test/test-format-parallel.c
r59ef093 rf051c1b 95 95 bool seen_start_message; 96 96 bool seen_stop_message; 97 bool seen_ paused_message;97 bool seen_resuming_message; 98 98 bool seen_pausing_message; 99 99 int count; … … 132 132 } 133 133 else switch (mesg->code) { 134 case MESSAGE_START ED:134 case MESSAGE_STARTING: 135 135 assert(!seen_start_message || seen_paused_message); 136 136 assert(tls == NULL); … … 141 141 tls->seen_start_message = true; 142 142 break; 143 case MESSAGE_STOPP ED:143 case MESSAGE_STOPPING: 144 144 assert(seen_start_message); 145 145 assert(tls != NULL); … … 152 152 153 153 // All threads publish to verify the thread count 154 trace_publish_result(trace, (uint64_t) 0, (void *) count);155 trace_post_re duce(trace);154 trace_publish_result(trace, t, (uint64_t) 0, (void *) count, RESULT_NORMAL); 155 trace_post_reporter(trace); 156 156 break; 157 157 case MESSAGE_TICK: … … 165 165 tls->seen_pausing_message = true; 166 166 break; 167 case MESSAGE_ PAUSED:168 assert( seen_pausing_message);167 case MESSAGE_RESUMING: 168 assert(tls->seen_pausing_message || tls->seen_start_message ); 169 169 seen_paused_message = true; 170 tls->seen_ paused_message = true;170 tls->seen_resuming_message = true; 171 171 break; 172 172 } -
tools/traceanon/traceanon_parallel.c
r9594cf9 rf051c1b 27 27 static int s = 0; 28 28 (void)signal; 29 //trace_interrupt();29 //trace_interrupt(); 30 30 // trace_pstop isn't really signal safe because its got lots of locks in it 31 //trace_pstop(trace);32 31 trace_pstop(trace); 32 /*if (s == 0) { 33 33 if (trace_ppause(trace) == -1) 34 34 trace_perror(trace, "Pause failed"); … … 37 37 if (trace_pstart(trace, NULL, NULL, NULL) == -1) 38 38 trace_perror(trace, "Start failed"); 39 } 39 }*/ 40 40 s = !s; 41 41 } … … 141 141 142 142 143 static uint64_t bad_hash(libtrace_packet_t * pkt)143 UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt) 144 144 { 145 145 return 0; … … 147 147 148 148 149 static uint64_t rand_hash(libtrace_packet_t * pkt)149 UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt) 150 150 { 151 151 return rand(); … … 153 153 154 154 155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t) 156 { 157 int i; 155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t) 156 { 158 157 159 158 if (pkt) { … … 191 190 //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt); 192 191 //trace_publish_result(trace, trace_packet_get_order(pkt), pkt); 193 trace_publish_ packet(trace, pkt);192 trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET); 194 193 //return ; 195 194 } … … 197 196 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 198 197 switch (mesg->code) { 199 case MESSAGE_START ED:198 case MESSAGE_STARTING: 200 199 enc_init(enc_type,key); 201 200 } … … 204 203 } 205 204 205 struct libtrace_out_t *writer = 0; 206 207 static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) { 208 static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format 209 if (result) { 210 libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result); 211 assert(libtrace_result_get_key(result) == packet_count++); 212 if (trace_write_packet(writer,packet)==-1) { 213 trace_perror_output(writer,"writer"); 214 trace_interrupt(); 215 } 216 trace_free_result_packet(trace, packet); 217 } 218 return NULL; 219 } 220 221 206 222 int main(int argc, char *argv[]) 207 223 { 208 224 //struct libtrace_t *trace = 0; 209 struct libtrace_packet_t *packet/* = trace_create_packet()*/;210 struct libtrace_out_t *writer = 0;211 225 struct sigaction sigact; 212 226 char *output = 0; … … 380 394 //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i); 381 395 i = 2; 382 383 384 if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {396 trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i); 397 398 if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) { 385 399 trace_perror(trace,"trace_start"); 386 400 trace_destroy_output(writer); … … 402 416 sigaction(SIGINT, &sigact, NULL); 403 417 sigaction(SIGTERM, &sigact, NULL); 404 405 // Read in the resulting packets and then free them when done 406 libtrace_vector_t res; 407 int res_size = 0; 408 libtrace_vector_init(&res, sizeof(libtrace_result_t)); 409 uint64_t packet_count = 0; 410 while (!trace_finished(trace)) { 411 // Read messages 412 libtrace_message_t message; 413 414 // We just release and do work currently, maybe if something 415 // interesting comes through we'd deal with that 416 libtrace_thread_get_message(trace, &message); 417 418 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { } 419 420 if ((res_size = trace_get_results(trace, &res)) == 0) 421 ;/*sched_yield();*/ 422 423 for (i = 0 ; i < res_size ; i++) { 424 libtrace_result_t result; 425 assert(libtrace_vector_get(&res, i, (void *) &result) == 1); 426 packet = libtrace_result_get_value(&result); 427 assert(libtrace_result_get_key(&result) == packet_count); 428 packet_count++; 429 if (trace_write_packet(writer,packet)==-1) { 430 trace_perror_output(writer,"writer"); 431 trace_interrupt(); 432 break; 433 } 434 //trace_destroy_packet(packet); 435 trace_free_result_packet(trace, packet); 436 } 437 } 418 419 // Wait for the trace to finish 438 420 trace_join(trace); 439 440 // Grab everything that's left here441 res_size = trace_get_results(trace, &res);442 443 for (i = 0 ; i < res_size ; i++) {444 libtrace_result_t result;445 assert(libtrace_vector_get(&res, i, (void *) &result) == 1);446 packet = libtrace_result_get_value(&result);447 if (libtrace_result_get_key(&result) != packet_count)448 printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);449 assert(libtrace_result_get_key(&result) == packet_count);450 451 packet_count++;452 if (trace_write_packet(writer,packet)==-1) {453 trace_perror_output(writer,"writer");454 trace_interrupt();455 break;456 }457 trace_destroy_packet(packet);458 }459 libtrace_vector_destroy(&res);460 421 461 422 //trace_destroy_packet(packet); -
tools/tracertstats/tracertstats_parallel.c
rb13b939 rf051c1b 209 209 // Publish and make a new one new 210 210 //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts); 211 trace_publish_result(trace, (uint64_t) last_ts, results);212 trace_post_re duce(trace);211 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 212 trace_post_reporter(trace); 213 213 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 214 214 last_ts++; … … 234 234 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 235 235 switch (mesg->code) { 236 case MESSAGE_START ED:236 case MESSAGE_STARTING: 237 237 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 238 238 break; 239 case MESSAGE_STOPP ED:239 case MESSAGE_STOPPING: 240 240 // Should we always post this? 241 241 if (results->total.count) { 242 trace_publish_result(trace, (uint64_t) last_ts, results);243 trace_post_re duce(trace);242 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 243 trace_post_reporter(trace); 244 244 results = NULL; 245 245 } … … 260 260 if (next_update_time <= mesg->additional.uint64) { 261 261 //fprintf(stderr, "Got a tick and publishing early!!\n"); 262 trace_publish_result(trace, (uint64_t) last_ts, results);263 trace_post_re duce(trace);262 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 263 trace_post_reporter(trace); 264 264 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 265 265 last_ts++; -
tools/tracestats/tracestats_parallel.c
r9594cf9 rf051c1b 67 67 // trace_interrupt(); 68 68 // trace_pstop isn't really signal safe because its got lots of locks in it 69 //trace_pstop(trace);70 69 trace_pstop(trace); 70 /*if (s == 0) { 71 71 if (trace_ppause(trace) == -1) 72 72 trace_perror(trace, "Pause failed"); … … 75 75 if (trace_pstart(trace, NULL, NULL, NULL) == -1) 76 76 trace_perror(trace, "Start failed"); 77 } 77 }*/ 78 78 s = !s; 79 79 } … … 129 129 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 130 130 switch (mesg->code) { 131 case MESSAGE_STOPP ED:132 trace_publish_result(trace, 0, results); // Only ever using a single key 0131 case MESSAGE_STOPPING: 132 trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0 133 133 fprintf(stderr, "Thread published resuslts WOWW\n"); 134 134 break; 135 case MESSAGE_START ED:135 case MESSAGE_STARTING: 136 136 results = calloc(1, sizeof(statistics_t) * (filter_count + 1)); 137 137 break; … … 142 142 fprintf(stderr, "Thread is pausing\n"); 143 143 break; 144 case MESSAGE_ PAUSED:144 case MESSAGE_RESUMING: 145 145 fprintf(stderr, "Thread has paused\n"); 146 146 break; … … 228 228 int option = 2; 229 229 //option = 10000; 230 230 //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL); 231 231 option = 2; 232 232 trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
Note: See TracChangeset
for help on using the changeset viewer.