Changeset 76291d1
- Timestamp:
- 03/31/15 16:02:53 (7 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 4007dbb
- Parents:
- 58bfabf
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/libtrace_int.h
rc723e9e r76291d1 358 358 struct user_configuration config; 359 359 libtrace_combine_t combiner; 360 struct { 361 fn_handler message_starting; 362 fn_handler message_stopping; 363 fn_handler message_resuming; 364 fn_handler message_pausing; 365 fn_handler message_packet; 366 } callbacks; 360 367 }; 361 368 -
lib/libtrace_parallel.h
rd3849c7 r76291d1 465 465 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 466 466 fn_per_pkt per_pkt, fn_reporter reporter); 467 468 469 /** 470 * @param libtrace The parallel trace 471 * @param t The thread 472 * @param data The data associated with the message 473 * @param global The global storage 474 * @param tls The thread local storage 475 */ 476 typedef void* (*fn_handler)(libtrace_t *libtrace, 477 libtrace_thread_t *t, 478 libtrace_generic_t data, 479 void *global, 480 void *tls); 481 482 /** Registers a built-in message with a handler. 483 * Note we do not include the sending thread as an argument to the reporter. 484 * If set to NULL, the message will be sent to default perpkt handler. 485 * 486 * @param libtrace The input trace to start 487 * @param message The message to intercept 488 * @param handler the handler to be called when the message is received 489 * @return 0 if successful otherwise -1. 490 */ 491 DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler); 467 492 468 493 /** Pauses a trace previously started with trace_pstart() -
lib/trace.c
r8370482 r76291d1 287 287 ZERO_USER_CONFIG(libtrace->config); 288 288 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 289 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 289 290 290 291 /* Parse the URI to determine what sort of trace we are dealing with */ … … 405 406 ZERO_USER_CONFIG(libtrace->config); 406 407 memset(&libtrace->combiner, 0, sizeof(libtrace->combiner)); 408 memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks)); 407 409 408 410 for(tmp=formats_list;tmp;tmp=tmp->next) { -
lib/trace_parallel.c
rd3849c7 r76291d1 377 377 t->accepted_packets++; 378 378 libtrace_generic_t data = {.pkt = *packet}; 379 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t); 379 if (trace->callbacks.message_packet) 380 *packet = (*trace->callbacks.message_packet)(trace, t, data, trace->global_blob, t->user_data); 381 else 382 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t); 380 383 trace_fin_packet(*packet); 381 384 } else { … … 535 538 536 539 /* Let the per_packet function know we have started */ 537 (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t); 538 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t); 540 if (trace->callbacks.message_starting) 541 (*trace->callbacks.message_starting)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 542 else 543 (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t); 544 545 if (trace->callbacks.message_resuming) 546 (*trace->callbacks.message_resuming)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 547 else 548 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t); 539 549 540 550 for (;;) { … … 619 629 620 630 // Let the per_packet function know we have stopped 621 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t); 622 (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t); 631 if (trace->callbacks.message_pausing) 632 (*trace->callbacks.message_pausing)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 633 else 634 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t); 635 if (trace->callbacks.message_stopping) 636 (*trace->callbacks.message_stopping)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 637 else 638 (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t); 639 623 640 624 641 // Free any remaining packets … … 1719 1736 return ret; 1720 1737 } 1738 1739 DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler) { 1740 switch (message) { 1741 case MESSAGE_STARTING: 1742 libtrace->callbacks.message_starting = handler; 1743 return 0; 1744 case MESSAGE_STOPPING: 1745 libtrace->callbacks.message_stopping = handler; 1746 return 0; 1747 case MESSAGE_RESUMING: 1748 libtrace->callbacks.message_resuming = handler; 1749 return 0; 1750 case MESSAGE_PAUSING: 1751 libtrace->callbacks.message_pausing = handler; 1752 return 0; 1753 case MESSAGE_PACKET: 1754 libtrace->callbacks.message_packet = handler; 1755 return 0; 1756 default: 1757 return -1; 1758 } 1759 return -1; 1760 } 1761 1721 1762 1722 1763 /* -
tools/tracestats/tracestats_parallel.c
rdfbdda7a r76291d1 87 87 88 88 89 static void* per_packet(libtrace_t *trace , libtrace_thread_t *t,90 int mesg , libtrace_generic_t data,89 static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 90 int mesg UNUSED, libtrace_generic_t data UNUSED, 91 91 libtrace_thread_t *sender UNUSED) 92 92 { 93 /* Using first entry as total and those after for filter counts */94 static __thread statistics_t * results = NULL;95 int i, wlen;96 libtrace_stat_t *stats;97 libtrace_generic_t gen;98 99 switch (mesg) {100 case MESSAGE_PACKET:101 /* Apply filters to every packet note the result */102 wlen = trace_get_wire_length(data.pkt);103 for(i=0;i<filter_count;++i) {104 if (filters[i].filter == NULL)105 continue;106 if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {107 results[i+1].count++;108 results[i+1].bytes+=wlen;109 }110 if (trace_is_err(trace)) {111 trace_perror(trace, "trace_apply_filter");112 fprintf(stderr, "Removing filter from filterlist\n");113 /* This is a race, but will be atomic */114 filters[i].filter = NULL;115 }116 }117 results[0].count++;118 results[0].bytes +=wlen;119 return data.pkt;120 case MESSAGE_STARTING:121 /* Allocate space to hold a total count and one for each filter */122 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));123 break;124 case MESSAGE_STOPPING:125 /* We only output one result per thread with the key 0 when the126 * trace is over. */127 gen.ptr = results;128 trace_publish_result(trace, t, 0, gen, RESULT_USER);129 break;130 default:131 break;132 }133 93 return NULL; 134 94 } … … 184 144 } 185 145 146 147 static void* fn_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t, 148 libtrace_generic_t data UNUSED, void *global UNUSED, void*tls UNUSED) { 149 /* Allocate space to hold a total count and one for each filter */ 150 statistics_t *results = calloc(1, sizeof(statistics_t) * (filter_count + 1)); 151 trace_set_tls(t, results); 152 return NULL; 153 } 154 155 156 static void* fn_stopping(libtrace_t *trace, libtrace_thread_t *t UNUSED, 157 libtrace_generic_t data UNUSED, void *global UNUSED, void*tls) { 158 statistics_t *results = tls; 159 libtrace_generic_t gen; 160 /* We only output one result per thread with the key 0 when the 161 * trace is over. */ 162 gen.ptr = results; 163 trace_publish_result(trace, t, 0, gen, RESULT_USER); 164 return NULL; 165 } 166 167 static void* fn_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED, 168 libtrace_generic_t data, void *global UNUSED, void*tls) { 169 statistics_t *results = tls; 170 int i, wlen; 171 172 /* Apply filters to every packet note the result */ 173 wlen = trace_get_wire_length(data.pkt); 174 for(i=0;i<filter_count;++i) { 175 if (filters[i].filter == NULL) 176 continue; 177 if(trace_apply_filter(filters[i].filter,data.pkt) > 0) { 178 results[i+1].count++; 179 results[i+1].bytes+=wlen; 180 } 181 if (trace_is_err(trace)) { 182 trace_perror(trace, "trace_apply_filter"); 183 fprintf(stderr, "Removing filter from filterlist\n"); 184 /* This is a race, but will be atomic */ 185 filters[i].filter = NULL; 186 } 187 } 188 results[0].count++; 189 results[0].bytes +=wlen; 190 return data.pkt; 191 } 192 186 193 /* Process a trace, counting packets that match filter(s) */ 187 194 static void run_trace(char *uri, char *config, char *config_file) … … 212 219 } 213 220 } 221 222 trace_set_handler(trace, MESSAGE_PACKET, fn_packet); 223 trace_set_handler(trace, MESSAGE_STARTING, fn_starting); 224 trace_set_handler(trace, MESSAGE_STOPPING, fn_stopping); 214 225 215 226 /* Start the trace as a parallel trace */
Note: See TracChangeset
for help on using the changeset viewer.