Changeset b34f924


Ignore:
Timestamp:
11/13/17 15:49:35 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
639d952
Parents:
d39cd1e
Message:

Replace recvfrom with recvmmsg in format_ndag

This should save us some per-system-call related performance costs,
as now we can call one system call per batch of received packets
rather than always calling a system call per received packet.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    rd39cd1e rb34f924  
    2727#define CTRL_BUF_SIZE (10000)
    2828#define ENCAP_BUFFERS (100)
     29
     30#define RECV_BATCH_SIZE (20)
    2931
    3032#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
     
    6264        uint64_t recordcount;
    6365
     66        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
    6467} streamsock_t;
    6568
     
    490493                        free(src.saved);
    491494                }
     495                for (j = 0; j < RECV_BATCH_SIZE; j++) {
     496                        if (src.mmsgbufs[j].msg_hdr.msg_iov) {
     497                                free(src.mmsgbufs[j].msg_hdr.msg_iov);
     498                        }
     499                }
    492500                close(src.sock);
    493501        }
     
    684692        }
    685693
     694        for (i = 0; i < RECV_BATCH_SIZE; i++) {
     695                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
     696                                malloc(sizeof(struct iovec));
     697        }
     698
    686699        ssock->nextread = NULL;;
    687700        ssock->nextreadind = 0;
     
    743756}
    744757
     758static int init_receivers(streamsock_t *ssock) {
     759
     760        int wind = ssock->nextwriteind;
     761        int i;
     762        int avail = 0;
     763
     764        for (i = 0; i < RECV_BATCH_SIZE; i++) {
     765                if (wind == ENCAP_BUFFERS) {
     766                        wind = 0;
     767                }
     768
     769                if (ssock->savedsize[wind] != 0) {
     770                        /* No more empty buffers */
     771                        break;
     772                }
     773
     774                ssock->mmsgbufs[i].msg_len = 0;
     775                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
     776                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
     777                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind];
     778                ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
     779                ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1;
     780                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
     781                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
     782                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
     783
     784                avail ++;
     785                wind ++;
     786        }
     787
     788        return avail;
     789}
     790
     791static int check_ndag_received(streamsock_t *ssock, int index,
     792                unsigned int msglen, recvstream_t *rt) {
     793
     794        ndag_encap_t *encaphdr;
     795        ndag_monitor_t *mon;
     796        uint8_t rectype;
     797
     798        /* Check that we have a valid nDAG encap record */
     799        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);
     800
     801        if (rectype == NDAG_PKT_KEEPALIVE) {
     802                /* Keep-alive, reset startidle and carry on. Don't
     803                 * change nextwrite -- we want to overwrite the
     804                 * keep-alive with usable content. */
     805                return 0;
     806        } else if (rectype != NDAG_PKT_ENCAPERF) {
     807                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
     808                                ssock->groupaddr, ssock->port);
     809                close(ssock->sock);
     810                ssock->sock = -1;
     811                return -1;
     812        }
     813
     814        ssock->savedsize[index] = msglen;
     815        ssock->nextwriteind ++;
     816
     817        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
     818                ssock->nextwriteind = 0;
     819        }
     820
     821        /* Get the useful info from the encap header */
     822        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));
     823
     824        mon = ssock->monitorptr;
     825
     826        if (mon->laststart == 0) {
     827                mon->laststart = bswap_be_to_host64(encaphdr->started);
     828        } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
     829                mon->laststart = bswap_be_to_host64(encaphdr->started);
     830                reset_expected_seqs(rt, mon);
     831
     832                /* TODO what is a good way to indicate this to clients?
     833                 * set the loss counter in the ERF header? a bit rude?
     834                 * use another bit in the ERF header?
     835                 * add a queryable flag to libtrace_packet_t?
     836                 */
     837
     838        }
     839
     840        if (ssock->expectedseq != 0) {
     841                rt->missing_records += seq_cmp(
     842                                ntohl(encaphdr->seqno), ssock->expectedseq);
     843        }
     844        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
     845        if (ssock->expectedseq == 0) {
     846                ssock->expectedseq ++;
     847        }
     848
     849        if (ssock->nextread == NULL) {
     850                /* If this is our first read, set up 'nextread'
     851                 * by skipping past the nDAG headers */
     852                ssock->nextread = ssock->saved[0] +
     853                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
     854        }
     855        return 1;
     856
     857}
     858
     859static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
     860                int *gottime, recvstream_t *rt) {
     861
     862        int avail, ret, ndagstat, i;
     863        int toret = 0;
     864        struct timespec ts;
     865
     866        if (ssock->sock == -1) {
     867                return 0;
     868        }
     869
     870        avail = init_receivers(ssock);
     871
     872        if (avail == 0) {
     873                /* All buffers were full, so something must be
     874                 * available. */
     875                return 1;
     876        }
     877
     878        ts.tv_sec = 0;
     879        ts.tv_nsec = 10000;
     880
     881        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
     882                        MSG_DONTWAIT, &ts);
     883
     884        /*
     885           ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
     886           ENCAP_BUFSIZE, MSG_DONTWAIT,
     887           rt->sources[i].srcaddr->ai_addr,
     888           &(rt->sources[i].srcaddr->ai_addrlen));
     889         */
     890        if (ret < 0) {
     891                /* Nothing to receive right now, but we should still
     892                 * count as 'ready' if at least one buffer is full */
     893                if (errno == EAGAIN || errno == EWOULDBLOCK) {
     894                        if (readable_data(ssock)) {
     895                                toret = 1;
     896                        }
     897                        if (!(*gottime)) {
     898                                gettimeofday(tv, NULL);
     899                                *gottime = 1;
     900                        }
     901                        if (ssock->startidle == 0) {
     902                                ssock->startidle = tv->tv_sec;
     903                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
     904                                fprintf(stderr,
     905                                        "Closing channel %s:%u due to inactivity.\n",
     906                                        ssock->groupaddr,
     907                                        ssock->port);
     908
     909                                close(ssock->sock);
     910                                ssock->sock = -1;
     911                        }
     912                } else {
     913
     914                        fprintf(stderr,
     915                                "Error receiving encapsulated records from %s:%u -- %s \n",
     916                                ssock->groupaddr, ssock->port,
     917                                strerror(errno));
     918                        close(ssock->sock);
     919                        ssock->sock = -1;
     920                }
     921                return toret;
     922        }
     923        ssock->startidle = 0;
     924        for (i = 0; i < ret; i++) {
     925                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
     926                                ssock->mmsgbufs[i].msg_len, rt);
     927                if (ndagstat == -1) {
     928                        break;
     929                }
     930
     931                if (ndagstat == 1) {
     932                        toret = 1;
     933                }
     934        }
     935
     936        return toret;
     937}
     938
    745939static int receive_from_sockets(recvstream_t *rt) {
    746940
    747         int i, ret, readybufs, gottime;
     941        int i, readybufs, gottime;
    748942        struct timeval tv;
    749         uint8_t rectype;
    750943
    751944        readybufs = 0;
     
    753946
    754947        for (i = 0; i < rt->sourcecount; i ++) {
    755                 int nw;
    756                 ndag_encap_t *encaphdr;
    757                 ndag_monitor_t *mon;
    758 
    759                 if (rt->sources[i].sock == -1) {
    760                         continue;
    761                 } else if (rt->sources[i].savedsize[rt->sources[i].nextwriteind] != 0) {
    762                         readybufs ++;
    763                         continue;
    764                 }
    765 
    766                 nw = rt->sources[i].nextwriteind;
    767 
    768                 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
    769                                 ENCAP_BUFSIZE, MSG_DONTWAIT,
    770                                 rt->sources[i].srcaddr->ai_addr,
    771                                 &(rt->sources[i].srcaddr->ai_addrlen));
    772                 if (ret < 0) {
    773                         /* Nothing to receive right now, but we should still
    774                          * count as 'ready' if at least one buffer is full */
    775                         if (errno == EAGAIN || errno == EWOULDBLOCK) {
    776                                 if (readable_data(&(rt->sources[i]))) {
    777                                         readybufs ++;
    778                                 }
    779                                 if (!gottime) {
    780                                         gettimeofday(&tv, NULL);
    781                                 }
    782                                 if (rt->sources[i].startidle == 0) {
    783                                         rt->sources[i].startidle = tv.tv_sec;
    784                                 } else if (tv.tv_sec - rt->sources[i].startidle > NDAG_IDLE_TIMEOUT) {
    785                                         fprintf(stderr, "Closing channel %s:%u due to inactivity.\n",
    786                                                         rt->sources[i].groupaddr,
    787                                                         rt->sources[i].port);
    788 
    789                                         close(rt->sources[i].sock);
    790                                         rt->sources[i].sock = -1;
    791                                 }
    792                                 continue;
    793                         }
    794 
    795                         fprintf(stderr,
    796                                         "Error receiving encapsulated records from %s:%u -- %s \n",
    797                                         rt->sources[i].groupaddr,
    798                                         rt->sources[i].port,
    799                                         strerror(errno));
    800                         close(rt->sources[i].sock);
    801                         rt->sources[i].sock = -1;
    802                         continue;
    803                 }
    804                 rt->sources[i].startidle = 0;
    805 
    806                 /* Check that we have a valid nDAG encap record */
    807                 rectype = check_ndag_header(rt->sources[i].saved[nw], ret);
    808 
    809                 if (rectype == NDAG_PKT_KEEPALIVE) {
    810                         /* Keep-alive, reset startidle and carry on. Don't
    811                          * change nextwrite -- we want to overwrite the
    812                          * keep-alive with usable content. */
    813                         continue;
    814                 } else if (rectype != NDAG_PKT_ENCAPERF) {
    815                         fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
    816                                         rt->sources[i].groupaddr,
    817                                         rt->sources[i].port);
    818                         close(rt->sources[i].sock);
    819                         rt->sources[i].sock = -1;
    820                         continue;
    821                 }
    822 
    823                 rt->sources[i].savedsize[nw] = ret;
    824                 rt->sources[i].nextwriteind =
    825                                 (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
    826 
    827                 /* Get the useful info from the encap header */
    828                 encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] +
    829                                 sizeof(ndag_common_t));
    830 
    831                 mon = rt->sources[i].monitorptr;
    832                 assert(mon);
    833 
    834                 if (mon->laststart == 0) {
    835                         mon->laststart = bswap_be_to_host64(encaphdr->started);
    836                 } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) {
    837                         mon->laststart = bswap_be_to_host64(encaphdr->started);
    838                         reset_expected_seqs(rt, mon);
    839 
    840                         /* TODO what is a good way to indicate this to clients?
    841                          * set the loss counter in the ERF header? a bit rude?
    842                          * use another bit in the ERF header?
    843                          * add a queryable flag to libtrace_packet_t?
    844                          */
    845 
    846                 }
    847 
    848                 if (rt->sources[i].expectedseq != 0) {
    849                         rt->missing_records += seq_cmp(
    850                                         ntohl(encaphdr->seqno),
    851                                         rt->sources[i].expectedseq);
    852                 }
    853                 rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1;
    854                 if (rt->sources[i].expectedseq == 0) {
    855                         rt->sources[i].expectedseq ++;
    856                 }
    857 
    858                 if (rt->sources[i].nextread == NULL) {
    859                         /* If this is our first read, set up 'nextread'
    860                          * by skipping past the nDAG headers */
    861                         rt->sources[i].nextread = rt->sources[i].saved[0] +
    862                                         sizeof(ndag_common_t) + sizeof(ndag_encap_t);
    863                 }
    864 
    865                 readybufs ++;
     948                readybufs += receive_from_single_socket(&(rt->sources[i]),
     949                                &tv, &gottime, rt);
    866950        }
    867951
Note: See TracChangeset for help on using the changeset viewer.