- Timestamp:
- 06/04/14 02:28:58 (7 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- b13b939
- Parents:
- fac8c46
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/tracertstats/tracertstats_parallel.c
r17a3dff r82facc5 65 65 66 66 #define DEFAULT_OUTPUT_FMT "txt" 67 #define TRACE_TIME 168 67 69 68 struct libtrace_t *trace; … … 190 189 } timestamp_sync_t; 191 190 192 193 static int reduce_tracetime(libtrace_t* trace, void* global_blob, uint64_t *last_ts)194 {195 int i,j;196 //uint64_t count=0, bytes=0;197 static uint64_t ts = 0;198 libtrace_vector_t results;199 libtrace_vector_init(&results, sizeof(libtrace_result_t));200 trace_get_results_check_temp(trace, &results, *last_ts);201 //trace_get_results(trace, &results);202 //uint64_t packets;203 204 /* Get the results from each core and sum 'em up */205 for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {206 libtrace_result_t result;207 208 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);209 ts = libtrace_result_get_key(&result);210 if (*last_ts == 0)211 *last_ts = ts;212 213 result_t * res = libtrace_result_get_value(&result);214 static result_t * last_res = NULL;215 if (res == last_res) {216 printf("Hmm could be asserting but I'm not ;)\n");217 }218 //assert(res != last_res);219 last_res = res;220 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);221 /*while (*last_ts < ts) {222 report_results((double) *last_ts * (double) packet_interval, count, bytes);223 count = 0;224 bytes = 0;225 for (j = 0; j < filter_count; j++)226 filters[j].count = filters[j].bytes = 0;227 (*last_ts)++;228 }*/229 230 count += res->total.count;231 bytes += res->total.bytes;232 for (j = 0; j < filter_count; j++) {233 filters[j].count += res->filters[j].count;234 filters[j].bytes += res->filters[j].bytes;235 }236 free(res);237 }238 report_results((double) *last_ts * (double) packet_interval, count, bytes);239 count = 0;240 bytes = 0;241 for (j = 0; j < filter_count; j++)242 filters[j].count = filters[j].bytes = 0;243 (*last_ts)++;244 245 // Done with these results - Free internally and externally246 libtrace_vector_destroy(&results);247 248 return 0;249 }250 251 191 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 252 192 libtrace_message_t *mesg, … … 259 199 // Unsure when we would hit this case but the old code had it, I 260 200 // guess we should keep it 261 if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) == NULL) {262 201 if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) { 202 //fprintf(stderr, "Got packet t=%x\n", t); 263 203 ts = trace_get_seconds(pkt) / packet_interval; 264 204 if (last_ts == 0) 265 205 last_ts = ts; 266 206 267 207 while (packet_interval != UINT64_MAX && last_ts<ts) { 268 208 // Publish and make a new one new 209 fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts); 269 210 trace_publish_result(trace, (uint64_t) last_ts, results); 270 211 trace_post_reduce(trace); … … 296 237 break; 297 238 case MESSAGE_STOPPED: 239 // Should we always post this? 298 240 if (results->total.count) { 299 241 trace_publish_result(trace, (uint64_t) last_ts, results); 300 242 trace_post_reduce(trace); 243 results = NULL; 301 244 } 245 break; 246 case MESSAGE_TICK: 247 { 248 int64_t offset; 249 struct timeval *tv, tv_real; 250 libtrace_packet_t *first_packet = NULL; 251 retrive_first_packet(trace, &first_packet, &tv); 252 if (first_packet != NULL) { 253 // So figure out our running offset 254 tv_real = trace_get_timeval(first_packet); 255 offset = tv_to_usec(tv) - tv_to_usec(&tv_real); 256 // Get time of day and do this stuff 257 uint64_t next_update_time; 258 next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset; 259 if (next_update_time <= mesg->additional.uint64) { 260 fprintf(stderr, "Got a tick and publishing early!!\n"); 261 trace_publish_result(trace, (uint64_t) last_ts, results); 262 trace_post_reduce(trace); 263 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 264 last_ts++; 265 } else { 266 fprintf(stderr, "Got a tick but no publish ...\n"); 267 } 268 } else { 269 fprintf(stderr, "Got a tick but no packets seen yet!!!\n"); 270 } 271 } 302 272 } 303 273 } 304 274 return pkt; 305 275 } 306 void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key); 307 /** 308 * A trace time version of map which will attempt to keep upto date 309 * with the incoming data and detect cases where results are missing and 310 * recover correctly. 311 */ 312 static void* per_packet_tracetime(libtrace_t *trace, libtrace_packet_t *pkt, 313 libtrace_message_t *mesg, 314 libtrace_thread_t *t) 315 { 316 // Using first entry as total and those after for filter counts 317 int i; 318 static __thread uint64_t last_ts = 0, ts = 0; 319 static __thread double debug_last = 0.0; 320 static __thread result_t * tmp_result = NULL; 321 322 if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) { 323 ts = trace_get_seconds(pkt) / packet_interval; 324 325 if (debug_last != 0.0 && debug_last > trace_get_seconds(pkt)) 326 printf("packets out of order bitch :(\n"); 327 debug_last = trace_get_seconds(pkt); 328 if (last_ts == 0) 329 last_ts = ts; 330 331 /* 332 while (packet_interval != UINT64_MAX && last_ts<ts) { 333 // Publish and make new 334 trace_publish_result(trace, (uint64_t) last_ts, results); 335 results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count); 336 memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count); 337 last_ts++; 338 }*/ 339 340 /* Calculate count for filters */ 341 for(i=0;i<filter_count;++i) { 342 if(trace_apply_filter(filters[i].filter, pkt)) { 343 tmp_result->filters[i].count = 1; 344 tmp_result->filters[i].bytes = trace_get_wire_length(pkt); 345 } else { 346 tmp_result->filters[i].count = 0; 347 tmp_result->filters[i].bytes = 0; 348 } 349 } 350 351 /* Now Update the currently stored result */ 352 result_t * results = (result_t *) trace_retrive_inprogress_result(trace, ts); 353 354 if (!results) { 355 results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count); 356 memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count); 357 } 358 assert(results); 359 /* Now add to the current results */ 360 results->total.count++; 361 results->total.bytes +=trace_get_wire_length(pkt); 362 /* Now add on filters */ 363 for(i=0;i<filter_count;++i) { 364 results->filters[i].count += tmp_result->filters[i].count; 365 results->filters[i].bytes += tmp_result->filters[i].bytes; 366 } 367 /* Now release the lock and send it away place that back into the buffer */ 368 trace_update_inprogress_result(trace, ts, (void *) results); 369 /*if (count >= packet_count) { 370 report_results(ts,count,bytes); 371 count=0; 372 bytes=0; 373 }*/ // Hmm what was happening here doesn't match up with any of the documentations!!! 374 } 375 if (mesg) { 376 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 377 switch (mesg->code) { 378 case MESSAGE_STARTED: 379 tmp_result = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 380 break; 381 case MESSAGE_STOPPED: 382 trace_retrive_inprogress_result(trace, 0); 383 trace_update_inprogress_result(trace, 1, NULL); 384 } 385 } 386 // Done push the final results 387 /*if (results->total.count) 388 trace_publish_result(trace, (uint64_t) last_ts, results);*/ 389 390 return pkt; 276 277 static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) { 278 return 0; 391 279 } 392 280 … … 422 310 trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i); 423 311 trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i); 312 trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL); 424 313 #if TRACE_TIME 425 314 if (trace_pstart(trace, NULL, &per_packet_tracetime, NULL)==-1) { … … 434 323 } 435 324 436 #if TRACE_TIME 437 // First we wait for a message telling us that a timestamp has been 438 // published this allows us to approximately synchronize with the time 439 libtrace_message_t message; 440 int64_t offset; 441 libtrace_packet_t *packet; 442 struct timeval *tv, tv_real; 443 444 445 do { 446 // TODO Put a timeout here also 447 libtrace_thread_get_message(trace, &message); 448 } while (retrive_first_packet(trace, &packet, &tv) == 0); 449 tv_real = trace_get_timeval(packet); 450 offset = tv_to_usec(&tv_real) - tv_to_usec(tv); 451 last_ts = trace_get_seconds(packet) / packet_interval; 452 printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts); 453 /* 454 while (!got_first) { 455 // Wait for a message indicating we've got our 'first' packet, note not a 100% guarantee its our first but pretty likely 456 457 458 459 assert(pthread_mutex_lock(&lock_more) == 0); 460 461 for (i=0; i < 2; ++i) { 462 if (initial_stamps[i].difference_usecs) { // Hmm certainly this cannot possibly lineup 100%?? 463 got_first=1; 464 last_ts = initial_stamps[i].first_interval_number; 465 offset = initial_stamps[i].difference_usecs; 466 printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts); 467 } 468 } 469 assert(pthread_mutex_unlock(&lock_more) == 0); 470 }*/ 471 while (!trace_finished(trace)) { 472 struct timeval tv; 473 // Now try our best to read that one out 474 475 // Read messages 476 //libtrace_thread_get_message(trace, &message); 477 478 // We just release and do work currently, maybe if something 479 // interesting comes through we'd deal with that 480 //libtrace_thread_get_message(trace, &message); 481 482 //while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { } 483 484 /* Now wait for a second after we should see the results */ 485 uint64_t next_update_time, t_usec; 486 next_update_time = (last_ts*packet_interval + packet_interval + 1) * 1000000 + offset; 487 gettimeofday(&tv, NULL); 488 t_usec = tv.tv_sec; 489 t_usec *= 1000000; 490 t_usec += tv.tv_usec; 491 492 //printf("Current time=%"PRIu64" Next result ready=%"PRIu64" =%f\n", t_usec, next_update_time, ((double) next_update_time - (double) t_usec) / 1000000.0); 493 if (next_update_time > t_usec) { 494 tv.tv_sec = (next_update_time - t_usec) / 1000000; 495 tv.tv_usec = (next_update_time - t_usec) % 1000000; 496 select(0, NULL, NULL, NULL, &tv); 497 } 498 reduce_tracetime(trace, NULL, &last_ts); 499 } 500 #else 325 501 326 // reduce 502 327 while (!trace_finished(trace)) { … … 511 336 reduce(trace, NULL, &last_ts); 512 337 } 513 #endif514 338 515 339 // Wait for all threads to stop
Note: See TracChangeset
for help on using the changeset viewer.