Changeset de825b5


Ignore:
Timestamp:
11/07/17 15:31:47 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
e68325b
Parents:
4e427da
Message:

Add idle timeout for ndag receiving sockets.

Remove extraneous read_required checks in receive_from_sockets().
This used to make sense when we had a select() that we wanted to
skip whenever possible, but now it is just extra effort for no
benefit.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r4e427da rde825b5  
    2323#include "format_ndag.h"
    2424
     25#define NDAG_IDLE_TIMEOUT (600)
    2526#define ENCAP_BUFSIZE (10000)
    2627#define CTRL_BUF_SIZE (10000)
     
    5354        int nextwriteind;
    5455        int savedsize[ENCAP_BUFFERS];
     56        uint32_t startidle;
    5557        uint64_t recordcount;
    5658
     
    615617        int i;
    616618
     619        /* TODO consider replacing this with a list or vector so we can
     620         * easily remove sources that are no longer in use, rather than
     621         * just setting the sock to -1 and having to check them every
     622         * time we want to read a packet.
     623         */
    617624        if (rt->sourcecount == 0) {
    618625                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
     
    636643        ssock->monitorid = src.monitor;
    637644        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
    638 
     645        ssock->startidle = 0;
    639646
    640647        for (i = 0; i < ENCAP_BUFFERS; i++) {
     
    708715static int receive_from_sockets(recvstream_t *rt) {
    709716
    710         int i, ret, readybufs, availsocks, successrecv;
    711 
    712         /* TODO maybe need a way to tidy up "dead" sockets and signal back
    713          * to the control thread that we've killed the socket for a particular
    714          * port.
    715          */
     717        int i, ret, readybufs, gottime;
     718        struct timeval tv;
    716719
    717720        readybufs = 0;
    718         availsocks = 0;
    719         successrecv = 0;
    720 
    721         for (i = 0; i < rt->sourcecount; i ++) {
    722                 if (read_required(rt->sources[i])) {
    723                         availsocks += 1;
    724                 } else if (rt->sources[i].sock != -1) {
    725                         readybufs += 1;
    726                         availsocks += 1;
    727                 }
    728         }
    729 
    730         /* If all of our sockets already have data sitting in their
    731          * buffers then we can save ourselves some 'select'ing.
    732          */
    733         if (availsocks == readybufs) {
    734                 return readybufs;
    735         }
    736 
    737         /* Otherwise, at least one active socket has an empty buffer so
    738          * we better try to read some data from those sockets (just in
    739          * case the correct 'next' packet is waiting on one of those
    740          * sockets.
    741          */
     721        gottime = 0;
     722
    742723        for (i = 0; i < rt->sourcecount; i ++) {
    743724                int nw;
     
    764745                                        readybufs ++;
    765746                                }
     747                                if (!gottime) {
     748                                        gettimeofday(&tv, NULL);
     749                                }
     750                                if (rt->sources[i].startidle == 0) {
     751                                        rt->sources[i].startidle = tv.tv_sec;
     752                                } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) {
     753                                        fprintf(stderr, "Closing channel %s:%u due to inactivity.\n",
     754                                                        rt->sources[i].groupaddr,
     755                                                        rt->sources[i].port);
     756
     757                                        close(rt->sources[i].sock);
     758                                        rt->sources[i].sock = -1;
     759                                }
    766760                                continue;
    767761                        }
     
    776770                        continue;
    777771                }
    778 
    779                 if (ret == 0) {
    780                         fprintf(stderr, "Received zero bytes on the channel for %s:%u.\n",
    781                                         rt->sources[i].groupaddr,
    782                                         rt->sources[i].port);
    783                         close(rt->sources[i].sock);
    784                         rt->sources[i].sock = -1;
    785                         continue;
    786                 }
    787 
    788                 rt->sources[i].savedsize[nw] = ret;
    789                 rt->sources[i].nextwriteind =
    790                                 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
    791                 successrecv ++;
     772                rt->sources[i].startidle = 0;
    792773
    793774                /* Check that we have a valid nDAG encap record */
     
    801782                        continue;
    802783                }
     784
     785                rt->sources[i].savedsize[nw] = ret;
     786                rt->sources[i].nextwriteind =
     787                                (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
    803788
    804789                /* Save the useful info from the encap header */
Note: See TracChangeset for help on using the changeset viewer.