Changeset 69ae5a9
- Timestamp:
- 02/25/15 17:09:02 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 2adc1d0
- Parents:
- 2e22196d
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/trace_parallel.c
r4338f97 r69ae5a9 394 394 * Sends a packet to the user, expects either a valid packet or a TICK packet. 395 395 * 396 * Note READ_MESSAGE will only be returned if tracetime is true. 397 * 398 * @brief dispatch_packet 399 * @param trace 400 * @param t 396 * @param trace The trace 397 * @param t The current thread 401 398 * @param packet A pointer to the packet storage, which may be set to null upon 402 399 * return, or a packet to be finished. 403 400 * @return 0 is successful, otherwise if playing back in tracetime 404 401 * READ_MESSAGE(-2) can be returned in which case the packet is not sent. 402 * 403 * @note READ_MESSAGE will only be returned if tracetime is true. 405 404 */ 406 405 static inline int dispatch_packet(libtrace_t *trace, … … 428 427 429 428 /** 429 * Sends a batch of packets to the user, expects either a valid packet or a 430 * TICK packet. 431 * 432 * @param trace The trace 433 * @param t The current thread 434 * @param packets [in,out] An array of packets, these may be null upon return 435 * @param nb_packets The total number of packets in the list 436 * @param empty [in,out] A pointer to an integer storing the first empty slot, 437 * upon return this is updated 438 * @param offset [in,out] The offset into the array, upon return this is updated 439 * @return 0 is successful, otherwise if playing back in tracetime 440 * READ_MESSAGE(-2) can be returned in which case the packet is not sent. 441 * 442 * @note READ_MESSAGE will only be returned if tracetime is true. 443 */ 444 static inline int dispatch_packets(libtrace_t *trace, 445 libtrace_thread_t *t, 446 libtrace_packet_t *packets[], 447 int nb_packets, int *empty, int *offset, 448 bool tracetime) { 449 for (;*offset < nb_packets; ++*offset) { 450 int ret; 451 ret = dispatch_packet(trace, t, &packets[*offset], tracetime); 452 if (ret == 0) { 453 /* Move full slots to front as we go */ 454 if (packets[*offset]) { 455 if (*empty != *offset) { 456 packets[*empty] = packets[*offset]; 457 packets[*offset] = NULL; 458 } 459 ++*empty; 460 } 461 } else { 462 /* Break early */ 463 assert(ret == READ_MESSAGE); 464 return READ_MESSAGE; 465 } 466 } 467 468 return 0; 469 } 470 471 /** 430 472 * Pauses a per packet thread, messages will not be processed when the thread 431 473 * is paused. … … 441 483 static int trace_perpkt_thread_pause(libtrace_t *trace, libtrace_thread_t *t, 442 484 libtrace_packet_t *packets[], 443 int *nb_packets, int *empty, int *offset) {485 int nb_packets, int *empty, int *offset) { 444 486 libtrace_message_t message = {0}; 445 487 libtrace_packet_t * packet = NULL; … … 454 496 /* First send those packets already read, as fast as possible 455 497 * This should never fail or check for messages etc. */ 456 for (;*offset < *nb_packets; ++*offset) { 457 ASSERT_RET(dispatch_packet(trace, t, &packets[*offset], false), == 0); 458 /* Move full slots to front as we go */ 459 if (packets[*offset]) { 460 if (*empty != *offset) { 461 packets[*empty] = packets[*offset]; 462 packets[*offset] = NULL; 463 } 464 ++*empty; 465 } 466 } 498 ASSERT_RET(dispatch_packets(trace, t, packets, nb_packets, empty, 499 offset, false), == 0); 467 500 468 501 libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1); … … 545 578 546 579 /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */ 547 // Send a message to say we've started 548 549 // Let the per_packet function know we have started 580 581 /* Let the per_packet function know we have started */ 550 582 message.code = MESSAGE_STARTING; 551 583 message.sender = t; … … 561 593 switch (message.code) { 562 594 case MESSAGE_DO_PAUSE: // This is internal 563 ret = trace_perpkt_thread_pause(trace, t, packets, &nb_packets, &empty, &offset); 595 ret = trace_perpkt_thread_pause(trace, t, 596 packets, nb_packets, &empty, &offset); 564 597 if (ret == READ_EOF) { 565 598 fprintf(stderr, "PAUSE stop eof!!\n"); … … 610 643 store_first_packet(trace, packets[0], t); 611 644 } 612 for (;offset < nb_packets; ++offset) { 613 int ret; 614 ret = dispatch_packet(trace, t, &packets[offset], trace->tracetime); 615 if (ret == 0) { 616 /* Move full slots to front as we go */ 617 if (packets[offset]) { 618 if (empty != offset) { 619 packets[empty] = packets[offset]; 620 packets[offset] = NULL; 621 } 622 ++empty; 623 } 624 } else { 625 assert(ret == READ_MESSAGE); 626 /* Loop around and process the message, note */ 627 continue; 628 } 629 } 645 dispatch_packets(trace, t, packets, nb_packets, &empty, 646 &offset, trace->tracetime); 630 647 } else { 631 648 switch (nb_packets) { … … 672 689 } 673 690 } 674 675 691 676 692 thread_change_state(trace, t, THREAD_FINISHED, true); … … 865 881 /* Read nb_packets */ 866 882 for (i = 0; i < nb_packets; ++i) { 883 if (libtrace_halt) { 884 break; 885 } 867 886 packets[i]->error = trace_read_packet(libtrace, packets[i]); 868 887
Note: See TracChangeset
for help on using the changeset viewer.