Changeset aa7db84


Ignore:
Timestamp:
11/07/17 13:46:40 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
4e427da
Parents:
8fa0167
Message:

Added support for seqno wrapping and improved performance.

Performance improvements are as follows:

  • Use larger receive buffer for the multicast recv socket.
  • Remove select in main receive function with a non-blocking recvfrom.
  • Only sleep if no sources have data available, thus avoiding constant 100% CPU usage due to the above change.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r8fa0167 raa7db84  
    8989
    9090
     91static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
     92
     93        /* Calculate seq_a - seq_b, taking wraparound into account */
     94        if (seq_a == seq_b) return 0;
     95
     96        if (seq_a > seq_b) {
     97                return (int) (seq_a - seq_b);
     98        }
     99        return (int) (0xffffffff - ((seq_b - seq_a) - 1));
     100}
     101
    91102static inline uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
    92103        ndag_common_t *header = (ndag_common_t *)msgbuf;
     
    123134        char pstr[16];
    124135        struct group_req greq;
     136        int bufsize;
    125137
    126138        int sock;
     
    182194                fprintf(stderr,
    183195                        "Failed to join multicast group %s:%s -- %s\n",
     196                                groupaddr, portstr, strerror(errno));
     197                close(sock);
     198                sock = -1;
     199                goto sockcreateover;
     200        }
     201
     202        bufsize = 16 * 1024 * 1024;
     203        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
     204                                (socklen_t)sizeof(int)) < 0) {
     205
     206                fprintf(stderr,
     207                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
    184208                                groupaddr, portstr, strerror(errno));
    185209                close(sock);
     
    637661static int receive_from_sockets(recvstream_t *rt) {
    638662
    639         fd_set allgroups;
    640         int maxfd, i, ret, readybufs, availsocks;
    641         struct timeval timeout;
    642 
    643         maxfd = 0;
    644         timeout.tv_sec = 0;
    645         timeout.tv_usec = 10000;
     663        int i, ret, readybufs, availsocks, successrecv;
    646664
    647665        /* TODO maybe need a way to tidy up "dead" sockets and signal back
     
    652670        readybufs = 0;
    653671        availsocks = 0;
    654 
    655         FD_ZERO(&allgroups);
     672        successrecv = 0;
     673
    656674        for (i = 0; i < rt->sourcecount; i ++) {
    657675                if (read_required(rt->sources[i])) {
    658                         FD_SET(rt->sources[i].sock, &allgroups);
    659                         if (rt->sources[i].sock > maxfd) {
    660                                 maxfd = rt->sources[i].sock;
    661                         }
    662676                        availsocks += 1;
    663677                } else if (rt->sources[i].sock != -1) {
     
    679693         * sockets.
    680694         */
    681         if (select(maxfd + 1, &allgroups, NULL, NULL, &timeout) < 0) {
    682                 fprintf(stderr, "Error waiting to receive records: %s\n",
    683                                 strerror(errno));
    684                 return -1;
    685         }
    686 
    687 
    688695        for (i = 0; i < rt->sourcecount; i ++) {
    689696                if (!read_required(rt->sources[i])) {
     
    697704                rt->sources[i].nextread = rt->sources[i].saved;
    698705
    699                 if (!FD_ISSET(rt->sources[i].sock, &allgroups)) {
    700                         continue;
    701                 }
    702 
    703706                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved,
    704                                 ENCAP_BUFSIZE, 0,
     707                                ENCAP_BUFSIZE, MSG_DONTWAIT,
    705708                                rt->sources[i].srcaddr->ai_addr,
    706709                                &(rt->sources[i].srcaddr->ai_addrlen));
    707710                if (ret < 0) {
     711                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
     712                                continue;
     713                        }
     714
    708715                        fprintf(stderr,
    709                                         "Error receiving encapsulated records from %s:%u -- %s\n",
     716                                        "Error receiving encapsulated records from %s:%u -- %s \n",
    710717                                        rt->sources[i].groupaddr,
    711718                                        rt->sources[i].port,
     
    726733
    727734                rt->sources[i].savedsize = ret;
     735                successrecv ++;
    728736
    729737                /* Check that we have a valid nDAG encap record */
     
    743751
    744752                if (rt->sources[i].expectedseq != 0) {
    745                         if (ntohl(rt->sources[i].encaphdr->seqno) !=
    746                                         rt->sources[i].expectedseq) {
    747                                 /* TODO deal with wrapping */
    748                                 rt->missing_records += (ntohl(rt->sources[i].encaphdr->seqno) - rt->sources[i].expectedseq);
    749                         }
     753                        rt->missing_records += seq_cmp(
     754                                        ntohl(rt->sources[i].encaphdr->seqno),
     755                                        rt->sources[i].expectedseq);
    750756                }
    751757                rt->sources[i].expectedseq =
     
    809815                }
    810816
     817                /* None of our sources have anything available, we can take
     818                 * a short break rather than immediately trying again.
     819                 */
     820                if (block && iserr == 0) {
     821                        usleep(100);
     822                }
     823
    811824        } while (block);
    812825
     
    826839                }
    827840
    828                 daghdr = (dag_record_t *)rt->sources[i].nextread;
     841                daghdr = (dag_record_t *)(rt->sources[i].nextread);
    829842                currentts = bswap_le_to_host64(daghdr->ts);
    830843
Note: See TracChangeset for help on using the changeset viewer.