Changeset 3004d6c for lib


Ignore:
Timestamp:
02/14/18 17:11:08 (3 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
a857389
Parents:
7ebf768
Message:

format_ndag: use select to skip sockets with no data

A non-blocking call to recvmmsg still requires a lot of initial
effort, e.g. setting up buffers etc., but there is no need to do
so if we use select() to tell us in advance which sockets are
worth trying.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r7ebf768 r3004d6c  
    113113        uint64_t missing_records;
    114114        uint64_t received_packets;
     115
     116        fd_set allsocks;
     117        int maxfd;
    115118} recvstream_t;
    116119
     
    319322
    320323}
    321        
     324
    322325static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
    323326                int msgsize, uint16_t *ptmap) {
     
    485488                FORMAT_DATA->receivers[i].received_packets = 0;
    486489                FORMAT_DATA->receivers[i].missing_records = 0;
     490                FD_ZERO(&(FORMAT_DATA->receivers[i].allsocks));
     491                FORMAT_DATA->receivers[i].maxfd = -1;
    487492
    488493                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
     
    766771        ssock->recordcount = 0;
    767772        rt->sourcecount += 1;
     773        FD_SET(ssock->sock, &(rt->allsocks));
     774        if (ssock->sock > rt->maxfd) {
     775                rt->maxfd = ssock->sock;
     776        }
    768777
    769778        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
     
    872881                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
    873882                                ssock->groupaddr, ssock->port);
     883                FD_CLR(ssock->sock, &(rt->allsocks));
    874884                close(ssock->sock);
    875885                ssock->sock = -1;
     
    977987                                        ssock->port);
    978988
     989                                FD_CLR(ssock->sock, &(rt->allsocks));
    979990                                close(ssock->sock);
    980991                                ssock->sock = -1;
     
    986997                                ssock->groupaddr, ssock->port,
    987998                                strerror(errno));
     999                        FD_CLR(ssock->sock, &(rt->allsocks));
    9881000                        close(ssock->sock);
    9891001                        ssock->sock = -1;
     
    10221034        int i, readybufs, gottime;
    10231035        struct timeval tv;
     1036        fd_set fds;
     1037        struct timeval zerotv;
    10241038
    10251039        readybufs = 0;
    10261040        gottime = 0;
    10271041
    1028         for (i = 0; i < rt->sourcecount; i ++) {
     1042        fds = rt->allsocks;
     1043
     1044        if (rt->maxfd == -1) {
     1045                return 0;
     1046        }
     1047
     1048        zerotv.tv_sec = 0;
     1049        zerotv.tv_usec = 0;
     1050
     1051        if (select(rt->maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
     1052                /* log the error? XXX */
     1053                return -1;
     1054        }
     1055
     1056        for (i = 0; i < rt->sourcecount; i++) {
     1057                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
     1058                        continue;
     1059                }
    10291060                readybufs += receive_from_single_socket(&(rt->sources[i]),
    10301061                                &tv, &gottime, rt);
     
    11251156                }
    11261157                return NULL;
    1127         }       
     1158        }
    11281159
    11291160
     
    12201251        } while (1);
    12211252
    1222        
    12231253        for (i = 0; i < rt->sourcecount; i++) {
    12241254                streamsock_t *src = &(rt->sources[i]);
Note: See TracChangeset for help on using the changeset viewer.