Changeset f625817
- Timestamp:
- 09/11/15 15:00:27 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 322c516
- Parents:
- 8c7490fe
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/combiner_ordered.c
r3dd5acc rf625817 65 65 libtrace_generic_t gt = {.res = &r}; 66 66 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 67 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread); 67 send_message(trace, &trace->reporter_thread, 68 MESSAGE_RESULT, gt, 69 &trace->reporter_thread); 68 70 return 0; 69 71 } … … 88 90 libtrace_generic_t gt = {.res = &r}; 89 91 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 90 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread); 92 send_message(trace, &trace->reporter_thread, 93 MESSAGE_RESULT, gt, 94 &trace->reporter_thread); 91 95 return 0; 92 96 } … … 152 156 153 157 ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1); 154 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread); 158 send_message(trace, &trace->reporter_thread, 159 MESSAGE_RESULT, gt, 160 NULL); 155 161 156 162 // Now update the one we just removed -
lib/combiner_sorted.c
r3dd5acc rf625817 67 67 continue; 68 68 } 69 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread); 69 send_message(trace, &trace->reporter_thread, MESSAGE_RESULT, 70 gt, NULL); 70 71 } 71 72 libtrace_vector_empty(&queues[0]); -
lib/combiner_unordered.c
r3dd5acc rf625817 49 49 c->last_count_tick = r.key; 50 50 } 51 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread); 51 send_message(trace, &trace->reporter_thread, 52 MESSAGE_RESULT, gt, NULL); 52 53 } 53 54 } -
lib/libtrace.h.in
rd420777 rf625817 252 252 /** Opaque structure holding information about libtrace thread */ 253 253 typedef struct libtrace_thread_t libtrace_thread_t; 254 255 /** Opaque structure holding callback functions for libtrace threads */ 256 typedef struct callback_set libtrace_callback_set_t; 254 257 255 258 /** If the packet has allocated its own memory the buffer_control should be … … 1480 1483 DLLEXPORT void trace_destroy_output(libtrace_out_t *trace); 1481 1484 1485 /** Create a callback set that can be used to define callbacks for parallel 1486 * libtrace threads. 1487 * 1488 * @return A pointer to a freshly allocated callback set. 1489 */ 1490 DLLEXPORT libtrace_callback_set_t *trace_create_callback_set(); 1491 1492 /** Destroys a callback set, freeing up an resources it was using. 1493 * 1494 * @param cbset The callback set to be destroyed. 1495 */ 1496 DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset); 1497 1498 1482 1499 /** Check (and clear) the current error state of an input trace 1483 1500 * @param trace The input trace to check the error state on -
lib/libtrace_int.h
r2fa43fa rf625817 284 284 }; 285 285 #define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration)); 286 287 struct callback_set { 288 289 fn_cb_starting message_starting; 290 fn_cb_dataless message_stopping; 291 fn_cb_dataless message_resuming; 292 fn_cb_dataless message_pausing; 293 fn_cb_packet message_packet; 294 fn_cb_result message_result; 295 fn_cb_first_packet message_first_packet; 296 fn_cb_tick message_tick_count; 297 fn_cb_tick message_tick_interval; 298 fn_cb_usermessage message_user; 299 }; 286 300 287 301 /** A libtrace input trace … … 333 347 /** The actual freelist */ 334 348 libtrace_ocache_t packet_freelist; 335 /** User defined per_msg function called when a message is ready */336 fn_cb_msg per_msg;337 /** User defined reporter function entry point */338 fn_reporter reporter;339 349 /** The hasher function */ 340 350 enum hasher_types hasher_type; … … 361 371 struct user_configuration config; 362 372 libtrace_combine_t combiner; 363 struct { 364 fn_cb_starting message_starting; 365 fn_cb_dataless message_stopping; 366 fn_cb_dataless message_resuming; 367 fn_cb_dataless message_pausing; 368 fn_cb_packet message_packet; 369 fn_cb_first_packet message_first_packet; 370 fn_cb_tick message_tick_count; 371 fn_cb_tick message_tick_interval; 372 } callbacks; 373 374 /* Set of callbacks to be executed by per packet threads in response 375 * to various messages. */ 376 struct callback_set *perpkt_cbs; 377 /* Set of callbacks to be executed by the reporter thread in response 378 * to various messages. */ 379 struct callback_set *reporter_cbs; 373 380 }; 374 381 … … 380 387 libtrace_thread_t * get_thread_table(libtrace_t *libtrace); 381 388 389 390 void send_message(libtrace_t *trace, libtrace_thread_t *target, 391 const enum libtrace_messages type, 392 libtrace_generic_t data, libtrace_thread_t *sender); 382 393 383 394 /** A libtrace output trace -
lib/libtrace_parallel.h
r3dd5acc rf625817 460 460 * @param libtrace The input trace to start 461 461 * @param global_blob Global data related to this trace accessible using trace_get_global() 462 * @param per_ msg A user supplied function called when a message is ready463 * @param reporter A user supplied function called when a result is ready.462 * @param per_packet_cbs A set of user supplied functions to be called in response to events being observed by the per_pkt threads. 463 * @param reporter_cbs A set of user supplied functions to be called in response to events / results being seen by the reporter thread. 464 464 * Optional if NULL the reporter thread will not be started. 465 465 * @return 0 on success, otherwise -1 to indicate an error has occurred … … 467 467 * This can also be used to restart an existing parallel trace, 468 468 * that has previously been paused using trace_ppause(). 469 * In this case global_blob,per_msg and reporter will only be updated 470 * if they are non-null. Otherwise their previous values will be maintained. 469 * In this case global_blob, per_packet_cbs and reporter_cbs will only be 470 * updated if they are non-null. Otherwise their previous values will be 471 * maintained. 471 472 * 472 473 */ 473 474 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 474 fn_cb_msg per_msg, fn_reporter reporter); 475 libtrace_callback_set_t *per_packet_cbs, 476 libtrace_callback_set_t *reporter_cbs); 475 477 476 478 /** … … 539 541 libtrace_packet_t *packet); 540 542 543 /** 544 * Callback for handling a result message. Should only be required by the 545 * reporter thread. 546 * 547 * @param libtrace The parallel trace 548 * @param sender The thread that generated this result 549 * @param global The global storage 550 * @param tls The thread local storage 551 * @param result The result associated with the message 552 * 553 */ 554 typedef void (*fn_cb_result)(libtrace_t *libtrace, libtrace_thread_t *sender, 555 void *global, void *tls, libtrace_result_t *result); 556 557 558 /** 559 * Callback for handling any user-defined message types. This will handle 560 * any messages with a type >= MESSAGE_USER. 561 * 562 * @param libtrace The parallel trace 563 * @param t The thread 564 * @param global The global storage 565 * @param tls The thread local storage 566 * @param mesg The code identifying the message type 567 * @param data The data associated with the message 568 * 569 */ 570 typedef void (*fn_cb_usermessage) (libtrace_t *libtrace, libtrace_thread_t *t, 571 void *global, void *tls, int mesg, libtrace_generic_t data); 572 541 573 /** Registers a built-in message with a handler. 542 574 * Note we do not include the sending thread as an argument to the reporter. … … 548 580 */ 549 581 550 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler); 551 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler); 552 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler); 553 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler); 554 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler); 555 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler); 556 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler); 557 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler); 582 DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset, 583 fn_cb_starting handler); 584 585 DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset, 586 fn_cb_dataless handler); 587 588 DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset, 589 fn_cb_dataless handler); 590 591 DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset, 592 fn_cb_dataless handler); 593 594 DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset, 595 fn_cb_packet handler); 596 597 DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset, 598 fn_cb_first_packet handler); 599 600 DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset, 601 fn_cb_result handler); 602 603 DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset, 604 fn_cb_tick handler); 605 606 DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset, 607 fn_cb_tick handler); 608 609 DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset, 610 fn_cb_usermessage handler); 558 611 559 612 /** Pauses a trace previously started with trace_pstart() -
lib/trace.c
r0a368ae rf625817 269 269 libtrace->perpkt_queue_full = false; 270 270 libtrace->global_blob = NULL; 271 libtrace->per_msg = NULL;272 libtrace->reporter = NULL;273 271 libtrace->hasher = NULL; 274 272 libtrace_zero_ocache(&libtrace->packet_freelist); … … 288 286 ZERO_USER_CONFIG(libtrace->config); 289 287 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 290 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 288 libtrace->perpkt_cbs = NULL; 289 libtrace->reporter_cbs = NULL; 291 290 292 291 /* Parse the URI to determine what sort of trace we are dealing with */ … … 393 392 libtrace->perpkt_queue_full = false; 394 393 libtrace->global_blob = NULL; 395 libtrace->per_msg = NULL;396 libtrace->reporter = NULL;397 394 libtrace->hasher = NULL; 398 395 libtrace_zero_ocache(&libtrace->packet_freelist); … … 409 406 ZERO_USER_CONFIG(libtrace->config); 410 407 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 411 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 408 libtrace->perpkt_cbs = NULL; 409 libtrace->reporter_cbs = NULL; 412 410 413 411 for(tmp=formats_list;tmp;tmp=tmp->next) { … … 721 719 // This has all of our packets 722 720 libtrace_ocache_destroy(&libtrace->packet_freelist); 723 if (libtrace->combiner.destroy && libtrace->reporter )721 if (libtrace->combiner.destroy && libtrace->reporter_cbs) 724 722 libtrace->combiner.destroy(libtrace, &libtrace->combiner); 725 723 free(libtrace->perpkt_threads); 726 724 libtrace->perpkt_threads = NULL; 727 725 libtrace->perpkt_thread_count = 0; 728 } 726 727 } 728 729 if (libtrace->perpkt_cbs) 730 trace_destroy_callback_set(libtrace->perpkt_cbs); 731 if (libtrace->reporter_cbs) 732 trace_destroy_callback_set(libtrace->reporter_cbs); 733 729 734 730 735 if (libtrace->event.packet) { -
lib/trace_parallel.c
r9346e4a rf625817 80 80 81 81 #include "libtrace.h" 82 #include "libtrace_ int.h"82 #include "libtrace_parallel.h" 83 83 84 84 #ifdef HAVE_PCAP_BPF_H … … 173 173 174 174 /* This should optimise away the switch to nothing in the explict cases */ 175 static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type, 176 libtrace_generic_t data, libtrace_thread_t *sender) { 175 inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, 176 const enum libtrace_messages type, 177 libtrace_generic_t data, libtrace_thread_t *sender) { 178 177 179 fn_cb_dataless fn = NULL; 178 switch (type) { 180 enum libtrace_messages switchtype; 181 libtrace_callback_set_t *cbs = NULL; 182 183 if (thread == &trace->reporter_thread) { 184 cbs = trace->reporter_cbs; 185 } else { 186 cbs = trace->perpkt_cbs; 187 } 188 189 if (cbs == NULL) 190 return; 191 192 if (type >= MESSAGE_USER) 193 switchtype = MESSAGE_USER; 194 else 195 switchtype = (enum libtrace_messages) type; 196 197 switch (switchtype) { 179 198 case MESSAGE_STARTING: 180 if (trace->callbacks.message_starting) 181 thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob); 182 else if (trace->per_msg) 183 (*trace->per_msg)(trace, thread, type, data, sender); 199 if (cbs->message_starting) 200 thread->user_data = (*cbs->message_starting)(trace, 201 thread, trace->global_blob); 184 202 return; 185 203 case MESSAGE_FIRST_PACKET: 186 if ( trace->callbacks.message_first_packet)187 (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);188 else if (trace->per_msg) 189 (*trace->per_msg)(trace, thread, type, data, sender);204 if (cbs->message_first_packet) 205 (*cbs->message_first_packet)(trace, thread, 206 trace->global_blob, thread->user_data, 207 data.pkt, sender); 190 208 return; 191 209 case MESSAGE_TICK_COUNT: 192 if ( trace->callbacks.message_tick_count)193 (* trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);194 else if (trace->per_msg) 195 (*trace->per_msg)(trace, thread, type, data, sender);210 if (cbs->message_tick_count) 211 (*cbs->message_tick_count)(trace, thread, 212 trace->global_blob, thread->user_data, 213 data.uint64); 196 214 return; 197 215 case MESSAGE_TICK_INTERVAL: 198 if ( trace->callbacks.message_tick_interval)199 (* trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data, data.uint64);200 else if (trace->per_msg) 201 (*trace->per_msg)(trace, thread, type, data, sender);216 if (cbs->message_tick_interval) 217 (*cbs->message_tick_interval)(trace, thread, 218 trace->global_blob, thread->user_data, 219 data.uint64); 202 220 return; 203 221 case MESSAGE_STOPPING: 204 fn = trace->callbacks.message_stopping;222 fn = cbs->message_stopping; 205 223 break; 206 224 case MESSAGE_RESUMING: 207 fn = trace->callbacks.message_resuming;225 fn = cbs->message_resuming; 208 226 break; 209 227 case MESSAGE_PAUSING: 210 fn = trace->callbacks.message_pausing;228 fn = cbs->message_pausing; 211 229 break; 230 case MESSAGE_USER: 231 if (cbs->message_user) 232 (*cbs->message_user)(trace, thread, trace->global_blob, 233 thread->user_data, type, data); 234 return; 235 case MESSAGE_RESULT: 236 if (cbs->message_result) 237 (*cbs->message_result)(trace, thread, 238 trace->global_blob, thread->user_data, 239 data.res); 212 240 213 241 /* These should be unused */ … … 215 243 case MESSAGE_DO_STOP: 216 244 case MESSAGE_POST_REPORTER: 217 case MESSAGE_RESULT:218 245 case MESSAGE_PACKET: 219 246 return; 220 case MESSAGE_USER: 221 break; 222 } 247 } 248 223 249 if (fn) 224 250 (*fn)(trace, thread, trace->global_blob, thread->user_data); 225 else if (trace->per_msg) 226 (*trace->per_msg)(trace, thread, type, data, sender); 251 } 252 253 DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() { 254 libtrace_callback_set_t *cbset; 255 256 cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t)); 257 memset(cbset, 0, sizeof(libtrace_callback_set_t)); 258 return cbset; 259 } 260 261 DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) { 262 free(cbset); 227 263 } 228 264 … … 239 275 { 240 276 assert(libtrace->state != STATE_NEW); 241 return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter ;277 return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs; 242 278 } 243 279 … … 441 477 } 442 478 t->accepted_packets++; 443 libtrace_generic_t data = {.pkt = *packet}; 444 if (trace->callbacks.message_packet) 445 *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet); 446 else if (trace->per_msg) 447 *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t); 479 if (trace->perpkt_cbs->message_packet) 480 *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet); 448 481 trace_fin_packet(*packet); 449 482 } else { … … 624 657 goto eof; 625 658 } 626 (*trace->per_msg)(trace, t, message.code, message.data, message.sender); 627 (*trace->per_msg)(trace, t, message.code, message.data,message.sender);659 send_message(trace, t, message.code, message.data, 660 message.sender); 628 661 /* Continue and the empty messages out before packets */ 629 662 continue; … … 1067 1100 } 1068 1101 1069 (*trace->reporter)(trace, MESSAGE_STARTING, (libtrace_generic_t){0}, t);1070 (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);1102 send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t); 1103 send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t); 1071 1104 1072 1105 while (!trace_has_finished(trace)) { … … 1085 1118 assert(trace->combiner.pause); 1086 1119 trace->combiner.pause(trace, &trace->combiner); 1087 (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t); 1120 send_message(trace, t, MESSAGE_PAUSING, 1121 (libtrace_generic_t) {0}, t); 1088 1122 trace_thread_pause(trace, t); 1089 (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t); 1123 send_message(trace, t, MESSAGE_RESUMING, 1124 (libtrace_generic_t) {0}, t); 1090 1125 break; 1091 1126 default: 1092 (*trace->reporter)(trace, message.code, message.data, message.sender); 1127 send_message(trace, t, message.code, message.data, 1128 message.sender); 1093 1129 } 1094 1130 } … … 1098 1134 1099 1135 // GOODBYE 1100 (*trace->reporter)(trace, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);1101 (*trace->reporter)(trace, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);1136 send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t); 1137 send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t); 1102 1138 1103 1139 thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true); … … 1315 1351 */ 1316 1352 static int trace_prestart(libtrace_t * libtrace, void *global_blob, 1317 fn_cb_msg per_msg, fn_reporter reporter) { 1353 libtrace_callback_set_t *per_packet_cbs, 1354 libtrace_callback_set_t *reporter_cbs) { 1318 1355 int i, err = 0; 1319 1356 if (libtrace->state != STATE_PAUSED) { … … 1358 1395 1359 1396 /* Update functions if requested */ 1360 if (per_msg)1361 libtrace->per_msg = per_msg;1362 assert(libtrace->per_msg);1363 if (reporter)1364 libtrace->reporter = reporter;1365 1397 if(global_blob) 1366 1398 libtrace->global_blob = global_blob; 1399 1400 if (per_packet_cbs) { 1401 if (libtrace->perpkt_cbs) 1402 trace_destroy_callback_set(libtrace->perpkt_cbs); 1403 libtrace->perpkt_cbs = trace_create_callback_set(); 1404 memcpy(libtrace->perpkt_cbs, per_packet_cbs, 1405 sizeof(libtrace_callback_set_t)); 1406 } 1407 1408 if (reporter_cbs) { 1409 if (libtrace->reporter_cbs) 1410 trace_destroy_callback_set(libtrace->reporter_cbs); 1411 1412 libtrace->reporter_cbs = trace_create_callback_set(); 1413 memcpy(libtrace->reporter_cbs, reporter_cbs, 1414 sizeof(libtrace_callback_set_t)); 1415 } 1367 1416 1368 1417 if (trace_is_parallel(libtrace)) { … … 1586 1635 1587 1636 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 1588 fn_cb_msg per_msg, fn_reporter reporter) { 1637 libtrace_callback_set_t *per_packet_cbs, 1638 libtrace_callback_set_t *reporter_cbs) { 1589 1639 int i; 1590 1640 int ret = -1; … … 1599 1649 1600 1650 if (libtrace->state == STATE_PAUSED) { 1601 ret = trace_prestart(libtrace, global_blob, per_msg, reporter); 1651 ret = trace_prestart(libtrace, global_blob, per_packet_cbs, 1652 reporter_cbs); 1602 1653 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1603 1654 return ret; … … 1614 1665 /* Store the user defined things against the trace */ 1615 1666 libtrace->global_blob = global_blob; 1616 libtrace->per_msg = per_msg; 1617 libtrace->reporter = reporter; 1667 1668 /* Save a copy of the callbacks in case the user tries to change them 1669 * on us later */ 1670 if (!per_packet_cbs) { 1671 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart " 1672 "requires a non-NULL set of per packet " 1673 "callbacks."); 1674 goto cleanup_none; 1675 } 1676 1677 if (per_packet_cbs->message_packet == NULL) { 1678 trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per " 1679 "packet callbacks must include a handler " 1680 "for a packet. Please set this using " 1681 "trace_set_packet_cb()."); 1682 goto cleanup_none; 1683 } 1684 1685 libtrace->perpkt_cbs = trace_create_callback_set(); 1686 memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t)); 1687 1688 if (reporter_cbs) { 1689 libtrace->reporter_cbs = trace_create_callback_set(); 1690 memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t)); 1691 } 1692 1693 1694 1695 1618 1696 /* And zero other fields */ 1619 1697 for (i = 0; i < THREAD_STATE_MAX; ++i) { … … 1694 1772 1695 1773 /* Start the reporter thread */ 1696 if (reporter ) {1774 if (reporter_cbs) { 1697 1775 if (libtrace->combiner.initialise) 1698 1776 libtrace->combiner.initialise(libtrace, &libtrace->combiner); … … 1793 1871 } 1794 1872 1795 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) { 1796 libtrace->callbacks.message_starting = handler; 1797 return 0; 1798 } 1799 1800 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) { 1801 libtrace->callbacks.message_pausing = handler; 1802 return 0; 1803 } 1804 1805 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) { 1806 libtrace->callbacks.message_resuming = handler; 1807 return 0; 1808 } 1809 1810 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) { 1811 libtrace->callbacks.message_stopping = handler; 1812 return 0; 1813 } 1814 1815 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) { 1816 libtrace->callbacks.message_packet = handler; 1817 return 0; 1818 } 1819 1820 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) { 1821 libtrace->callbacks.message_first_packet = handler; 1822 return 0; 1823 } 1824 1825 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) { 1826 libtrace->callbacks.message_tick_count = handler; 1827 return 0; 1828 } 1829 1830 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) { 1831 libtrace->callbacks.message_tick_interval = handler; 1873 DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset, 1874 fn_cb_starting handler) { 1875 cbset->message_starting = handler; 1876 return 0; 1877 } 1878 1879 DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset, 1880 fn_cb_dataless handler) { 1881 cbset->message_pausing = handler; 1882 return 0; 1883 } 1884 1885 DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset, 1886 fn_cb_dataless handler) { 1887 cbset->message_resuming = handler; 1888 return 0; 1889 } 1890 1891 DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset, 1892 fn_cb_dataless handler) { 1893 cbset->message_stopping = handler; 1894 return 0; 1895 } 1896 1897 DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset, 1898 fn_cb_packet handler) { 1899 cbset->message_packet = handler; 1900 return 0; 1901 } 1902 1903 DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset, 1904 fn_cb_first_packet handler) { 1905 cbset->message_first_packet = handler; 1906 return 0; 1907 } 1908 1909 DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset, 1910 fn_cb_tick handler) { 1911 cbset->message_tick_count = handler; 1912 return 0; 1913 } 1914 1915 DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset, 1916 fn_cb_tick handler) { 1917 cbset->message_tick_interval = handler; 1918 return 0; 1919 } 1920 1921 DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset, 1922 fn_cb_result handler) { 1923 cbset->message_result = handler; 1924 return 0; 1925 } 1926 1927 DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset, 1928 fn_cb_usermessage handler) { 1929 cbset->message_user = handler; 1832 1930 return 0; 1833 1931 } -
tools/tracertstats/tracertstats.c
r8c7490fe rf625817 130 130 131 131 static uint64_t glob_last_ts = 0; 132 static void process_result(libtrace_t *trace, int mesg,133 libtrace_generic_t data,134 libtrace_thread_t *sender UNUSED) {132 static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED, 133 void *global UNUSED, void *tls UNUSED, 134 libtrace_result_t *result) { 135 135 uint64_t ts = 0; 136 136 static bool stopped = false; … … 142 142 return; 143 143 144 switch (mesg) { 145 case MESSAGE_RESULT: 146 ts = data.res->key; 147 res = data.res->value.ptr; 148 if (glob_last_ts == 0) 149 glob_last_ts = ts; 150 while ((glob_last_ts >> 32) < (ts >> 32)) { 151 report_results(glob_last_ts >> 32, count, bytes); 152 count = 0; 153 bytes = 0; 154 for (j = 0; j < filter_count; j++) 155 filters[j].count = filters[j].bytes = 0; 156 glob_last_ts = ts; 157 } 158 count += res->total.count; 159 packets_seen += res->total.count; 160 bytes += res->total.bytes; 161 for (j = 0; j < filter_count; j++) { 162 filters[j].count += res->filters[j].count; 163 filters[j].bytes += res->filters[j].bytes; 164 } 165 free(res); 166 } 144 ts = result->key; 145 res = result->value.ptr; 146 if (glob_last_ts == 0) 147 glob_last_ts = ts; 148 while ((glob_last_ts >> 32) < (ts >> 32)) { 149 report_results(glob_last_ts >> 32, count, bytes); 150 count = 0; 151 bytes = 0; 152 for (j = 0; j < filter_count; j++) 153 filters[j].count = filters[j].bytes = 0; 154 glob_last_ts = ts; 155 } 156 count += res->total.count; 157 packets_seen += res->total.count; 158 bytes += res->total.bytes; 159 for (j = 0; j < filter_count; j++) { 160 filters[j].count += res->filters[j].count; 161 filters[j].bytes += res->filters[j].bytes; 162 } 163 free(res); 167 164 168 165 /* Be careful to only call pstop once from within this thread! */ … … 246 243 { 247 244 libtrace_t *trace = NULL; 248 if (!merge_inputs) 245 libtrace_callback_set_t *pktcbs, *repcbs; 246 247 if (!merge_inputs) 249 248 create_output(uri); 250 249 … … 268 267 } 269 268 270 trace_cb_starting(trace, cb_starting); 271 trace_cb_stopping(trace, cb_stopping); 272 trace_cb_packet(trace, cb_packet); 273 trace_cb_tick_count(trace, cb_tick); 274 trace_cb_tick_interval(trace, cb_tick); 275 276 if (trace_pstart(trace, NULL, NULL, process_result)==-1) { 269 pktcbs = trace_create_callback_set(); 270 trace_set_starting_cb(pktcbs, cb_starting); 271 trace_set_stopping_cb(pktcbs, cb_stopping); 272 trace_set_packet_cb(pktcbs, cb_packet); 273 trace_set_tick_count_cb(pktcbs, cb_tick); 274 trace_set_tick_interval_cb(pktcbs, cb_tick); 275 276 repcbs = trace_create_callback_set(); 277 trace_set_result_cb(repcbs, cb_result); 278 279 if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) { 277 280 trace_perror(trace,"Failed to start trace"); 278 281 trace_destroy(trace); 282 trace_destroy_callback_set(pktcbs); 283 trace_destroy_callback_set(repcbs); 279 284 if (!merge_inputs) 280 285 output_destroy(output); … … 292 297 293 298 trace_destroy(trace); 299 trace_destroy_callback_set(pktcbs); 300 trace_destroy_callback_set(repcbs); 294 301 295 302 if (!merge_inputs) -
tools/tracestats/Makefile.am
r29bbef0 rf625817 1 bin_PROGRAMS = tracestats tracestats_parallel1 bin_PROGRAMS = tracestats 2 2 bin_SCRIPTS = tracesummary 3 3 … … 7 7 include ../Makefile.tools 8 8 tracestats_SOURCES = tracestats.c 9 tracestats_parallel_SOURCES = tracestats_parallel.c
Note: See TracChangeset
for help on using the changeset viewer.