- Timestamp:
- 11/14/17 11:47:17 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 7e09388
- Parents:
- eb70703
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
reb70703 r07de3c6 64 64 uint64_t recordcount; 65 65 66 int bufavail; 67 66 68 struct mmsghdr mmsgbufs[RECV_BATCH_SIZE]; 67 69 } streamsock_t; … … 598 600 * move on. */ 599 601 ssock->savedsize[nr] = 0; 602 ssock->bufavail ++; 603 604 assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS); 600 605 nr ++; 601 606 if (nr == ENCAP_BUFFERS) { … … 688 693 ssock->monitorptr = mon; 689 694 ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS); 695 ssock->bufavail = ENCAP_BUFFERS; 690 696 ssock->startidle = 0; 691 697 … … 767 773 } 768 774 769 static voidinit_receivers(streamsock_t *ssock, int required) {775 static int init_receivers(streamsock_t *ssock, int required) { 770 776 771 777 int wind = ssock->nextwriteind; … … 773 779 774 780 for (i = 0; i < required; i++) { 781 if (i >= RECV_BATCH_SIZE) { 782 break; 783 } 784 775 785 ssock->mmsgbufs[i].msg_len = 0; 776 786 ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind]; … … 780 790 wind ++; 781 791 } 782 } 783 784 static int count_receivers(streamsock_t *ssock) { 785 786 int wind = ssock->nextwriteind; 787 int i; 788 int avail = 0; 789 790 for (i = 0; i < RECV_BATCH_SIZE; i++) { 791 if (wind == ENCAP_BUFFERS) { 792 wind = 0; 793 } 794 795 if (ssock->savedsize[wind] != 0) { 796 /* No more empty buffers */ 797 break; 798 } 799 800 avail ++; 801 wind ++; 802 } 803 804 return avail; 792 793 return i; 805 794 } 806 795 … … 830 819 ssock->savedsize[index] = msglen; 831 820 ssock->nextwriteind ++; 821 ssock->bufavail --; 822 823 assert(ssock->bufavail >= 0); 832 824 833 825 if (ssock->nextwriteind >= ENCAP_BUFFERS) { … … 876 868 int *gottime, recvstream_t *rt) { 877 869 878 int avail, ret, ndagstat, i;870 int ret, ndagstat, i, avail; 879 871 int toret = 0; 880 872 … … 883 875 } 884 876 885 avail = count_receivers(ssock); 886 887 if (avail == 0) { 877 if (ssock->bufavail == 0) { 888 878 /* All buffers were full, so something must be 889 879 * available. */ … … 891 881 } 892 882 893 if ( avail < RECV_BATCH_SIZE / 2) {883 if (ssock->bufavail < RECV_BATCH_SIZE / 2) { 894 884 return 1; 895 885 } 896 886 897 init_receivers(ssock,avail);887 avail = init_receivers(ssock, ssock->bufavail); 898 888 899 889 ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
Note: See TracChangeset
for help on using the changeset viewer.