- Timestamp:
- 11/07/17 14:38:27 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- de825b5
- Parents:
- aa7db84
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
raa7db84 r4e427da 25 25 #define ENCAP_BUFSIZE (10000) 26 26 #define CTRL_BUF_SIZE (10000) 27 #define ENCAP_BUFFERS (100) 27 28 28 29 #define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data) … … 47 48 uint16_t monitorid; 48 49 49 char * saved;50 char **saved; 50 51 char *nextread; 51 int savedsize; 52 ndag_encap_t *encaphdr; 52 int nextreadind; 53 int nextwriteind; 54 int savedsize[ENCAP_BUFFERS]; 53 55 uint64_t recordcount; 54 56 … … 474 476 475 477 static void halt_ndag_receiver(recvstream_t *receiver) { 476 int i;478 int j, i; 477 479 libtrace_message_queue_destroy(&(receiver->mqueue)); 478 480 … … 482 484 streamsock_t src = receiver->sources[i]; 483 485 if (src.saved) { 486 for (j = 0; i < ENCAP_BUFFERS; j++) { 487 if (src.saved[j]) { 488 free(src.saved[j]); 489 } 490 } 484 491 free(src.saved); 485 492 } … … 526 533 527 534 dag_record_t *erfptr; 535 ndag_encap_t *encaphdr; 528 536 uint16_t ndag_reccount = 0; 537 int nr; 529 538 530 539 if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { … … 562 571 ssock->recordcount += 1; 563 572 564 ndag_reccount = ntohs(ssock->encaphdr->recordcount); 573 nr = ssock->nextreadind; 574 encaphdr = (ndag_encap_t *)(ssock->saved[nr] + 575 sizeof(ndag_common_t)); 576 577 ndag_reccount = ntohs(encaphdr->recordcount); 565 578 if ((ndag_reccount & 0x8000) != 0) { 566 579 /* Record was truncated -- update rlen appropriately */ 567 erfptr->rlen = htons(ssock->savedsize -568 (ssock->nextread - ssock->saved ));580 erfptr->rlen = htons(ssock->savedsize[nr] - 581 (ssock->nextread - ssock->saved[nr])); 569 582 } 570 583 ssock->nextread += ntohs(erfptr->rlen); 584 585 if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { 586 /* Read everything from this buffer, mark as empty and 587 * move on. */ 588 ssock->savedsize[nr] = 0; 589 nr = (nr + 1) % ENCAP_BUFFERS; 590 ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + 591 sizeof(ndag_encap_t); 592 ssock->nextreadind = nr; 593 } 571 594 572 595 packet->order = erf_get_erf_timestamp(packet); … … 590 613 591 614 streamsock_t *ssock = NULL; 615 int i; 592 616 593 617 if (rt->sourcecount == 0) { … … 611 635 ssock->expectedseq = 0; 612 636 ssock->monitorid = src.monitor; 613 ssock->saved = (char *)malloc(ENCAP_BUFSIZE); 614 ssock->nextread = ssock->saved; 615 ssock->savedsize = 0; 616 ssock->encaphdr = NULL; 637 ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS); 638 639 640 for (i = 0; i < ENCAP_BUFFERS; i++) { 641 ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE); 642 ssock->savedsize[i] = 0; 643 } 644 645 ssock->nextread = NULL;; 646 ssock->nextreadind = 0; 617 647 ssock->recordcount = 0; 618 648 rt->sourcecount += 1; … … 648 678 } 649 679 680 static inline int readable_data(streamsock_t ssock) { 681 682 if (ssock.sock == -1) { 683 return 0; 684 } 685 if (ssock.savedsize[ssock.nextreadind] == 0) { 686 return 0; 687 } 688 if (ssock.nextread - ssock.saved[ssock.nextreadind] >= 689 ssock.savedsize[ssock.nextreadind]) { 690 return 0; 691 } 692 return 1; 693 694 695 } 696 650 697 static inline int read_required(streamsock_t ssock) { 651 698 if (ssock.sock == -1) 652 699 return 0; 653 if (ssock.savedsize == 0)700 if (ssock.savedsize[ssock.nextwriteind] == 0) 654 701 return 1; 655 if (ssock.nextread - ssock.saved >= ssock.savedsize)656 return 1;702 //if (ssock.nextread - ssock.saved >= ssock.savedsize) 703 // return 1; 657 704 return 0; 658 705 } … … 694 741 */ 695 742 for (i = 0; i < rt->sourcecount; i ++) { 743 int nw; 744 ndag_encap_t *encaphdr; 745 696 746 if (!read_required(rt->sources[i])) { 697 747 if (rt->sources[i].sock != -1) { … … 701 751 } 702 752 703 rt->sources[i].savedsize = 0; 704 rt->sources[i].nextread = rt->sources[i].saved; 705 706 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved, 753 nw = rt->sources[i].nextwriteind; 754 755 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw], 707 756 ENCAP_BUFSIZE, MSG_DONTWAIT, 708 757 rt->sources[i].srcaddr->ai_addr, 709 758 &(rt->sources[i].srcaddr->ai_addrlen)); 710 759 if (ret < 0) { 760 /* Nothing to receive right now, but we should still 761 * count as 'ready' if at least one buffer is full */ 711 762 if (errno == EAGAIN || errno == EWOULDBLOCK) { 763 if (readable_data(rt->sources[i])) { 764 readybufs ++; 765 } 712 766 continue; 713 767 } … … 732 786 } 733 787 734 rt->sources[i].savedsize = ret; 788 rt->sources[i].savedsize[nw] = ret; 789 rt->sources[i].nextwriteind = 790 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS; 735 791 successrecv ++; 736 792 737 793 /* Check that we have a valid nDAG encap record */ 738 if (check_ndag_header(rt->sources[i].saved , ret) !=794 if (check_ndag_header(rt->sources[i].saved[nw], ret) != 739 795 NDAG_PKT_ENCAPERF) { 740 796 fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", … … 747 803 748 804 /* Save the useful info from the encap header */ 749 rt->sources[i].encaphdr = (ndag_encap_t *)750 (rt->sources[i].saved +sizeof(ndag_common_t));805 encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] + 806 sizeof(ndag_common_t)); 751 807 752 808 if (rt->sources[i].expectedseq != 0) { 753 809 rt->missing_records += seq_cmp( 754 ntohl( rt->sources[i].encaphdr->seqno),810 ntohl(encaphdr->seqno), 755 811 rt->sources[i].expectedseq); 756 812 } 757 rt->sources[i].expectedseq = 758 ntohl(rt->sources[i].encaphdr->seqno) + 1; 813 rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1; 759 814 760 815 /* If all good, skip past the nDAG headers */ 761 rt->sources[i].nextread = rt->sources[i].saved + 762 sizeof(ndag_common_t) + sizeof(ndag_encap_t); 816 if (rt->sources[i].nextread == NULL) { 817 rt->sources[i].nextread = rt->sources[i].saved[0] + 818 sizeof(ndag_common_t) + sizeof(ndag_encap_t); 819 } 763 820 764 821 readybufs += 1; … … 835 892 836 893 for (i = 0; i < rt->sourcecount; i ++) { 837 if ( read_required(rt->sources[i])) {894 if (!readable_data(rt->sources[i])) { 838 895 continue; 839 896 }
Note: See TracChangeset
for help on using the changeset viewer.