Changeset de825b5
- Timestamp:
- 11/07/17 15:31:47 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- e68325b
- Parents:
- 4e427da
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
r4e427da rde825b5 23 23 #include "format_ndag.h" 24 24 25 #define NDAG_IDLE_TIMEOUT (600) 25 26 #define ENCAP_BUFSIZE (10000) 26 27 #define CTRL_BUF_SIZE (10000) … … 53 54 int nextwriteind; 54 55 int savedsize[ENCAP_BUFFERS]; 56 uint32_t startidle; 55 57 uint64_t recordcount; 56 58 … … 615 617 int i; 616 618 619 /* TODO consider replacing this with a list or vector so we can 620 * easily remove sources that are no longer in use, rather than 621 * just setting the sock to -1 and having to check them every 622 * time we want to read a packet. 623 */ 617 624 if (rt->sourcecount == 0) { 618 625 rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10); … … 636 643 ssock->monitorid = src.monitor; 637 644 ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS); 638 645 ssock->startidle = 0; 639 646 640 647 for (i = 0; i < ENCAP_BUFFERS; i++) { … … 708 715 static int receive_from_sockets(recvstream_t *rt) { 709 716 710 int i, ret, readybufs, availsocks, successrecv; 711 712 /* TODO maybe need a way to tidy up "dead" sockets and signal back 713 * to the control thread that we've killed the socket for a particular 714 * port. 715 */ 717 int i, ret, readybufs, gottime; 718 struct timeval tv; 716 719 717 720 readybufs = 0; 718 availsocks = 0; 719 successrecv = 0; 720 721 for (i = 0; i < rt->sourcecount; i ++) { 722 if (read_required(rt->sources[i])) { 723 availsocks += 1; 724 } else if (rt->sources[i].sock != -1) { 725 readybufs += 1; 726 availsocks += 1; 727 } 728 } 729 730 /* If all of our sockets already have data sitting in their 731 * buffers then we can save ourselves some 'select'ing. 732 */ 733 if (availsocks == readybufs) { 734 return readybufs; 735 } 736 737 /* Otherwise, at least one active socket has an empty buffer so 738 * we better try to read some data from those sockets (just in 739 * case the correct 'next' packet is waiting on one of those 740 * sockets. 741 */ 721 gottime = 0; 722 742 723 for (i = 0; i < rt->sourcecount; i ++) { 743 724 int nw; … … 764 745 readybufs ++; 765 746 } 747 if (!gottime) { 748 gettimeofday(&tv, NULL); 749 } 750 if (rt->sources[i].startidle == 0) { 751 rt->sources[i].startidle = tv.tv_sec; 752 } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) { 753 fprintf(stderr, "Closing channel %s:%u due to inactivity.\n", 754 rt->sources[i].groupaddr, 755 rt->sources[i].port); 756 757 close(rt->sources[i].sock); 758 rt->sources[i].sock = -1; 759 } 766 760 continue; 767 761 } … … 776 770 continue; 777 771 } 778 779 if (ret == 0) { 780 fprintf(stderr, "Received zero bytes on the channel for %s:%u.\n", 781 rt->sources[i].groupaddr, 782 rt->sources[i].port); 783 close(rt->sources[i].sock); 784 rt->sources[i].sock = -1; 785 continue; 786 } 787 788 rt->sources[i].savedsize[nw] = ret; 789 rt->sources[i].nextwriteind = 790 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS; 791 successrecv ++; 772 rt->sources[i].startidle = 0; 792 773 793 774 /* Check that we have a valid nDAG encap record */ … … 801 782 continue; 802 783 } 784 785 rt->sources[i].savedsize[nw] = ret; 786 rt->sources[i].nextwriteind = 787 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS; 803 788 804 789 /* Save the useful info from the encap header */
Note: See TracChangeset
for help on using the changeset viewer.