Changeset 6445c52 for lib


Ignore:
Timestamp:
02/13/18 13:23:11 (3 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
7ebf768
Parents:
fd1b63e
Message:

Fixed ndag packet corruption bug

Ensure we only mark receive buffers as available *after* we've
finished fetching a batch of packets -- otherwise we run the
risk of overwriting an earlier packet in the batch by reading
fresh data into the "available" buffer.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    r4f0f93f r6445c52  
    9191
    9292        int bufavail;
     93        int bufwaiting;
    9394
    9495#if HAVE_RECVMMSG
     
    633634        ssock->nextread += ntohs(erfptr->rlen);
    634635
     636        assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]);
     637
    635638        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
    636639                /* Read everything from this buffer, mark as empty and
    637640                 * move on. */
    638641                ssock->savedsize[nr] = 0;
    639                 ssock->bufavail ++;
    640 
    641                 assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS);
     642                ssock->bufwaiting ++;
     643
    642644                nr ++;
    643645                if (nr == ENCAP_BUFFERS) {
     
    652654        packet->error = packet->payload ? ntohs(erfptr->rlen) :
    653655                        erf_get_framing_length(packet);
    654 
    655656        return ntohs(erfptr->rlen);
    656657}
     
    731732        ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS);
    732733        ssock->bufavail = ENCAP_BUFFERS;
     734        ssock->bufwaiting = 0;
    733735        ssock->startidle = 0;
    734736
     
    901903                rt->missing_records += seq_cmp(
    902904                                ntohl(encaphdr->seqno), ssock->expectedseq);
     905
    903906        }
    904907        ssock->expectedseq = ntohl(encaphdr->seqno) + 1;
     
    11211124                        ssock = &(rt->sources[i]);
    11221125                }
    1123                 /*
    1124                 fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,
    1125                                 i, currentts,
    1126                                 rt->sources[i].recordcount,
    1127                                 rt->missing_records);
    1128                 */
    11291126        }
    11301127        return ssock;
     
    11331130static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    11341131
    1135         int rem;
     1132        int rem, ret;
    11361133        streamsock_t *nextavail = NULL;
    11371134        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
     
    11501147         * a libtrace ERF packet. */
    11511148
    1152         return ndag_prepare_packet_stream(libtrace,
     1149        ret = ndag_prepare_packet_stream(libtrace,
    11531150                        &(FORMAT_DATA->receivers[0]), nextavail,
    11541151                        packet, TRACE_PREP_DO_NOT_OWN_BUFFER);
     1152        nextavail->bufavail += nextavail->bufwaiting;
     1153        nextavail->bufwaiting = 0;
     1154        return ret;
    11551155}
    11561156
     
    11591159
    11601160        recvstream_t *rt;
    1161         int rem;
     1161        int rem, i;
    11621162        size_t read_packets = 0;
    11631163        streamsock_t *nextavail = NULL;
     
    11991199        } while (1);
    12001200
     1201       
     1202        for (i = 0; i < rt->sourcecount; i++) {
     1203                streamsock_t *src = &(rt->sources[i]);
     1204                src->bufavail += src->bufwaiting;
     1205                src->bufwaiting = 0;
     1206                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
     1207        }
     1208
    12011209        return read_packets;
    12021210
     
    12081216
    12091217        libtrace_eventobj_t event = {0,0,0.0,0};
    1210         int rem;
     1218        int rem, i;
    12111219        streamsock_t *nextavail = NULL;
    12121220
     
    12801288                break;
    12811289        } while (1);
     1290
     1291        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
     1292                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
     1293                src->bufavail += src->bufwaiting;
     1294                src->bufwaiting = 0;
     1295                assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS);
     1296        }
    12821297
    12831298        return event;
Note: See TracChangeset for help on using the changeset viewer.