- Timestamp:
- 08/12/14 14:14:50 (7 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 0862c20
- Parents:
- f9a70ca
- Location:
- tools
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/traceanon/traceanon_parallel.c
r9594cf9 rf051c1b 27 27 static int s = 0; 28 28 (void)signal; 29 //trace_interrupt();29 //trace_interrupt(); 30 30 // trace_pstop isn't really signal safe because its got lots of locks in it 31 //trace_pstop(trace);32 31 trace_pstop(trace); 32 /*if (s == 0) { 33 33 if (trace_ppause(trace) == -1) 34 34 trace_perror(trace, "Pause failed"); … … 37 37 if (trace_pstart(trace, NULL, NULL, NULL) == -1) 38 38 trace_perror(trace, "Start failed"); 39 } 39 }*/ 40 40 s = !s; 41 41 } … … 141 141 142 142 143 static uint64_t bad_hash(libtrace_packet_t * pkt)143 UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt) 144 144 { 145 145 return 0; … … 147 147 148 148 149 static uint64_t rand_hash(libtrace_packet_t * pkt)149 UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt) 150 150 { 151 151 return rand(); … … 153 153 154 154 155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t) 156 { 157 int i; 155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t) 156 { 158 157 159 158 if (pkt) { … … 191 190 //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt); 192 191 //trace_publish_result(trace, trace_packet_get_order(pkt), pkt); 193 trace_publish_ packet(trace, pkt);192 trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET); 194 193 //return ; 195 194 } … … 197 196 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 198 197 switch (mesg->code) { 199 case MESSAGE_START ED:198 case MESSAGE_STARTING: 200 199 enc_init(enc_type,key); 201 200 } … … 204 203 } 205 204 205 struct libtrace_out_t *writer = 0; 206 207 static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) { 208 static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format 209 if (result) { 210 libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result); 211 assert(libtrace_result_get_key(result) == packet_count++); 212 if (trace_write_packet(writer,packet)==-1) { 213 trace_perror_output(writer,"writer"); 214 trace_interrupt(); 215 } 216 trace_free_result_packet(trace, packet); 217 } 218 return NULL; 219 } 220 221 206 222 int main(int argc, char *argv[]) 207 223 { 208 224 //struct libtrace_t *trace = 0; 209 struct libtrace_packet_t *packet/* = trace_create_packet()*/;210 struct libtrace_out_t *writer = 0;211 225 struct sigaction sigact; 212 226 char *output = 0; … … 380 394 //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i); 381 395 i = 2; 382 383 384 if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {396 trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i); 397 398 if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) { 385 399 trace_perror(trace,"trace_start"); 386 400 trace_destroy_output(writer); … … 402 416 sigaction(SIGINT, &sigact, NULL); 403 417 sigaction(SIGTERM, &sigact, NULL); 404 405 // Read in the resulting packets and then free them when done 406 libtrace_vector_t res; 407 int res_size = 0; 408 libtrace_vector_init(&res, sizeof(libtrace_result_t)); 409 uint64_t packet_count = 0; 410 while (!trace_finished(trace)) { 411 // Read messages 412 libtrace_message_t message; 413 414 // We just release and do work currently, maybe if something 415 // interesting comes through we'd deal with that 416 libtrace_thread_get_message(trace, &message); 417 418 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { } 419 420 if ((res_size = trace_get_results(trace, &res)) == 0) 421 ;/*sched_yield();*/ 422 423 for (i = 0 ; i < res_size ; i++) { 424 libtrace_result_t result; 425 assert(libtrace_vector_get(&res, i, (void *) &result) == 1); 426 packet = libtrace_result_get_value(&result); 427 assert(libtrace_result_get_key(&result) == packet_count); 428 packet_count++; 429 if (trace_write_packet(writer,packet)==-1) { 430 trace_perror_output(writer,"writer"); 431 trace_interrupt(); 432 break; 433 } 434 //trace_destroy_packet(packet); 435 trace_free_result_packet(trace, packet); 436 } 437 } 418 419 // Wait for the trace to finish 438 420 trace_join(trace); 439 440 // Grab everything that's left here441 res_size = trace_get_results(trace, &res);442 443 for (i = 0 ; i < res_size ; i++) {444 libtrace_result_t result;445 assert(libtrace_vector_get(&res, i, (void *) &result) == 1);446 packet = libtrace_result_get_value(&result);447 if (libtrace_result_get_key(&result) != packet_count)448 printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);449 assert(libtrace_result_get_key(&result) == packet_count);450 451 packet_count++;452 if (trace_write_packet(writer,packet)==-1) {453 trace_perror_output(writer,"writer");454 trace_interrupt();455 break;456 }457 trace_destroy_packet(packet);458 }459 libtrace_vector_destroy(&res);460 421 461 422 //trace_destroy_packet(packet); -
tools/tracertstats/tracertstats_parallel.c
rb13b939 rf051c1b 209 209 // Publish and make a new one new 210 210 //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts); 211 trace_publish_result(trace, (uint64_t) last_ts, results);212 trace_post_re duce(trace);211 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 212 trace_post_reporter(trace); 213 213 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 214 214 last_ts++; … … 234 234 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 235 235 switch (mesg->code) { 236 case MESSAGE_START ED:236 case MESSAGE_STARTING: 237 237 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 238 238 break; 239 case MESSAGE_STOPP ED:239 case MESSAGE_STOPPING: 240 240 // Should we always post this? 241 241 if (results->total.count) { 242 trace_publish_result(trace, (uint64_t) last_ts, results);243 trace_post_re duce(trace);242 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 243 trace_post_reporter(trace); 244 244 results = NULL; 245 245 } … … 260 260 if (next_update_time <= mesg->additional.uint64) { 261 261 //fprintf(stderr, "Got a tick and publishing early!!\n"); 262 trace_publish_result(trace, (uint64_t) last_ts, results);263 trace_post_re duce(trace);262 trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL); 263 trace_post_reporter(trace); 264 264 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 265 265 last_ts++; -
tools/tracestats/tracestats_parallel.c
r9594cf9 rf051c1b 67 67 // trace_interrupt(); 68 68 // trace_pstop isn't really signal safe because its got lots of locks in it 69 //trace_pstop(trace);70 69 trace_pstop(trace); 70 /*if (s == 0) { 71 71 if (trace_ppause(trace) == -1) 72 72 trace_perror(trace, "Pause failed"); … … 75 75 if (trace_pstart(trace, NULL, NULL, NULL) == -1) 76 76 trace_perror(trace, "Start failed"); 77 } 77 }*/ 78 78 s = !s; 79 79 } … … 129 129 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 130 130 switch (mesg->code) { 131 case MESSAGE_STOPP ED:132 trace_publish_result(trace, 0, results); // Only ever using a single key 0131 case MESSAGE_STOPPING: 132 trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0 133 133 fprintf(stderr, "Thread published resuslts WOWW\n"); 134 134 break; 135 case MESSAGE_START ED:135 case MESSAGE_STARTING: 136 136 results = calloc(1, sizeof(statistics_t) * (filter_count + 1)); 137 137 break; … … 142 142 fprintf(stderr, "Thread is pausing\n"); 143 143 break; 144 case MESSAGE_ PAUSED:144 case MESSAGE_RESUMING: 145 145 fprintf(stderr, "Thread has paused\n"); 146 146 break; … … 228 228 int option = 2; 229 229 //option = 10000; 230 230 //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL); 231 231 option = 2; 232 232 trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
Note: See TracChangeset
for help on using the changeset viewer.