Changeset b34f924
- Timestamp:
- 11/13/17 15:49:35 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 639d952
- Parents:
- d39cd1e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
rd39cd1e rb34f924 27 27 #define CTRL_BUF_SIZE (10000) 28 28 #define ENCAP_BUFFERS (100) 29 30 #define RECV_BATCH_SIZE (20) 29 31 30 32 #define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data) … … 62 64 uint64_t recordcount; 63 65 66 struct mmsghdr mmsgbufs[RECV_BATCH_SIZE]; 64 67 } streamsock_t; 65 68 … … 490 493 free(src.saved); 491 494 } 495 for (j = 0; j < RECV_BATCH_SIZE; j++) { 496 if (src.mmsgbufs[j].msg_hdr.msg_iov) { 497 free(src.mmsgbufs[j].msg_hdr.msg_iov); 498 } 499 } 492 500 close(src.sock); 493 501 } … … 684 692 } 685 693 694 for (i = 0; i < RECV_BATCH_SIZE; i++) { 695 ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *) 696 malloc(sizeof(struct iovec)); 697 } 698 686 699 ssock->nextread = NULL;; 687 700 ssock->nextreadind = 0; … … 743 756 } 744 757 758 static int init_receivers(streamsock_t *ssock) { 759 760 int wind = ssock->nextwriteind; 761 int i; 762 int avail = 0; 763 764 for (i = 0; i < RECV_BATCH_SIZE; i++) { 765 if (wind == ENCAP_BUFFERS) { 766 wind = 0; 767 } 768 769 if (ssock->savedsize[wind] != 0) { 770 /* No more empty buffers */ 771 break; 772 } 773 774 ssock->mmsgbufs[i].msg_len = 0; 775 ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr; 776 ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen; 777 ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind]; 778 ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE; 779 ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1; 780 ssock->mmsgbufs[i].msg_hdr.msg_control = NULL; 781 ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0; 782 ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; 783 784 avail ++; 785 wind ++; 786 } 787 788 return avail; 789 } 790 791 static int check_ndag_received(streamsock_t *ssock, int index, 792 unsigned int msglen, recvstream_t *rt) { 793 794 ndag_encap_t *encaphdr; 795 ndag_monitor_t *mon; 796 uint8_t rectype; 797 798 /* Check that we have a valid nDAG encap record */ 799 rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen); 800 801 if (rectype == NDAG_PKT_KEEPALIVE) { 802 /* Keep-alive, reset startidle and carry on. Don't 803 * change nextwrite -- we want to overwrite the 804 * keep-alive with usable content. */ 805 return 0; 806 } else if (rectype != NDAG_PKT_ENCAPERF) { 807 fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", 808 ssock->groupaddr, ssock->port); 809 close(ssock->sock); 810 ssock->sock = -1; 811 return -1; 812 } 813 814 ssock->savedsize[index] = msglen; 815 ssock->nextwriteind ++; 816 817 if (ssock->nextwriteind >= ENCAP_BUFFERS) { 818 ssock->nextwriteind = 0; 819 } 820 821 /* Get the useful info from the encap header */ 822 encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t)); 823 824 mon = ssock->monitorptr; 825 826 if (mon->laststart == 0) { 827 mon->laststart = bswap_be_to_host64(encaphdr->started); 828 } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { 829 mon->laststart = bswap_be_to_host64(encaphdr->started); 830 reset_expected_seqs(rt, mon); 831 832 /* TODO what is a good way to indicate this to clients? 833 * set the loss counter in the ERF header? a bit rude? 834 * use another bit in the ERF header? 835 * add a queryable flag to libtrace_packet_t? 836 */ 837 838 } 839 840 if (ssock->expectedseq != 0) { 841 rt->missing_records += seq_cmp( 842 ntohl(encaphdr->seqno), ssock->expectedseq); 843 } 844 ssock->expectedseq = ntohl(encaphdr->seqno) + 1; 845 if (ssock->expectedseq == 0) { 846 ssock->expectedseq ++; 847 } 848 849 if (ssock->nextread == NULL) { 850 /* If this is our first read, set up 'nextread' 851 * by skipping past the nDAG headers */ 852 ssock->nextread = ssock->saved[0] + 853 sizeof(ndag_common_t) + sizeof(ndag_encap_t); 854 } 855 return 1; 856 857 } 858 859 static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, 860 int *gottime, recvstream_t *rt) { 861 862 int avail, ret, ndagstat, i; 863 int toret = 0; 864 struct timespec ts; 865 866 if (ssock->sock == -1) { 867 return 0; 868 } 869 870 avail = init_receivers(ssock); 871 872 if (avail == 0) { 873 /* All buffers were full, so something must be 874 * available. */ 875 return 1; 876 } 877 878 ts.tv_sec = 0; 879 ts.tv_nsec = 10000; 880 881 ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, 882 MSG_DONTWAIT, &ts); 883 884 /* 885 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw], 886 ENCAP_BUFSIZE, MSG_DONTWAIT, 887 rt->sources[i].srcaddr->ai_addr, 888 &(rt->sources[i].srcaddr->ai_addrlen)); 889 */ 890 if (ret < 0) { 891 /* Nothing to receive right now, but we should still 892 * count as 'ready' if at least one buffer is full */ 893 if (errno == EAGAIN || errno == EWOULDBLOCK) { 894 if (readable_data(ssock)) { 895 toret = 1; 896 } 897 if (!(*gottime)) { 898 gettimeofday(tv, NULL); 899 *gottime = 1; 900 } 901 if (ssock->startidle == 0) { 902 ssock->startidle = tv->tv_sec; 903 } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { 904 fprintf(stderr, 905 "Closing channel %s:%u due to inactivity.\n", 906 ssock->groupaddr, 907 ssock->port); 908 909 close(ssock->sock); 910 ssock->sock = -1; 911 } 912 } else { 913 914 fprintf(stderr, 915 "Error receiving encapsulated records from %s:%u -- %s \n", 916 ssock->groupaddr, ssock->port, 917 strerror(errno)); 918 close(ssock->sock); 919 ssock->sock = -1; 920 } 921 return toret; 922 } 923 ssock->startidle = 0; 924 for (i = 0; i < ret; i++) { 925 ndagstat = check_ndag_received(ssock, ssock->nextwriteind, 926 ssock->mmsgbufs[i].msg_len, rt); 927 if (ndagstat == -1) { 928 break; 929 } 930 931 if (ndagstat == 1) { 932 toret = 1; 933 } 934 } 935 936 return toret; 937 } 938 745 939 static int receive_from_sockets(recvstream_t *rt) { 746 940 747 int i, re t, readybufs, gottime;941 int i, readybufs, gottime; 748 942 struct timeval tv; 749 uint8_t rectype;750 943 751 944 readybufs = 0; … … 753 946 754 947 for (i = 0; i < rt->sourcecount; i ++) { 755 int nw; 756 ndag_encap_t *encaphdr; 757 ndag_monitor_t *mon; 758 759 if (rt->sources[i].sock == -1) { 760 continue; 761 } else if (rt->sources[i].savedsize[rt->sources[i].nextwriteind] != 0) { 762 readybufs ++; 763 continue; 764 } 765 766 nw = rt->sources[i].nextwriteind; 767 768 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw], 769 ENCAP_BUFSIZE, MSG_DONTWAIT, 770 rt->sources[i].srcaddr->ai_addr, 771 &(rt->sources[i].srcaddr->ai_addrlen)); 772 if (ret < 0) { 773 /* Nothing to receive right now, but we should still 774 * count as 'ready' if at least one buffer is full */ 775 if (errno == EAGAIN || errno == EWOULDBLOCK) { 776 if (readable_data(&(rt->sources[i]))) { 777 readybufs ++; 778 } 779 if (!gottime) { 780 gettimeofday(&tv, NULL); 781 } 782 if (rt->sources[i].startidle == 0) { 783 rt->sources[i].startidle = tv.tv_sec; 784 } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) { 785 fprintf(stderr, "Closing channel %s:%u due to inactivity.\n", 786 rt->sources[i].groupaddr, 787 rt->sources[i].port); 788 789 close(rt->sources[i].sock); 790 rt->sources[i].sock = -1; 791 } 792 continue; 793 } 794 795 fprintf(stderr, 796 "Error receiving encapsulated records from %s:%u -- %s \n", 797 rt->sources[i].groupaddr, 798 rt->sources[i].port, 799 strerror(errno)); 800 close(rt->sources[i].sock); 801 rt->sources[i].sock = -1; 802 continue; 803 } 804 rt->sources[i].startidle = 0; 805 806 /* Check that we have a valid nDAG encap record */ 807 rectype = check_ndag_header(rt->sources[i].saved[nw], ret); 808 809 if (rectype == NDAG_PKT_KEEPALIVE) { 810 /* Keep-alive, reset startidle and carry on. Don't 811 * change nextwrite -- we want to overwrite the 812 * keep-alive with usable content. */ 813 continue; 814 } else if (rectype != NDAG_PKT_ENCAPERF) { 815 fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", 816 rt->sources[i].groupaddr, 817 rt->sources[i].port); 818 close(rt->sources[i].sock); 819 rt->sources[i].sock = -1; 820 continue; 821 } 822 823 rt->sources[i].savedsize[nw] = ret; 824 rt->sources[i].nextwriteind = 825 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS; 826 827 /* Get the useful info from the encap header */ 828 encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] + 829 sizeof(ndag_common_t)); 830 831 mon = rt->sources[i].monitorptr; 832 assert(mon); 833 834 if (mon->laststart == 0) { 835 mon->laststart = bswap_be_to_host64(encaphdr->started); 836 } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { 837 mon->laststart = bswap_be_to_host64(encaphdr->started); 838 reset_expected_seqs(rt, mon); 839 840 /* TODO what is a good way to indicate this to clients? 841 * set the loss counter in the ERF header? a bit rude? 842 * use another bit in the ERF header? 843 * add a queryable flag to libtrace_packet_t? 844 */ 845 846 } 847 848 if (rt->sources[i].expectedseq != 0) { 849 rt->missing_records += seq_cmp( 850 ntohl(encaphdr->seqno), 851 rt->sources[i].expectedseq); 852 } 853 rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1; 854 if (rt->sources[i].expectedseq == 0) { 855 rt->sources[i].expectedseq ++; 856 } 857 858 if (rt->sources[i].nextread == NULL) { 859 /* If this is our first read, set up 'nextread' 860 * by skipping past the nDAG headers */ 861 rt->sources[i].nextread = rt->sources[i].saved[0] + 862 sizeof(ndag_common_t) + sizeof(ndag_encap_t); 863 } 864 865 readybufs ++; 948 readybufs += receive_from_single_socket(&(rt->sources[i]), 949 &tv, &gottime, rt); 866 950 } 867 951
Note: See TracChangeset
for help on using the changeset viewer.