- Timestamp:
- 02/11/15 11:13:27 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 0b01fea
- Parents:
- fed9152 (diff), 12ae766 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Location:
- lib
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_dpdk.c
r1960910 r12ae766 201 201 */ 202 202 203 /* Print verbose messages to std out*/204 #define DEBUG 1203 /* Print verbose messages to stderr */ 204 #define DEBUG 0 205 205 206 206 /* Use clock_gettime() for nanosecond resolution rather than gettimeofday() … … 351 351 if (format_data->nb_blacklist >= sizeof (format_data->blacklist) 352 352 / sizeof (format_data->blacklist[0])) { 353 printf("Warning: too many devices to blacklist consider"353 fprintf(stderr, "Warning: too many devices to blacklist consider" 354 354 " increasing BLACK_LIST_SIZE"); 355 355 break; … … 625 625 if (my_cpu < 0) { 626 626 /* If we can assign to a core on the same numa node */ 627 printf("Using pci card on numa_node%d\n", format_data->nic_numa_node);627 fprintf(stderr, "Using pci card on numa_node%d\n", format_data->nic_numa_node); 628 628 if(format_data->nic_numa_node >= 0) { 629 629 int max_node_cpu = -1; … … 735 735 struct rte_eth_dev_info dev_info; 736 736 rte_eth_dev_info_get(0, &dev_info); 737 printf("Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d",737 fprintf(stderr, "Device port=0\n\tmin_rx_bufsize=%d\n\tmax_rx_pktlen=%d\n\tmax rx queues=%d\n\tmax tx queues=%d", 738 738 (int) dev_info.min_rx_bufsize, (int) dev_info.max_rx_pktlen, (int) dev_info.max_rx_queues, (int) dev_info.max_tx_queues); 739 739 … … 1206 1206 */ 1207 1207 #if DEBUG 1208 printf("Creating mempool named %s\n", format_data->mempool_name);1208 fprintf(stderr, "Creating mempool named %s\n", format_data->mempool_name); 1209 1209 #endif 1210 1210 format_data->pktmbuf_pool = … … 1243 1243 } 1244 1244 #if DEBUG 1245 printf("Doing dev configure\n");1245 fprintf(stderr, "Doing dev configure\n"); 1246 1246 #endif 1247 1247 /* Initialise the TX queue a minimum value if using this port for … … 1259 1259 for (i=0; i < rx_queues; i++) { 1260 1260 #if DEBUG 1261 printf("Doing queue configure\n");1261 fprintf(stderr, "Doing queue configure\n"); 1262 1262 #endif 1263 1263 … … 1419 1419 * gives it. 1420 1420 * 1421 * We then allow a mapper thread to be started on every real core as DPDK would 1421 * We then allow a mapper thread to be started on every real core as DPDK would, 1422 1422 * we also bind these to the corresponding CPU cores. 1423 1423 * … … 1437 1437 // in this case physical cores on the system will not exist so we don't bind 1438 1438 // these to any particular physical core 1439 pthread_mutex_lock(&libtrace->libtrace_lock); 1439 1440 if (reading) { 1440 1441 #if HAVE_LIBNUMA … … 1472 1473 // TODO proper libtrace style error here!! 1473 1474 fprintf(stderr, "Too many threads for DPDK!!\n"); 1475 pthread_mutex_unlock(&libtrace->libtrace_lock); 1474 1476 return -1; 1475 1477 } … … 1496 1498 if (i != 0) { 1497 1499 fprintf(stderr, "Warning pthread_setaffinity_np failed\n"); 1500 pthread_mutex_unlock(&libtrace->libtrace_lock); 1498 1501 return -1; 1499 1502 } … … 1508 1511 } 1509 1512 } 1513 pthread_mutex_unlock(&libtrace->libtrace_lock); 1510 1514 return 0; 1511 1515 } … … 1523 1527 1524 1528 assert(rte_lcore_id() < RTE_MAX_LCORE); 1525 1529 pthread_mutex_lock(&libtrace->libtrace_lock); 1526 1530 // Skip if master!! 1527 1531 if (rte_lcore_id() == rte_get_master_lcore()) { 1528 1532 fprintf(stderr, "INFO: we are skipping unregistering the master lcore\n"); 1533 pthread_mutex_unlock(&libtrace->libtrace_lock); 1529 1534 return; 1530 1535 } … … 1535 1540 RTE_PER_LCORE(_lcore_id) = -1; // Might make the world burn if used again 1536 1541 assert(cfg->lcore_count >= 1); // We cannot unregister the master LCORE!! 1542 pthread_mutex_unlock(&libtrace->libtrace_lock); 1537 1543 return; 1538 1544 } -
lib/libtrace_int.h
r04bf7c5 r12ae766 193 193 */ 194 194 struct libtrace_thread_t { 195 int accepted_packets; // The number of packets accepted only used if pread 195 uint64_t accepted_packets; // The number of packets accepted only used if pread 196 uint64_t filtered_packets; 196 197 // is retreving packets 197 198 // Set to true once the first packet has been stored … … 307 308 fn_hasher hasher; // If valid using a separate thread 308 309 void *hasher_data; 309 310 /** The pread_packet choosen path for the configuration */ 311 int (*pread)(libtrace_t *, libtrace_thread_t *, libtrace_packet_t **, size_t); 312 310 313 libtrace_thread_t hasher_thread; 311 314 libtrace_thread_t reporter_thread; … … 929 932 /** 930 933 * Register a thread for use with the format or using the packets produced 931 * by it. This is NOT only used for threads reading packets in fact all934 * by it. This is NOT only used for threads reading packets in fact all 932 935 * threads use this. 936 * 937 * The libtrace lock is not held by this format but can be aquired 938 * by the format. 933 939 * 934 940 * Some use cases include setting up any thread local storage required for -
lib/trace.c
r04bf7c5 r858ce90 282 282 libtrace->dropped_packets = UINT64_MAX; 283 283 libtrace->received_packets = UINT64_MAX; 284 libtrace->pread = NULL; 284 285 ZERO_USER_CONFIG(libtrace->config); 285 286 … … 396 397 libtrace->perpkt_threads = NULL; 397 398 libtrace->tracetime = 0; 399 libtrace->pread = NULL; 398 400 ZERO_USER_CONFIG(libtrace->config); 399 401 … … 1957 1959 { 1958 1960 assert(trace); 1961 int i = 0; 1962 uint64_t ret = trace->filtered_packets; 1963 for (i = 0; i < trace->perpkt_thread_count; i++) { 1964 ret += trace->perpkt_threads[i].filtered_packets; 1965 } 1959 1966 if (trace->format->get_filtered_packets) { 1960 1967 return trace->format->get_filtered_packets(trace)+ 1961 trace->filtered_packets;1962 } 1963 return trace->filtered_packets;1968 ret; 1969 } 1970 return ret; 1964 1971 } 1965 1972 -
lib/trace_parallel.c
r04bf7c5 r12ae766 101 101 #include <unistd.h> 102 102 103 104 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets); 105 103 static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t); 106 104 extern int libtrace_parallel; 107 105 … … 226 224 227 225 if (trace->config.debug_state) 228 fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,229 226 fprintf(stderr, "Thread %d state changed from %d to %d\n", 227 (int) t->tid, prev_state, t->state); 230 228 231 229 pthread_cond_broadcast(&trace->perpkt_cond); … … 293 291 void libtrace_zero_thread(libtrace_thread_t * t) { 294 292 t->accepted_packets = 0; 293 t->filtered_packets = 0; 295 294 t->recorded_first = false; 296 295 t->tracetime_offset_usec = 0; … … 387 386 } 388 387 389 390 391 /**392 * Dispatches packets to their correct place and applies any translations393 * as needed.394 * 388 /** 389 * Sends a packet to the user, expects either a valid packet or a TICK packet. 390 * 391 * Note READ_MESSAGE will only be returned if tracetime is true. 392 * 393 * @brief dispatch_packet 395 394 * @param trace 396 395 * @param t 397 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse 398 * @return -1 if an error or EOF has occured and the trace should end, otherwise a postive number (or 0) 399 * representing the number of packets returned, these will be at the beginning of the array. 400 */ 401 static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets, 402 size_t nb_packets) { 403 libtrace_message_t message; 404 size_t i, empty = 0; 405 for (i = 0; i < nb_packets; ++i) { 406 if (packets[i]->error > 0) { 407 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t); 408 trace_fin_packet(packets[i]); 409 } else if (packets[i]->error == READ_TICK) { 410 message.code = MESSAGE_TICK; 411 message.additional.uint64 = trace_packet_get_order(packets[i]); 412 message.sender = t; 413 (*trace->per_pkt)(trace, NULL, &message, t); 414 } else if (packets[i]->error != READ_MESSAGE) { 415 // An error this should be the last packet we read 416 size_t z; 417 // We could have an eof or error and a message such as pause 418 for (z = i + 1 ; z < nb_packets; ++z) { 419 fprintf(stderr, "i=%d nb_packets=%d err=%d, seq=%d\n", (int) z, (int) nb_packets, packets[z]->error, (int) packets[z]->order); 420 assert (packets[z]->error <= 0); 421 } 422 return -1; 423 } 424 if (packets[i]) { 425 // Move full slots to front 426 if (empty != i) { 427 packets[empty] = packets[i]; 428 packets[i] = NULL; 429 } 430 ++empty; 431 // Finish packets while still in CPU cache 432 } 433 } 434 return empty; 435 } 436 437 static inline int dispatch_packet(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packet) { 438 libtrace_message_t message; 396 * @param packet A pointer to the packet storage, which may be set to null upon 397 * return, or a packet to be finished. 398 * @return 0 is successful, otherwise if playing back in tracetime 399 * READ_MESSAGE(-2) can be returned in which case the packet is not sent. 400 */ 401 static inline int dispatch_packet(libtrace_t *trace, 402 libtrace_thread_t *t, 403 libtrace_packet_t **packet, 404 bool tracetime) { 439 405 if ((*packet)->error > 0) { 406 if (tracetime) { 407 if (delay_tracetime(trace, packet[0], t) == READ_MESSAGE) 408 return READ_MESSAGE; 409 } 440 410 *packet = (*trace->per_pkt)(trace, *packet, NULL, t); 441 411 trace_fin_packet(*packet); 442 } else if ((*packet)->error == READ_TICK) { 412 } else { 413 libtrace_message_t message; 414 assert((*packet)->error == READ_TICK); 443 415 message.code = MESSAGE_TICK; 444 416 message.additional.uint64 = trace_packet_get_order(*packet); 445 417 message.sender = t; 446 418 (*trace->per_pkt)(trace, NULL, &message, t); 447 } else if ((*packet)->error != READ_MESSAGE) {448 return -1;449 419 } 450 420 return 0; 421 } 422 423 /** 424 * Pauses a per packet thread, messages will not be processed when the thread 425 * is paused. 426 * 427 * This process involves reading packets if a hasher thread is used. As such 428 * this function can fail to pause due to errors when reading in which case 429 * the thread should be stopped instead. 430 * 431 * 432 * @brief trace_perpkt_thread_pause 433 * @return READ_ERROR(-1) or READ_EOF(0) or 1 if successfull 434 */ 435 static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t, 436 libtrace_packet_t *packets[], 437 int *nb_packets, int *empty, int *offset) { 438 libtrace_message_t message = {0}; 439 libtrace_packet_t * packet = NULL; 440 441 /* Let the user thread know we are going to pause */ 442 message.code = MESSAGE_PAUSING; 443 message.sender = t; 444 (*trace->per_pkt)(trace, NULL, &message, t); 445 446 /* Send through any remaining packets (or messages) without delay */ 447 448 /* First send those packets already read, as fast as possible 449 * This should never fail or check for messages etc. */ 450 for (;*offset < *nb_packets; ++*offset) { 451 ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0); 452 /* Move full slots to front as we go */ 453 if (packets[*offset]) { 454 if (*empty != *offset) { 455 packets[*empty] = packets[*offset]; 456 packets[*offset] = NULL; 457 } 458 ++*empty; 459 } 460 } 461 462 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1); 463 /* If a hasher thread is running, empty input queues so we don't lose data */ 464 if (trace_has_dedicated_hasher(trace)) { 465 fprintf(stderr, "Trace is using a hasher thread emptying queues\n"); 466 // The hasher has stopped by this point, so the queue shouldn't be filling 467 while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) { 468 int ret = trace->pread(trace, t, &packet, 1); 469 if (ret == 1) { 470 if (packet->error > 0) { 471 store_first_packet(trace, packet, t); 472 } 473 ASSERT_RET(dispatch_packet(trace, t, &packet, 1), == 0); 474 if (packet == NULL) 475 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1); 476 } else if (ret != READ_MESSAGE) { 477 /* Ignore messages we pick these up next loop */ 478 assert (ret == READ_EOF || ret == READ_ERROR); 479 /* Verify no packets are remaining */ 480 /* TODO refactor this sanity check out!! */ 481 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 482 ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0); 483 // No packets after this should have any data in them 484 assert(packet->error <= 0); 485 } 486 fprintf(stderr, "PREAD_FAILED %d\n", ret); 487 libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1); 488 return -1; 489 } 490 } 491 } 492 libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1); 493 494 /* Now we do the actual pause, this returns when we resumed */ 495 trace_thread_pause(trace, t); 496 message.code = MESSAGE_RESUMING; 497 (*trace->per_pkt)(trace, NULL, &message, t); 498 return 1; 451 499 } 452 500 … … 456 504 static void* perpkt_threads_entry(void *data) { 457 505 libtrace_t *trace = (libtrace_t *)data; 458 libtrace_thread_t * 506 libtrace_thread_t *t; 459 507 libtrace_message_t message = {0}; 460 508 libtrace_packet_t *packets[trace->config.burst_size]; 461 size_t nb_packets;462 509 size_t i; 463 int ret; 510 //int ret; 511 /* The current reading position into the packets */ 512 int offset = 0; 513 /* The number of packets last read */ 514 int nb_packets = 0; 515 /* The offset to the first NULL packet upto offset */ 516 int empty = 0; 464 517 465 518 /* Wait until trace_pstart has been completed */ … … 473 526 } 474 527 //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace)); 528 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 529 475 530 if (trace->format->pregister_thread) { 476 531 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace)); 477 532 } 478 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);479 533 480 534 /* Fill our buffer with empty packets */ … … 498 552 499 553 if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) { 554 int ret; 500 555 switch (message.code) { 501 556 case MESSAGE_DO_PAUSE: // This is internal 502 // Send message to say we are pausing, TODO consider sender 503 message.code = MESSAGE_PAUSING; 504 message.sender = t; 505 (*trace->per_pkt)(trace, NULL, &message, t); 506 // If a hasher thread is running empty input queues so we don't lose data 507 if (trace_has_dedicated_hasher(trace)) { 508 fprintf(stderr, "Trace is using a hasher thread emptying queues\n"); 509 // The hasher has stopped by this point, so the queue shouldn't be filling 510 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 511 ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1); 512 if (dispatch_packets(trace, t, packets, 1) == -1) { 513 // EOF or error, either way we'll stop 514 while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 515 ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1); 516 // No packets after this should have any data in them 517 assert(packets[0]->error <= 0); 518 } 519 goto stop; 520 } 521 } 557 ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset); 558 if (ret == READ_EOF) { 559 fprintf(stderr, "PAUSE stop eof!!\n"); 560 goto eof; 561 } else if (ret == READ_ERROR) { 562 fprintf(stderr, "PAUSE stop error!!\n"); 563 goto error; 522 564 } 523 // Now we do the actual pause, this returns when we are done 524 trace_thread_pause(trace, t); 525 message.code = MESSAGE_RESUMING; 526 (*trace->per_pkt)(trace, NULL, &message, t); 527 // Check for new messages as soon as we return 565 assert(ret == 1); 528 566 continue; 529 567 case MESSAGE_DO_STOP: // This is internal 530 goto stop; 568 fprintf(stderr, "DO_STOP stop!!\n"); 569 goto eof; 531 570 } 532 571 (*trace->per_pkt)(trace, NULL, &message, t); 572 /* Continue and the empty messages out before packets */ 533 573 continue; 534 574 } 535 575 536 if (trace->perpkt_thread_count == 1) { 537 assert(packets[0]); 538 packets[0]->error = trace_read_packet(trace, packets[0]); 539 if (dispatch_packet(trace, t, &packets[0]) != 0) 540 break; 541 if (!packets[0]) { 542 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1); 576 577 /* Do we need to read a new set of packets MOST LIKELY we do */ 578 if (offset == nb_packets) { 579 /* Refill the packet buffer */ 580 if (empty != nb_packets) { 581 // Refill the empty packets 582 libtrace_ocache_alloc(&trace->packet_freelist, 583 (void **) &packets[empty], 584 nb_packets - empty, 585 nb_packets - empty); 586 } 587 if (!trace->pread) { 588 assert(packets[0]); 589 nb_packets = trace_read_packet(trace, packets[0]); 590 packets[0]->error = nb_packets; 591 if (nb_packets > 0) 592 nb_packets = 1; 593 } else { 594 nb_packets = trace->pread(trace, t, packets, trace->config.burst_size); 595 } 596 offset = 0; 597 empty = 0; 598 } 599 600 /* Handle error/message cases */ 601 if (nb_packets > 0) { 602 /* Store the first packet */ 603 if (packets[0]->error > 0) { 604 store_first_packet(trace, packets[0], t); 605 } 606 for (;offset < nb_packets; ++offset) { 607 int ret; 608 ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime); 609 if (ret == 0) { 610 /* Move full slots to front as we go */ 611 if (packets[offset]) { 612 if (empty != offset) { 613 packets[empty] = packets[offset]; 614 packets[offset] = NULL; 615 } 616 ++empty; 617 } 618 } else { 619 assert(ret == READ_MESSAGE); 620 /* Loop around and process the message, note */ 621 continue; 622 } 543 623 } 544 624 } else { 545 nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size); 546 // Loop through the packets we just read and refill 547 ret = dispatch_packets(trace, t, packets, nb_packets); 548 if (ret == -1) 549 break; 550 else if (ret != nb_packets) { 551 // Refill the empty packets 552 //printf("Refilling packets ret=%d nb_packets=%zd\n", ret, nb_packets); 553 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[ret], nb_packets - ret, nb_packets - ret); 625 switch (nb_packets) { 626 case READ_EOF: 627 fprintf(stderr, "EOF stop %d!!\n", nb_packets); 628 goto eof; 629 case READ_ERROR: 630 fprintf(stderr, "ERROR stop %d!!\n", nb_packets); 631 goto error; 632 case READ_MESSAGE: 633 nb_packets = 0; 634 continue; 635 default: 636 fprintf(stderr, "Unexpected error %d!!\n", nb_packets); 637 goto error; 554 638 } 555 639 } 556 } 557 558 559 stop: 640 641 } 642 643 error: 644 fprintf(stderr, "An error occured in trace\n"); 645 eof: 646 fprintf(stderr, "An eof occured in trace\n"); 560 647 /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */ 561 648 … … 610 697 libtrace_packet_t * packet; 611 698 libtrace_message_t message = {0}; 699 int pkt_skipped = 0; 612 700 613 701 assert(trace_has_dedicated_hasher(trace)); … … 623 711 624 712 printf("Hasher Thread started\n"); 713 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 714 625 715 if (trace->format->pregister_thread) { 626 716 trace->format->pregister_thread(trace, t, true); 627 717 } 628 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 629 int pkt_skipped = 0; 718 630 719 /* Read all packets in then hash and queue against the correct thread */ 631 720 while (1) { … … 750 839 } 751 840 752 /**753 * @brief Move NULLs to the end of an array.754 * @param values755 * @param len756 * @return The location the first NULL, aka the number of non NULL elements757 */758 static inline size_t move_nulls_back(void *arr[], size_t len) {759 size_t fr=0, en = len-1;760 // Shift all non NULL elements to the front of the array, and NULLs to the761 // end, traverses every element at most once762 for (;fr < en; ++fr) {763 if (arr[fr] == NULL) {764 for (;en > fr; --en) {765 if(arr[en]) {766 arr[fr] = arr[en];767 arr[en] = NULL;768 break;769 }770 }771 }772 }773 // This is the index of the first NULL774 en = MIN(fr, en);775 // Or the end of the array if this special case776 if (arr[en])777 en++;778 return en;779 }780 781 /** returns the number of packets successfully allocated in the final array782 these will all be at the front of the array */783 inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {784 size_t nb;785 nb = move_nulls_back((void **) packets, nb_packets);786 mem_hits.read.recycled += nb;787 nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);788 assert(nb_packets == nb);789 return nb;790 }791 792 793 inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {794 size_t nb;795 nb = move_nulls_back((void **) packets, nb_packets);796 mem_hits.write.recycled += nb_packets - nb;797 nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);798 memset(packets, 0, nb); // XXX make better, maybe do this in ocache??799 return nb;800 }801 802 841 /* Our simplest case when a thread becomes ready it can obtain an exclusive 803 842 * lock to read packets from the underlying trace. 804 843 */ 805 inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets) 806 { 844 static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, 845 libtrace_thread_t *t, 846 libtrace_packet_t *packets[], 847 size_t nb_packets) { 807 848 size_t i = 0; 808 849 //bool tick_hit = false; … … 812 853 for (i = 0; i < nb_packets; ++i) { 813 854 packets[i]->error = trace_read_packet(libtrace, packets[i]); 855 814 856 if (packets[i]->error <= 0) { 815 ++i; 816 break; 857 /* We'll catch this next time if we have already got packets */ 858 if ( i==0 ) { 859 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 860 return packets[i]->error; 861 } else { 862 break; 863 } 817 864 } 818 865 /* … … 842 889 * 2. Move that into the packet provided (packet) 843 890 */ 844 inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) 845 { 891 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, 892 libtrace_thread_t *t, 893 libtrace_packet_t *packets[], 894 size_t nb_packets) { 846 895 size_t i; 896 897 /* We store the last error message here */ 898 if (t->format_data) { 899 fprintf(stderr, "Hit me, ohh yeah got error %d\n", 900 ((libtrace_packet_t *)t->format_data)->error); 901 return ((libtrace_packet_t *)t->format_data)->error; 902 } 847 903 848 904 // Always grab at least one … … 851 907 packets[0] = libtrace_ringbuffer_read(&t->rbuffer); 852 908 853 if (packets[0]->error < 0) 854 return 1; 909 if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) { 910 fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error); 911 return packets[0]->error; 912 } 855 913 856 914 for (i = 1; i < nb_packets; i++) { … … 861 919 break; 862 920 } 863 // These are typically urgent 864 if (packets[i]->error < 0) 921 922 /* We will return an error or EOF the next time around */ 923 if (packets[i]->error <= 0 && packets[0]->error != READ_TICK) { 924 /* The message case will be checked automatically - 925 However other cases like EOF and error will only be 926 sent once*/ 927 if (packets[i]->error != READ_MESSAGE) { 928 assert(t->format_data == NULL); 929 t->format_data = packets[i]; 930 fprintf(stderr, "Hit me, ohh yeah set error %d\n", 931 ((libtrace_packet_t *)t->format_data)->error); 932 } 865 933 break; 934 } 866 935 } 867 936 … … 1222 1291 1223 1292 /** 1224 * Delays a packets playback so the playback will be in trace time 1225 */ 1226 static inline void delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) { 1293 * Delays a packets playback so the playback will be in trace time. 1294 * This may break early if a message becomes available. 1295 * 1296 * Requires the first packet for this thread to be received. 1297 * @param libtrace The trace 1298 * @param packet The packet to delay 1299 * @param t The current thread 1300 * @return Either READ_MESSAGE(-2) or 0 is successful 1301 */ 1302 static inline int delay_tracetime(libtrace_t *libtrace, libtrace_packet_t *packet, libtrace_thread_t *t) { 1227 1303 struct timeval curr_tv, pkt_tv; 1228 uint64_t next_release = t->tracetime_offset_usec; // Time at which to release the packet1304 uint64_t next_release = t->tracetime_offset_usec; 1229 1305 uint64_t curr_usec; 1230 /* Tracetime we might delay releasing this packet */ 1306 1231 1307 if (!t->tracetime_offset_usec) { 1232 libtrace_packet_t * 1308 libtrace_packet_t *first_pkt; 1233 1309 struct timeval *sys_tv; 1234 1310 int64_t initial_offset; … … 1237 1313 pkt_tv = trace_get_timeval(first_pkt); 1238 1314 initial_offset = (int64_t)tv_to_usec(sys_tv) - (int64_t)tv_to_usec(&pkt_tv); 1315 /* In the unlikely case offset is 0, change it to 1 */ 1239 1316 if (stable) 1240 // 0->1 because 0 is used to mean unset1241 1317 t->tracetime_offset_usec = initial_offset ? initial_offset: 1; 1242 1318 next_release = initial_offset; … … 1248 1324 curr_usec = tv_to_usec(&curr_tv); 1249 1325 if (next_release > curr_usec) { 1326 int ret, mesg_fd = libtrace_message_queue_get_fd(&t->messages); 1327 struct timeval delay_tv = usec_to_tv(next_release-curr_usec); 1328 fd_set rfds; 1329 FD_ZERO(&rfds); 1330 FD_SET(mesg_fd, &rfds); 1250 1331 // We need to wait 1251 struct timeval delay_tv = usec_to_tv(next_release-curr_usec); 1332 1252 1333 //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet)); 1253 select(0, NULL, NULL, NULL, &delay_tv); 1254 } 1255 } 1256 1257 /* Read one packet from the trace into a buffer. Note that this function will 1258 * block until a packet is read (or EOF is reached). 1259 * 1260 * @param libtrace the trace 1261 * @param t The thread 1262 * @param packets an array of packets 1263 * @param nb_packets 1264 * @returns The number of packets read or 0 on EOF, negative value on error 1265 * 1266 * Note this is identical to read_packet but calls pread_packet instead of 1267 * read packet in the format. 1268 * 1269 */ 1270 static inline int trace_pread_packet_wrapper(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) { 1334 ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv); 1335 if (ret == 0) { 1336 return 0; 1337 } else if (ret > 0) { 1338 return READ_MESSAGE; 1339 } else { 1340 fprintf(stderr, "I thnik we broke select\n"); 1341 } 1342 } 1343 return 0; 1344 } 1345 1346 /* Discards packets that don't match the filter. 1347 * Discarded packets are emptied and then moved to the end of the packet list. 1348 * 1349 * @param trace The trace format, containing the filter 1350 * @param packets An array of packets 1351 * @param nb_packets The number of valid items in packets 1352 * 1353 * @return The number of packets that passed the filter, which are moved to 1354 * the start of the packets array 1355 */ 1356 static inline size_t filter_packets(libtrace_t *trace, 1357 libtrace_packet_t **packets, 1358 size_t nb_packets) { 1359 size_t offset = 0; 1271 1360 size_t i; 1361 1362 for (i = 0; i < nb_packets; ++i) { 1363 // The filter needs the trace attached to receive the link type 1364 packets[i]->trace = trace; 1365 if (trace_apply_filter(trace->filter, packets[i])) { 1366 libtrace_packet_t *tmp; 1367 tmp = packets[offset]; 1368 packets[offset++] = packets[i]; 1369 packets[i] = tmp; 1370 } else { 1371 trace_fin_packet(packets[i]); 1372 } 1373 } 1374 1375 return offset; 1376 } 1377 1378 /* Read a batch of packets from the trace into a buffer. 1379 * Note that this function will block until a packet is read (or EOF is reached) 1380 * 1381 * @param libtrace The trace 1382 * @param t The thread 1383 * @param packets An array of packets 1384 * @param nb_packets The number of empty packets in packets 1385 * @return The number of packets read, 0 on EOF (or an error/message -1,-2). 1386 */ 1387 static int trace_pread_packet_wrapper(libtrace_t *libtrace, 1388 libtrace_thread_t *t, 1389 libtrace_packet_t *packets[], 1390 size_t nb_packets) { 1391 int i; 1272 1392 assert(nb_packets); 1273 assert(libtrace && " You called trace_read_packet() with a NULL libtrace parameter!\n");1393 assert(libtrace && "libtrace is NULL in trace_read_packet()"); 1274 1394 if (trace_is_err(libtrace)) 1275 1395 return -1; 1276 1396 if (!libtrace->started) { 1277 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"You must call libtrace_start() before trace_read_packet()\n"); 1397 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, 1398 "You must call libtrace_start() before trace_read_packet()\n"); 1278 1399 return -1; 1279 1400 } 1280 1401 1281 1282 1402 if (libtrace->format->pread_packets) { 1283 for (i = 0; i < nb_packets; ++i) { 1284 assert(packets[i]); 1285 if (!(packets[i]->buf_control==TRACE_CTRL_PACKET || packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) { 1286 trace_set_err(libtrace,TRACE_ERR_BAD_STATE,"Packet passed to trace_read_packet() is invalid\n"); 1403 int ret; 1404 for (i = 0; i < (int) nb_packets; ++i) { 1405 assert(i[packets]); 1406 if (!(packets[i]->buf_control==TRACE_CTRL_PACKET || 1407 packets[i]->buf_control==TRACE_CTRL_EXTERNAL)) { 1408 trace_set_err(libtrace,TRACE_ERR_BAD_STATE, 1409 "Packet passed to trace_read_packet() is invalid\n"); 1287 1410 return -1; 1288 1411 } 1289 /* Finalise the packets, freeing any resources the format module1290 * may have allocated it and zeroing all data associated with it.1291 */1292 //trace_fin_packet(packets[i]);1293 /* Store the trace we are reading from into the packet opaque1294 * structure */1295 packets[i]->trace = libtrace;1296 1412 } 1297 1413 do { 1298 int ret; 1299 ret=libtrace->format->pread_packets(libtrace, t, packets, nb_packets); 1414 ret=libtrace->format->pread_packets(libtrace, t, 1415 packets, 1416 nb_packets); 1417 /* Error, EOF or message? */ 1300 1418 if (ret <= 0) { 1301 1419 return ret; 1302 1420 } 1421 1303 1422 if (libtrace->filter) { 1304 /* 1305 * Discard packets that don't match the filter 1306 * If that is all of the packets then pread again 1307 */ 1308 int nb_filtered = 0; 1309 libtrace_packet_t *filtered_pkts[ret]; 1310 int offset; 1311 for (i = 0; i < ret; ++i) { 1312 if (!trace_apply_filter(libtrace->filter, packets[i])){ 1313 trace_fin_packet(packets[i]); 1314 packets[i]->trace = libtrace; 1315 filtered_pkts[nb_filtered++] = packets[i]; 1316 packets[i] = NULL; 1317 } else { 1318 if (libtrace->snaplen>0) 1319 /* Snap the packet */ 1320 trace_set_capture_length(packets[i], 1321 libtrace->snaplen); 1322 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i])); 1323 } 1324 } 1325 // TODO this aint thread safe 1326 libtrace->filtered_packets += nb_filtered; 1327 for (i = 0, offset = 0; i < ret; ++i) { 1328 if (packets[i]) 1329 packets[offset++] = packets[i]; 1330 } 1331 assert (ret - offset == nb_filtered); 1332 memcpy(&packets[offset], filtered_pkts, nb_filtered * sizeof(libtrace_packet_t *)); 1333 t->accepted_packets -= nb_filtered; 1334 } else { 1335 for (i = 0; i < ret; ++i) { 1336 if (libtrace->snaplen>0) 1337 trace_set_capture_length(packets[i], 1338 libtrace->snaplen); 1339 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i])); 1340 } 1423 int remaining; 1424 remaining = filter_packets(libtrace, 1425 packets, ret); 1426 t->filtered_packets += ret - remaining; 1427 ret = remaining; 1428 } 1429 for (i = 0; i < ret; ++i) { 1430 packets[i]->trace = libtrace; 1431 /* TODO IN FORMAT?? Like traditional libtrace */ 1432 if (libtrace->snaplen>0) 1433 trace_set_capture_length(packets[i], 1434 libtrace->snaplen); 1435 trace_packet_set_order(packets[i], trace_get_erf_timestamp(packets[i])); 1341 1436 } 1342 1437 t->accepted_packets += ret; 1343 //++libtrace->accepted_packets;1344 1345 } while(1);1346 }1347 trace_set_err(libtrace,TRACE_ERR_UNSUPPORTED,"This format does not support reading packets\n");1438 } while(ret == 0); 1439 return ret; 1440 } 1441 trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED, 1442 "This format does not support reading packets\n"); 1348 1443 return ~0U; 1349 }1350 1351 /**1352 * Selects the correct source for packets, either a parallel source1353 * or internal splitting1354 *1355 * @param libtrace1356 * @param t1357 * @param packets An array pre-filled with empty finilised packets1358 * @param nb_packets The number of packets in the array1359 *1360 * @return the number of packets read, null packets indicate messages. Check packet->error before1361 * assuming a packet is valid.1362 */1363 static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t,1364 libtrace_packet_t *packets[], size_t nb_packets)1365 {1366 size_t ret;1367 size_t i;1368 assert(nb_packets);1369 1370 if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {1371 ret = trace_pread_packet_wrapper(libtrace, t, packets, nb_packets);1372 /* Put the error into the first packet */1373 if ((int) ret <= 0) {1374 packets[0]->error = ret;1375 ret = 1;1376 }1377 } else if (trace_has_dedicated_hasher(libtrace)) {1378 ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);1379 } else if (!trace_has_dedicated_hasher(libtrace)) {1380 /* We don't care about which core a packet goes to */1381 ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);1382 } /* else {1383 ret = trace_pread_packet_hash_locked(libtrace, packet);1384 }*/1385 1386 // Formats can also optionally do this internally to ensure the first1387 // packet is always reported correctly1388 assert(ret);1389 assert(ret <= nb_packets);1390 if (packets[0]->error > 0) {1391 store_first_packet(libtrace, packets[0], t);1392 if (libtrace->tracetime)1393 delay_tracetime(libtrace, packets[0], t);1394 }1395 1396 return ret;1397 1444 } 1398 1445 … … 1588 1635 printf("This format has direct support for p's\n"); 1589 1636 ret = libtrace->format->pstart_input(libtrace); 1637 libtrace->pread = trace_pread_packet_wrapper; 1590 1638 } else { 1591 1639 if (libtrace->format->start_input) { 1592 1640 ret = libtrace->format->start_input(libtrace); 1593 1641 } 1642 if (libtrace->perpkt_thread_count > 1) 1643 libtrace->pread = trace_pread_packet_first_in_first_served; 1644 else 1645 libtrace->pread = NULL; 1594 1646 } 1595 1647 … … 1616 1668 goto cleanup_started; 1617 1669 } 1670 libtrace->pread = trace_pread_packet_hasher_thread; 1618 1671 } else { 1619 1672 libtrace->hasher_thread.type = THREAD_EMPTY;
Note: See TracChangeset
for help on using the changeset viewer.