Changeset 4e427da


Ignore:
Timestamp:
11/07/17 14:38:27 (4 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
de825b5
Parents:
aa7db84
Message:

Increase number of packets that can be buffered at once.

100 packets per socket seems to work fairly well, but going to
10,000 introduces a severe performance penalty for some reason.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    raa7db84 r4e427da  
    2525#define ENCAP_BUFSIZE (10000)
    2626#define CTRL_BUF_SIZE (10000)
     27#define ENCAP_BUFFERS (100)
    2728
    2829#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
     
    4748        uint16_t monitorid;
    4849
    49         char *saved;
     50        char **saved;
    5051        char *nextread;
    51         int savedsize;
    52         ndag_encap_t *encaphdr;
     52        int nextreadind;
     53        int nextwriteind;
     54        int savedsize[ENCAP_BUFFERS];
    5355        uint64_t recordcount;
    5456
     
    474476
    475477static void halt_ndag_receiver(recvstream_t *receiver) {
    476         int i;
     478        int j, i;
    477479        libtrace_message_queue_destroy(&(receiver->mqueue));
    478480
     
    482484                streamsock_t src = receiver->sources[i];
    483485                if (src.saved) {
     486                        for (j = 0; i < ENCAP_BUFFERS; j++) {
     487                                if (src.saved[j]) {
     488                                        free(src.saved[j]);
     489                                }
     490                        }
    484491                        free(src.saved);
    485492                }
     
    526533
    527534        dag_record_t *erfptr;
     535        ndag_encap_t *encaphdr;
    528536        uint16_t ndag_reccount = 0;
     537        int nr;
    529538
    530539        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
     
    562571        ssock->recordcount += 1;
    563572
    564         ndag_reccount = ntohs(ssock->encaphdr->recordcount);
     573        nr = ssock->nextreadind;
     574        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
     575                        sizeof(ndag_common_t));
     576
     577        ndag_reccount = ntohs(encaphdr->recordcount);
    565578        if ((ndag_reccount & 0x8000) != 0) {
    566579                /* Record was truncated -- update rlen appropriately */
    567                 erfptr->rlen = htons(ssock->savedsize -
    568                                 (ssock->nextread - ssock->saved));
     580                erfptr->rlen = htons(ssock->savedsize[nr] -
     581                                (ssock->nextread - ssock->saved[nr]));
    569582        }
    570583        ssock->nextread += ntohs(erfptr->rlen);
     584
     585        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
     586                /* Read everything from this buffer, mark as empty and
     587                 * move on. */
     588                ssock->savedsize[nr] = 0;
     589                nr = (nr + 1) % ENCAP_BUFFERS;
     590                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
     591                                sizeof(ndag_encap_t);
     592                ssock->nextreadind = nr;
     593        }
    571594
    572595        packet->order = erf_get_erf_timestamp(packet);
     
    590613
    591614        streamsock_t *ssock = NULL;
     615        int i;
    592616
    593617        if (rt->sourcecount == 0) {
     
    611635        ssock->expectedseq = 0;
    612636        ssock->monitorid = src.monitor;
    613         ssock->saved = (char *)malloc(ENCAP_BUFSIZE);
    614         ssock->nextread = ssock->saved;
    615         ssock->savedsize = 0;
    616         ssock->encaphdr = NULL;
     637        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
     638
     639
     640        for (i = 0; i < ENCAP_BUFFERS; i++) {
     641                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
     642                ssock->savedsize[i] = 0;
     643        }
     644
     645        ssock->nextread = NULL;;
     646        ssock->nextreadind = 0;
    617647        ssock->recordcount = 0;
    618648        rt->sourcecount += 1;
     
    648678}
    649679
     680static inline int readable_data(streamsock_t ssock) {
     681
     682        if (ssock.sock == -1) {
     683                return 0;
     684        }
     685        if (ssock.savedsize[ssock.nextreadind] == 0) {
     686                return 0;
     687        }
     688        if (ssock.nextread - ssock.saved[ssock.nextreadind] >=
     689                        ssock.savedsize[ssock.nextreadind]) {
     690                return 0;
     691        }
     692        return 1;
     693
     694
     695}
     696
    650697static inline int read_required(streamsock_t ssock) {
    651698        if (ssock.sock == -1)
    652699                return 0;
    653         if (ssock.savedsize == 0)
     700        if (ssock.savedsize[ssock.nextwriteind] == 0)
    654701                return 1;
    655         if (ssock.nextread - ssock.saved >= ssock.savedsize)
    656                 return 1;
     702        //if (ssock.nextread - ssock.saved >= ssock.savedsize)
     703        //        return 1;
    657704        return 0;
    658705}
     
    694741         */
    695742        for (i = 0; i < rt->sourcecount; i ++) {
     743                int nw;
     744                ndag_encap_t *encaphdr;
     745
    696746                if (!read_required(rt->sources[i])) {
    697747                        if (rt->sources[i].sock != -1) {
     
    701751                }
    702752
    703                 rt->sources[i].savedsize = 0;
    704                 rt->sources[i].nextread = rt->sources[i].saved;
    705 
    706                 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved,
     753                nw = rt->sources[i].nextwriteind;
     754
     755                ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
    707756                                ENCAP_BUFSIZE, MSG_DONTWAIT,
    708757                                rt->sources[i].srcaddr->ai_addr,
    709758                                &(rt->sources[i].srcaddr->ai_addrlen));
    710759                if (ret < 0) {
     760                        /* Nothing to receive right now, but we should still
     761                         * count as 'ready' if at least one buffer is full */
    711762                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
     763                                if (readable_data(rt->sources[i])) {
     764                                        readybufs ++;
     765                                }
    712766                                continue;
    713767                        }
     
    732786                }
    733787
    734                 rt->sources[i].savedsize = ret;
     788                rt->sources[i].savedsize[nw] = ret;
     789                rt->sources[i].nextwriteind =
     790                                (rt->sources[i].nextwriteind + 1) % ENCAP_BUFFERS;
    735791                successrecv ++;
    736792
    737793                /* Check that we have a valid nDAG encap record */
    738                 if (check_ndag_header(rt->sources[i].saved, ret) !=
     794                if (check_ndag_header(rt->sources[i].saved[nw], ret) !=
    739795                                        NDAG_PKT_ENCAPERF) {
    740796                        fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
     
    747803
    748804                /* Save the useful info from the encap header */
    749                 rt->sources[i].encaphdr = (ndag_encap_t *)
    750                                 (rt->sources[i].saved + sizeof(ndag_common_t));
     805                encaphdr = (ndag_encap_t *)(rt->sources[i].saved[nw] +
     806                                sizeof(ndag_common_t));
    751807
    752808                if (rt->sources[i].expectedseq != 0) {
    753809                        rt->missing_records += seq_cmp(
    754                                         ntohl(rt->sources[i].encaphdr->seqno),
     810                                        ntohl(encaphdr->seqno),
    755811                                        rt->sources[i].expectedseq);
    756812                }
    757                 rt->sources[i].expectedseq =
    758                                 ntohl(rt->sources[i].encaphdr->seqno) + 1;
     813                rt->sources[i].expectedseq = ntohl(encaphdr->seqno) + 1;
    759814
    760815                /* If all good, skip past the nDAG headers */
    761                 rt->sources[i].nextread = rt->sources[i].saved +
    762                                 sizeof(ndag_common_t) + sizeof(ndag_encap_t);
     816                if (rt->sources[i].nextread == NULL) {
     817                        rt->sources[i].nextread = rt->sources[i].saved[0] +
     818                                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
     819                }
    763820
    764821                readybufs += 1;
     
    835892
    836893        for (i = 0; i < rt->sourcecount; i ++) {
    837                 if (read_required(rt->sources[i])) {
     894                if (!readable_data(rt->sources[i])) {
    838895                        continue;
    839896                }
Note: See TracChangeset for help on using the changeset viewer.