Changeset aa7db84
- Timestamp:
- 11/07/17 13:46:40 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 4e427da
- Parents:
- 8fa0167
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
r8fa0167 raa7db84 89 89 90 90 91 static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) { 92 93 /* Calculate seq_a - seq_b, taking wraparound into account */ 94 if (seq_a == seq_b) return 0; 95 96 if (seq_a > seq_b) { 97 return (int) (seq_a - seq_b); 98 } 99 return (int) (0xffffffff - ((seq_b - seq_a) - 1)); 100 } 101 91 102 static inline uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) { 92 103 ndag_common_t *header = (ndag_common_t *)msgbuf; … … 123 134 char pstr[16]; 124 135 struct group_req greq; 136 int bufsize; 125 137 126 138 int sock; … … 182 194 fprintf(stderr, 183 195 "Failed to join multicast group %s:%s -- %s\n", 196 groupaddr, portstr, strerror(errno)); 197 close(sock); 198 sock = -1; 199 goto sockcreateover; 200 } 201 202 bufsize = 16 * 1024 * 1024; 203 if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, 204 (socklen_t)sizeof(int)) < 0) { 205 206 fprintf(stderr, 207 "Failed to increase buffer size for multicast group %s:%s -- %s\n", 184 208 groupaddr, portstr, strerror(errno)); 185 209 close(sock); … … 637 661 static int receive_from_sockets(recvstream_t *rt) { 638 662 639 fd_set allgroups; 640 int maxfd, i, ret, readybufs, availsocks; 641 struct timeval timeout; 642 643 maxfd = 0; 644 timeout.tv_sec = 0; 645 timeout.tv_usec = 10000; 663 int i, ret, readybufs, availsocks, successrecv; 646 664 647 665 /* TODO maybe need a way to tidy up "dead" sockets and signal back … … 652 670 readybufs = 0; 653 671 availsocks = 0; 654 655 FD_ZERO(&allgroups); 672 successrecv = 0; 673 656 674 for (i = 0; i < rt->sourcecount; i ++) { 657 675 if (read_required(rt->sources[i])) { 658 FD_SET(rt->sources[i].sock, &allgroups);659 if (rt->sources[i].sock > maxfd) {660 maxfd = rt->sources[i].sock;661 }662 676 availsocks += 1; 663 677 } else if (rt->sources[i].sock != -1) { … … 679 693 * sockets. 680 694 */ 681 if (select(maxfd + 1, &allgroups, NULL, NULL, &timeout) < 0) {682 fprintf(stderr, "Error waiting to receive records: %s\n",683 strerror(errno));684 return -1;685 }686 687 688 695 for (i = 0; i < rt->sourcecount; i ++) { 689 696 if (!read_required(rt->sources[i])) { … … 697 704 rt->sources[i].nextread = rt->sources[i].saved; 698 705 699 if (!FD_ISSET(rt->sources[i].sock, &allgroups)) {700 continue;701 }702 703 706 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved, 704 ENCAP_BUFSIZE, 0,707 ENCAP_BUFSIZE, MSG_DONTWAIT, 705 708 rt->sources[i].srcaddr->ai_addr, 706 709 &(rt->sources[i].srcaddr->ai_addrlen)); 707 710 if (ret < 0) { 711 if (errno == EAGAIN || errno == EWOULDBLOCK) { 712 continue; 713 } 714 708 715 fprintf(stderr, 709 "Error receiving encapsulated records from %s:%u -- %s \n",716 "Error receiving encapsulated records from %s:%u -- %s \n", 710 717 rt->sources[i].groupaddr, 711 718 rt->sources[i].port, … … 726 733 727 734 rt->sources[i].savedsize = ret; 735 successrecv ++; 728 736 729 737 /* Check that we have a valid nDAG encap record */ … … 743 751 744 752 if (rt->sources[i].expectedseq != 0) { 745 if (ntohl(rt->sources[i].encaphdr->seqno) != 746 rt->sources[i].expectedseq) { 747 /* TODO deal with wrapping */ 748 rt->missing_records += (ntohl(rt->sources[i].encaphdr->seqno) - rt->sources[i].expectedseq); 749 } 753 rt->missing_records += seq_cmp( 754 ntohl(rt->sources[i].encaphdr->seqno), 755 rt->sources[i].expectedseq); 750 756 } 751 757 rt->sources[i].expectedseq = … … 809 815 } 810 816 817 /* None of our sources have anything available, we can take 818 * a short break rather than immediately trying again. 819 */ 820 if (block && iserr == 0) { 821 usleep(100); 822 } 823 811 824 } while (block); 812 825 … … 826 839 } 827 840 828 daghdr = (dag_record_t *) rt->sources[i].nextread;841 daghdr = (dag_record_t *)(rt->sources[i].nextread); 829 842 currentts = bswap_le_to_host64(daghdr->ts); 830 843
Note: See TracChangeset
for help on using the changeset viewer.