- Timestamp:
- 08/26/15 10:18:31 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 285acaa
- Parents:
- 7c17e4a (diff), 9a3a846 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Location:
- lib
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/data-struct/linked_list.c
rcb39d35 r03aca91 52 52 53 53 new->prev = NULL; 54 memcpy( &new->data, item, l->element_size);54 memcpy(new->data, item, l->element_size); 55 55 56 56 if (l->head == NULL) { -
lib/data-struct/object_cache.c
r14c6c08 r4007dbb 27 27 }; 28 28 29 #ifdef ENABLE_MEM_STATS 29 30 extern __thread struct mem_stats mem_hits; 31 #endif 30 32 31 33 struct local_caches { … … 123 125 /* Get TLS for the list of local_caches */ 124 126 static inline struct local_caches *get_local_caches() { 127 #if HAVE_TLS 125 128 static __thread struct local_caches *lcs = NULL; 126 129 if (lcs) { 127 130 return lcs; 128 } else { 131 } 132 #else 133 struct local_caches *lcs; 134 pthread_once(&memory_destructor_once, &once_memory_cache_key_init); 135 if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) { 136 return lcs; 137 } 138 #endif 139 else { 129 140 /* This thread has not been used with a memory pool before */ 130 141 /* Allocate our TLS */ … … 280 291 memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers); 281 292 lc->used -= nb_buffers; 293 #ifdef ENABLE_MEM_STATS 282 294 mem_hits.read.cache_hit += nb_buffers; 283 295 mem_hits.readbulk.cache_hit += 1; 296 #endif 284 297 return nb_buffers; 285 298 } … … 287 300 else if (nb_buffers > lc->total) { 288 301 i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers); 302 #ifdef ENABLE_MEM_STATS 289 303 if (i) 290 304 mem_hits.readbulk.ring_hit += 1; … … 292 306 mem_hits.readbulk.miss += 1; 293 307 mem_hits.read.ring_hit += i; 308 #endif 294 309 } else { // Not enough cached 295 310 // Empty the cache and re-fill it and then see what we're left with 296 311 i = lc->used; 297 312 memcpy(values, lc->cache, sizeof(void *) * lc->used); 313 #ifdef ENABLE_MEM_STATS 298 314 mem_hits.read.cache_hit += i; 315 #endif 299 316 300 317 // Make sure we still meet the minimum requirement … … 303 320 else 304 321 lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0); 305 322 #ifdef ENABLE_MEM_STATS 306 323 if (lc->used == lc->total) 307 324 mem_hits.readbulk.ring_hit += 1; … … 309 326 mem_hits.readbulk.miss += 1; 310 327 mem_hits.read.ring_hit += lc->used; 328 #endif 311 329 } 312 330 … … 319 337 i += remaining; 320 338 } 339 #ifdef ENABLE_MEM_STATS 321 340 mem_hits.read.miss += nb_buffers - i; 341 #endif 322 342 assert(i >= min_nb_buffers); 323 343 return i; … … 379 399 memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers); 380 400 lc->used += nb_buffers; 401 #ifdef ENABLE_MEM_STATS 381 402 mem_hits.write.cache_hit += nb_buffers; 382 403 mem_hits.writebulk.cache_hit += 1; 404 #endif 383 405 return nb_buffers; 384 406 } … … 386 408 else if (nb_buffers > lc->total) { 387 409 i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers); 410 #ifdef ENABLE_MEM_STATS 388 411 if (i) 389 412 mem_hits.writebulk.ring_hit += 1; … … 391 414 mem_hits.writebulk.miss += 1; 392 415 mem_hits.write.ring_hit += i; 416 #endif 393 417 } else { // Not enough cache space but there might later 394 418 // Fill the cache and empty it and then see what we're left with 395 419 i = (lc->total - lc->used); 396 420 memcpy(&lc->cache[lc->used], values, sizeof(void *) * i); 421 #ifdef ENABLE_MEM_STATS 397 422 mem_hits.write.cache_hit += i; 423 #endif 398 424 399 425 // Make sure we still meet the minimum requirement … … 407 433 memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used); 408 434 435 #ifdef ENABLE_MEM_STATS 409 436 if (lc->used) 410 437 mem_hits.writebulk.miss += 1; … … 412 439 mem_hits.writebulk.ring_hit += 1; 413 440 mem_hits.write.ring_hit += lc->total - lc->used; 441 #endif 414 442 } 415 443 … … 422 450 i += remaining; 423 451 } 452 #ifdef ENABLE_MEM_STATS 424 453 mem_hits.write.miss += nb_buffers - i; 454 #endif 425 455 return i; 426 456 } -
lib/format_linux_common.c
r773a2a3 rf2066fa 661 661 linuxcommon_close_input_stream(libtrace, stream); 662 662 } 663 libtrace_list_deinit(FORMAT_DATA->per_stream);664 free(libtrace->format_data);665 libtrace->format_data = NULL;666 663 return -1; 667 664 } -
lib/format_linux_int.c
re99c493 rf2066fa 69 69 { 70 70 int ret = linuxcommon_start_input_stream(libtrace, FORMAT_DATA_FIRST); 71 if (ret != 0) {72 libtrace_list_deinit(FORMAT_DATA->per_stream);73 free(libtrace->format_data);74 libtrace->format_data = NULL;75 }76 71 return ret; 77 72 } -
lib/format_linux_ring.c
r9d89626 rf2066fa 280 280 { 281 281 int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST); 282 if (ret != 0) {283 libtrace_list_deinit(FORMAT_DATA->per_stream);284 free(libtrace->format_data);285 libtrace->format_data = NULL;286 }287 282 return ret; 288 283 } -
lib/libtrace_int.h
rc723e9e r2fa43fa 307 307 /** The sequence is like accepted_packets but we don't reset this after a pause. */ 308 308 uint64_t sequence_number; 309 /** The packet read out by the trace, backwards compatibility to allow us to finalise 310 * a packet when the trace is destroyed */ 311 libtrace_packet_t *last_packet; 309 312 /** The filename from the uri for the trace */ 310 313 char *uridata; … … 330 333 /** The actual freelist */ 331 334 libtrace_ocache_t packet_freelist; 332 /** User defined per_ pkt function called when a pktis ready */333 fn_ per_pkt per_pkt;334 /** User defined reporter function entry point XXX not hooked up*/335 /** User defined per_msg function called when a message is ready */ 336 fn_cb_msg per_msg; 337 /** User defined reporter function entry point */ 335 338 fn_reporter reporter; 336 339 /** The hasher function */ … … 358 361 struct user_configuration config; 359 362 libtrace_combine_t combiner; 363 struct { 364 fn_cb_starting message_starting; 365 fn_cb_dataless message_stopping; 366 fn_cb_dataless message_resuming; 367 fn_cb_dataless message_pausing; 368 fn_cb_packet message_packet; 369 fn_cb_first_packet message_first_packet; 370 fn_cb_tick message_tick_count; 371 fn_cb_tick message_tick_interval; 372 } callbacks; 360 373 }; 361 374 -
lib/libtrace_parallel.h
rd3849c7 rf2066fa 154 154 * @param data unused, do not use this 155 155 * @param sender The sender will be set as the current thread 156 * @return When using a function callback for starting, the returned 157 * value is stored against the thread tls. Otherwise the return is ignored. 156 158 */ 157 159 MESSAGE_STARTING, … … 211 213 * sent once a new packet is encountered 212 214 * 213 * @ 215 * @see trace_get_first_packet() 214 216 */ 215 217 MESSAGE_FIRST_PACKET, … … 225 227 * trace_post_reporter() 226 228 * 227 * @see trace_get_first_packet()228 229 */ 229 230 MESSAGE_POST_REPORTER, … … 236 237 * with an earlier time-stamp. 237 238 * 238 * @param data data.uint64 _tholds the system time-stamp in the239 * @param data data.uint64 holds the system time-stamp in the 239 240 * erf format 240 241 * @param sender should be ignored … … 248 249 * threads will see it between the same packets. 249 250 * 250 * @param data The number of packets seen so far across all threads251 * @param data data.uint64 holds the number of packets seen so far across all threads 251 252 * @param sender Set to the current per-packet thread 252 253 */ … … 397 398 /** 398 399 * The definition for the main function that the user supplies to process 399 * packets.400 * messages. 400 401 * 401 402 * @param trace The trace the packet is related to. … … 412 413 * documentation for the message as to what value these will contain. 413 414 */ 414 typedef void* (*fn_ per_pkt)(libtrace_t* trace,415 416 417 418 415 typedef void* (*fn_cb_msg)(libtrace_t* trace, 416 libtrace_thread_t *thread, 417 int mesg_code, 418 libtrace_generic_t data, 419 libtrace_thread_t *sender); 419 420 420 421 /** … … 452 453 * @param libtrace The input trace to start 453 454 * @param global_blob Global data related to this trace accessible using trace_get_global() 454 * @param per_ pkt A user supplied function called when a packetis ready455 * @param per_msg A user supplied function called when a message is ready 455 456 * @param reporter A user supplied function called when a result is ready. 456 457 * Optional if NULL the reporter thread will not be started. … … 459 460 * This can also be used to restart an existing parallel trace, 460 461 * that has previously been paused using trace_ppause(). 461 * In this case global_blob,per_ pktand reporter will only be updated462 * In this case global_blob,per_msg and reporter will only be updated 462 463 * if they are non-null. Otherwise their previous values will be maintained. 463 464 * 464 465 */ 465 466 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 466 fn_per_pkt per_pkt, fn_reporter reporter); 467 fn_cb_msg per_msg, fn_reporter reporter); 468 469 /** 470 * 471 * @param libtrace The parallel trace 472 * @param t The thread that is running 473 * @param global The global storage 474 * @return The returned value is stored against the threads tls. 475 * This is typically passed as tls argument to other messages. 476 */ 477 typedef void* (*fn_cb_starting)(libtrace_t *libtrace, 478 libtrace_thread_t *t, 479 void *global); 480 481 /** 482 * @param libtrace The parallel trace 483 * @param t The thread that is running 484 * @param global The global storage 485 * @param tls The thread local storage 486 */ 487 typedef void (*fn_cb_dataless)(libtrace_t *libtrace, 488 libtrace_thread_t *t, 489 void *global, 490 void *tls); 491 492 /** 493 * @param libtrace The parallel trace 494 * @param t The thread that is running 495 * @param global The global storage 496 * @param tls The thread local storage 497 */ 498 typedef void (*fn_cb_first_packet)(libtrace_t *libtrace, 499 libtrace_thread_t *t, 500 void *global, 501 void *tls, 502 libtrace_packet_t *first_packet, 503 libtrace_thread_t *sender); 504 505 /** 506 * @param libtrace The parallel trace 507 * @param t The thread that is running 508 * @param global The global storage 509 * @param tls The thread local storage 510 * @param uint64_t Either the timestamp or packet count depending on message type 511 */ 512 typedef void (*fn_cb_tick)(libtrace_t *libtrace, 513 libtrace_thread_t *t, 514 void *global, 515 void *tls, 516 uint64_t order); 517 518 /** 519 * @param libtrace The parallel trace 520 * @param t The thread 521 * @param packet The packet associated with the message 522 * @param global The global storage 523 * @param tls The thread local storage 524 * 525 * @return optionally a packet which is handed back to the library, 526 * typically this is the packet supplied. Otherwise NULL. 527 */ 528 typedef libtrace_packet_t* (*fn_cb_packet)(libtrace_t *libtrace, 529 libtrace_thread_t *t, 530 void *global, 531 void *tls, 532 libtrace_packet_t *packet); 533 534 /** Registers a built-in message with a handler. 535 * Note we do not include the sending thread as an argument to the reporter. 536 * If set to NULL, the message will be sent to default perpkt handler. 537 * 538 * @param libtrace The input trace to start 539 * @param handler the handler to be called when the message is received 540 * @return 0 if successful otherwise -1. 541 */ 542 543 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler); 544 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler); 545 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler); 546 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler); 547 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler); 548 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler); 549 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler); 550 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler); 467 551 468 552 /** Pauses a trace previously started with trace_pstart() … … 542 626 /** Store data against a thread. 543 627 * 544 * @param The parallel trace.545 * @param data The new value to save against the t race628 * @param thread The thread 629 * @param data The new value to save against the thread 546 630 * @return The previously stored value 547 631 * … … 628 712 * as possible (real-time). 629 713 * 630 * @param A parallel input trace714 * @param trace A parallel input trace 631 715 * @param tracetime If true packets are released with time intervals matching 632 716 * the original trace. Otherwise packets are read as fast as possible. … … 810 894 * 811 895 * @param key (Typically) The packets order, see trace_packet_get_order() 812 * @param813 896 */ 814 897 RESULT_PACKET, … … 966 1049 */ 967 1050 DLLEXPORT bool trace_has_finished(libtrace_t * libtrace); 1051 1052 1053 /** Check if libtrace is directly reading from multiple queues 1054 * from the format (such as a NICs hardware queues). 1055 * 1056 * When a parallel trace is running, or if checked after its completion 1057 * this returns true if a trace was able to run natively parallel 1058 * from the format. Otherwise false is returned, meaning libtrace is 1059 * distibuting packets across multiple threads from a single source. 1060 * 1061 * Factors that may stop this happening despite the format supporting 1062 * native parallel reads include: the choice of hasher function, 1063 * the number of threads choosen (such as 1 or more than the trace supports) 1064 * or another error when trying to start the parallel format. 1065 * 1066 * If this is called before the trace is started. I.e. before pstart 1067 * this returns an indication that the trace has the possiblity to support 1068 * native parallel reads. After trace pstart is called this should be 1069 * checked again to confirm this has happened. 1070 * 1071 * 1072 * @return true if the trace is parallel or false if the library is splitting 1073 * the trace into multiple threads. 1074 */ 1075 DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace); 968 1076 969 1077 /** Returns either the sequence number or erf timestamp of a packet. -
lib/trace.c
r8370482 r9a3a846 261 261 libtrace->filtered_packets = 0; 262 262 libtrace->accepted_packets = 0; 263 libtrace->last_packet = NULL; 263 264 264 265 /* Parallel inits */ … … 268 269 libtrace->perpkt_queue_full = false; 269 270 libtrace->global_blob = NULL; 270 libtrace->per_ pkt= NULL;271 libtrace->per_msg = NULL; 271 272 libtrace->reporter = NULL; 272 273 libtrace->hasher = NULL; … … 287 288 ZERO_USER_CONFIG(libtrace->config); 288 289 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 290 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 289 291 290 292 /* Parse the URI to determine what sort of trace we are dealing with */ … … 382 384 libtrace->io = NULL; 383 385 libtrace->filtered_packets = 0; 386 libtrace->accepted_packets = 0; 387 libtrace->last_packet = NULL; 384 388 385 389 /* Parallel inits */ … … 389 393 libtrace->perpkt_queue_full = false; 390 394 libtrace->global_blob = NULL; 391 libtrace->per_ pkt= NULL;395 libtrace->per_msg = NULL; 392 396 libtrace->reporter = NULL; 393 397 libtrace->hasher = NULL; … … 405 409 ZERO_USER_CONFIG(libtrace->config); 406 410 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 411 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 407 412 408 413 for(tmp=formats_list;tmp;tmp=tmp->next) { … … 547 552 return -1; 548 553 } 554 555 /* Finish the last packet we read - for backwards compatibility */ 556 if (libtrace->last_packet) 557 trace_fin_packet(libtrace->last_packet); 558 assert(libtrace->last_packet == NULL); 559 549 560 if (libtrace->format->pause_input) 550 561 libtrace->format->pause_input(libtrace); 562 551 563 libtrace->started=false; 552 564 return 0; … … 687 699 } 688 700 701 /* Finish any the last packet we read - for backwards compatibility */ 702 if (libtrace->last_packet) 703 trace_fin_packet(libtrace->last_packet); 704 assert(libtrace->last_packet == NULL); 705 689 706 if (libtrace->format) { 690 707 if (libtrace->started && libtrace->format->pause_input) … … 803 820 packet->trace->format->fin_packet(packet); 804 821 } 822 if (packet->trace && packet->trace->last_packet == packet) 823 packet->trace->last_packet = NULL; 805 824 806 825 if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) { … … 825 844 packet->trace->format->fin_packet(packet); 826 845 } 846 if (packet->trace && packet->trace->last_packet == packet) 847 packet->trace->last_packet = NULL; 827 848 828 849 // No matter what we remove the header and link pointers … … 903 924 ++libtrace->accepted_packets; 904 925 ++libtrace->sequence_number; 926 libtrace->last_packet = packet; 905 927 return ret; 906 928 } while(1); -
lib/trace_parallel.c
rd3849c7 rf2066fa 113 113 }; 114 114 115 116 #ifdef ENABLE_MEM_STATS 115 117 // Grrr gcc wants this spelt out 116 118 __thread struct mem_stats mem_hits = {{0},{0},{0},{0}}; 117 119 120 118 121 static void print_memory_stats() { 119 #if 0120 122 uint64_t total; 121 123 #if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__) … … 163 165 total, (double) mem_hits.writebulk.miss / (double) total * 100.0); 164 166 } 167 } 168 #else 169 static void print_memory_stats() {} 165 170 #endif 171 172 static const libtrace_generic_t gen_zero = {0}; 173 174 /* This should optimise away the switch to nothing in the explict cases */ 175 static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type, 176 libtrace_generic_t data, libtrace_thread_t *sender) { 177 fn_cb_dataless fn = NULL; 178 switch (type) { 179 case MESSAGE_STARTING: 180 if (trace->callbacks.message_starting) 181 thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob); 182 else if (trace->per_msg) 183 (*trace->per_msg)(trace, thread, type, data, sender); 184 return; 185 case MESSAGE_FIRST_PACKET: 186 if (trace->callbacks.message_first_packet) 187 (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender); 188 else if (trace->per_msg) 189 (*trace->per_msg)(trace, thread, type, data, sender); 190 return; 191 case MESSAGE_TICK_COUNT: 192 if (trace->callbacks.message_tick_count) 193 (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64); 194 else if (trace->per_msg) 195 (*trace->per_msg)(trace, thread, type, data, sender); 196 return; 197 case MESSAGE_TICK_INTERVAL: 198 if (trace->callbacks.message_tick_interval) 199 (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data, data.uint64); 200 else if (trace->per_msg) 201 (*trace->per_msg)(trace, thread, type, data, sender); 202 return; 203 case MESSAGE_STOPPING: 204 fn = trace->callbacks.message_stopping; 205 break; 206 case MESSAGE_RESUMING: 207 fn = trace->callbacks.message_resuming; 208 break; 209 case MESSAGE_PAUSING: 210 fn = trace->callbacks.message_pausing; 211 break; 212 213 /* These should be unused */ 214 case MESSAGE_DO_PAUSE: 215 case MESSAGE_DO_STOP: 216 case MESSAGE_POST_REPORTER: 217 case MESSAGE_RESULT: 218 case MESSAGE_PACKET: 219 return; 220 case MESSAGE_USER: 221 break; 222 } 223 if (fn) 224 (*fn)(trace, thread, trace->global_blob, thread->user_data); 225 else if (trace->per_msg) 226 (*trace->per_msg)(trace, thread, type, data, sender); 166 227 } 167 228 … … 304 365 pthread_t tid = pthread_self(); 305 366 // Check if we are reporter or something else 306 if (pthread_equal(tid, libtrace->reporter_thread.tid)) 367 if (libtrace->hasher_thread.type == THREAD_REPORTER && 368 pthread_equal(tid, libtrace->reporter_thread.tid)) 307 369 ret = &libtrace->reporter_thread; 308 else if (pthread_equal(tid, libtrace->hasher_thread.tid)) 370 else if (libtrace->hasher_thread.type == THREAD_HASHER && 371 pthread_equal(tid, libtrace->hasher_thread.tid)) 309 372 ret = &libtrace->hasher_thread; 310 373 else … … 316 379 DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) { 317 380 // Duplicate the packet in standard malloc'd memory and free the 318 // original, This is a 1:1 exchange so isocache count remains unchanged.381 // original, This is a 1:1 exchange so the ocache count remains unchanged. 319 382 if (pkt->buf_control != TRACE_CTRL_PACKET) { 320 383 libtrace_packet_t *dup; … … 324 387 /* Copy the duplicated packet over the existing */ 325 388 memcpy(pkt, dup, sizeof(libtrace_packet_t)); 389 /* Free the packet structure */ 390 free(dup); 326 391 } 327 392 } … … 377 442 t->accepted_packets++; 378 443 libtrace_generic_t data = {.pkt = *packet}; 379 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t); 444 if (trace->callbacks.message_packet) 445 *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet); 446 else if (trace->per_msg) 447 *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t); 380 448 trace_fin_packet(*packet); 381 449 } else { 382 450 assert((*packet)->error == READ_TICK); 383 451 libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)}; 384 (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);452 send_message(trace, t, MESSAGE_TICK_COUNT, data, t); 385 453 } 386 454 return 0; … … 449 517 450 518 /* Let the user thread know we are going to pause */ 451 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);519 send_message(trace, t, MESSAGE_PAUSING, gen_zero, t); 452 520 453 521 /* Send through any remaining packets (or messages) without delay */ … … 490 558 /* Now we do the actual pause, this returns when we resumed */ 491 559 trace_thread_pause(trace, t); 492 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);560 send_message(trace, t, MESSAGE_RESUMING, gen_zero, t); 493 561 return 1; 494 562 } … … 523 591 524 592 if (trace->format->pregister_thread) { 525 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));593 trace->format->pregister_thread(trace, t, trace_is_parallel(trace)); 526 594 } 527 595 … … 535 603 536 604 /* Let the per_packet function know we have started */ 537 (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);538 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);605 send_message(trace, t, MESSAGE_STARTING, gen_zero, t); 606 send_message(trace, t, MESSAGE_RESUMING, gen_zero, t); 539 607 540 608 for (;;) { … … 556 624 goto eof; 557 625 } 558 (*trace->per_pkt)(trace, t, message.code, message.data, message.sender); 626 (*trace->per_msg)(trace, t, message.code, message.data, message.sender); 627 (*trace->per_msg)(trace, t, message.code, message.data, message.sender); 559 628 /* Continue and the empty messages out before packets */ 560 629 continue; … … 619 688 620 689 // Let the per_packet function know we have stopped 621 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);622 (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);690 send_message(trace, t, MESSAGE_PAUSING, gen_zero, t); 691 send_message(trace, t, MESSAGE_STOPPING, gen_zero, t); 623 692 624 693 // Free any remaining packets … … 1246 1315 */ 1247 1316 static int trace_prestart(libtrace_t * libtrace, void *global_blob, 1248 fn_ per_pkt per_pkt, fn_reporter reporter) {1317 fn_cb_msg per_msg, fn_reporter reporter) { 1249 1318 int i, err = 0; 1250 1319 if (libtrace->state != STATE_PAUSED) { … … 1289 1358 1290 1359 /* Update functions if requested */ 1291 if (per_ pkt)1292 libtrace->per_ pkt = per_pkt;1293 assert(libtrace->per_ pkt);1360 if (per_msg) 1361 libtrace->per_msg = per_msg; 1362 assert(libtrace->per_msg); 1294 1363 if (reporter) 1295 1364 libtrace->reporter = reporter; … … 1297 1366 libtrace->global_blob = global_blob; 1298 1367 1299 if (libtrace->perpkt_thread_count > 1 && 1300 trace_supports_parallel(libtrace) && 1301 !trace_has_dedicated_hasher(libtrace)) { 1368 if (trace_is_parallel(libtrace)) { 1302 1369 err = libtrace->format->pstart_input(libtrace); 1303 1370 } else { … … 1512 1579 } 1513 1580 1581 DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) { 1582 if (libtrace->state == STATE_NEW) 1583 return trace_supports_parallel(libtrace); 1584 return libtrace->pread == trace_pread_packet_wrapper; 1585 } 1586 1514 1587 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 1515 fn_ per_pkt per_pkt, fn_reporter reporter) {1588 fn_cb_msg per_msg, fn_reporter reporter) { 1516 1589 int i; 1517 1590 int ret = -1; … … 1526 1599 1527 1600 if (libtrace->state == STATE_PAUSED) { 1528 ret = trace_prestart(libtrace, global_blob, per_ pkt, reporter);1601 ret = trace_prestart(libtrace, global_blob, per_msg, reporter); 1529 1602 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1530 1603 return ret; … … 1541 1614 /* Store the user defined things against the trace */ 1542 1615 libtrace->global_blob = global_blob; 1543 libtrace->per_ pkt = per_pkt;1616 libtrace->per_msg = per_msg; 1544 1617 libtrace->reporter = reporter; 1545 1618 /* And zero other fields */ … … 1559 1632 verify_configuration(libtrace); 1560 1633 1634 ret = -1; 1561 1635 /* Try start the format - we prefer parallel over single threaded, as 1562 1636 * these formats should support messages better */ … … 1565 1639 ret = libtrace->format->pstart_input(libtrace); 1566 1640 libtrace->pread = trace_pread_packet_wrapper; 1567 } else { 1641 } 1642 if (ret != 0) { 1568 1643 if (libtrace->format->start_input) { 1569 1644 ret = libtrace->format->start_input(libtrace); … … 1703 1778 libtrace->perpkt_thread_states[THREAD_FINISHED] = 0; 1704 1779 cleanup_started: 1705 if (trace_supports_parallel(libtrace) && 1706 !trace_has_dedicated_hasher(libtrace) 1707 && libtrace->perpkt_thread_count > 1) { 1780 if (libtrace->pread == trace_pread_packet_wrapper) { 1708 1781 if (libtrace->format->ppause_input) 1709 1782 libtrace->format->ppause_input(libtrace); … … 1720 1793 } 1721 1794 1795 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) { 1796 libtrace->callbacks.message_starting = handler; 1797 return 0; 1798 } 1799 1800 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) { 1801 libtrace->callbacks.message_pausing = handler; 1802 return 0; 1803 } 1804 1805 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) { 1806 libtrace->callbacks.message_resuming = handler; 1807 return 0; 1808 } 1809 1810 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) { 1811 libtrace->callbacks.message_stopping = handler; 1812 return 0; 1813 } 1814 1815 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) { 1816 libtrace->callbacks.message_packet = handler; 1817 return 0; 1818 } 1819 1820 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) { 1821 libtrace->callbacks.message_first_packet = handler; 1822 return 0; 1823 } 1824 1825 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) { 1826 libtrace->callbacks.message_tick_count = handler; 1827 return 0; 1828 } 1829 1830 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) { 1831 libtrace->callbacks.message_tick_interval = handler; 1832 return 0; 1833 } 1834 1722 1835 /* 1723 1836 * Pauses a trace, this should only be called by the main thread … … 1728 1841 * 1729 1842 * Once done you should be able to modify the trace setup and call pstart again 1730 * TODO handle changing thread numbers1843 * TODO add support to change the number of threads. 1731 1844 */ 1732 1845 DLLEXPORT int trace_ppause(libtrace_t *libtrace) … … 1826 1939 // Save the statistics against the trace 1827 1940 trace_get_statistics(libtrace, NULL); 1828 if (trace_ supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {1941 if (trace_is_parallel(libtrace)) { 1829 1942 libtrace->started = false; 1830 1943 if (libtrace->format->ppause_input) … … 2248 2361 return 0; 2249 2362 } 2250 2251 2252 2363 2253 2364 static bool config_bool_parse(char *value, size_t nvalue) {
Note: See TracChangeset
for help on using the changeset viewer.