Changeset 7c33187 for lib


Ignore:
Timestamp:
03/12/18 17:09:34 (3 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, etsilive, master, rc-4.0.4, ringdecrementfix, ringperformance
Children:
7bdf6d1
Parents:
c1205bd
Message:

Fix problems with buffers filling up in format_ndag.c

If there is no data available for reading on the fd, we still
need to treat the stream as "ready" for reading if there are any
filled buffers. Failure to do so can lead to incorrectly assuming
that all sockets are inactive, which will put the thread to sleep.

Also added a slight performance enhancement where we avoid
calling select() if all of the buffers are nearly full.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r3004d6c r7c33187  
    114114        uint64_t received_packets;
    115115
    116         fd_set allsocks;
    117116        int maxfd;
    118117} recvstream_t;
     
    488487                FORMAT_DATA->receivers[i].received_packets = 0;
    489488                FORMAT_DATA->receivers[i].missing_records = 0;
    490                 FD_ZERO(&(FORMAT_DATA->receivers[i].allsocks));
    491489                FORMAT_DATA->receivers[i].maxfd = -1;
    492490
     
    771769        ssock->recordcount = 0;
    772770        rt->sourcecount += 1;
    773         FD_SET(ssock->sock, &(rt->allsocks));
    774771        if (ssock->sock > rt->maxfd) {
    775772                rt->maxfd = ssock->sock;
     
    881878                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
    882879                                ssock->groupaddr, ssock->port);
    883                 FD_CLR(ssock->sock, &(rt->allsocks));
    884880                close(ssock->sock);
    885881                ssock->sock = -1;
     
    943939#if HAVE_RECVMMSG
    944940        int i;
    945 #endif
    946 
    947         if (ssock->sock == -1) {
    948                 return 0;
    949         }
    950 
    951 #if HAVE_RECVMMSG
    952         /* Plenty of full buffers, just use the packets in those */
    953         if (ssock->bufavail < RECV_BATCH_SIZE / 2) {
    954                 return 1;
    955         }
    956 #else
    957         if (ssock->bufavail == 0) {
    958                 return 1;
    959         }
    960941#endif
    961942
     
    987968                                        ssock->port);
    988969
    989                                 FD_CLR(ssock->sock, &(rt->allsocks));
    990970                                close(ssock->sock);
    991971                                ssock->sock = -1;
     
    997977                                ssock->groupaddr, ssock->port,
    998978                                strerror(errno));
    999                         FD_CLR(ssock->sock, &(rt->allsocks));
    1000979                        close(ssock->sock);
    1001980                        ssock->sock = -1;
     
    10351014        struct timeval tv;
    10361015        fd_set fds;
     1016        int maxfd = 0;
    10371017        struct timeval zerotv;
    10381018
     
    10401020        gottime = 0;
    10411021
    1042         fds = rt->allsocks;
     1022        FD_ZERO(&fds);
    10431023
    10441024        if (rt->maxfd == -1) {
     
    10491029        zerotv.tv_usec = 0;
    10501030
    1051         if (select(rt->maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
     1031        for (i = 0; i < rt->sourcecount; i++) {
     1032                if (rt->sources[i].sock == -1) {
     1033                        continue;
     1034                }
     1035
     1036#if HAVE_RECVMMSG
     1037                /* Plenty of full buffers, just use the packets in those */
     1038                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
     1039                        readybufs ++;
     1040                        continue;
     1041                }
     1042#else
     1043                if (rt->sources[i].bufavail == 0) {
     1044                        readybufs ++;
     1045                        continue;
     1046                }
     1047#endif
     1048                FD_SET(rt->sources[i].sock, &fds);
     1049                if (maxfd < rt->sources[i].sock) {
     1050                        maxfd = rt->sources[i].sock;
     1051                }
     1052        }
     1053
     1054
     1055        if (maxfd <= 0) {
     1056                return readybufs;
     1057        }
     1058
     1059        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
    10521060                /* log the error? XXX */
    10531061                return -1;
     
    10561064        for (i = 0; i < rt->sourcecount; i++) {
    10571065                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
     1066                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
     1067                                readybufs ++;
     1068                        }
    10581069                        continue;
    10591070                }
Note: See TracChangeset for help on using the changeset viewer.