Changeset 6445c52
- Timestamp:
- 02/13/18 13:23:11 (3 years ago)
- Branches:
- cachetimestamps, develop, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 7ebf768
- Parents:
- fd1b63e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
r4f0f93f r6445c52 91 91 92 92 int bufavail; 93 int bufwaiting; 93 94 94 95 #if HAVE_RECVMMSG … … 633 634 ssock->nextread += ntohs(erfptr->rlen); 634 635 636 assert(ssock->nextread - ssock->saved[nr] <= ssock->savedsize[nr]); 637 635 638 if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { 636 639 /* Read everything from this buffer, mark as empty and 637 640 * move on. */ 638 641 ssock->savedsize[nr] = 0; 639 ssock->bufavail ++; 640 641 assert(ssock->bufavail > 0 && ssock->bufavail <= ENCAP_BUFFERS); 642 ssock->bufwaiting ++; 643 642 644 nr ++; 643 645 if (nr == ENCAP_BUFFERS) { … … 652 654 packet->error = packet->payload ? ntohs(erfptr->rlen) : 653 655 erf_get_framing_length(packet); 654 655 656 return ntohs(erfptr->rlen); 656 657 } … … 731 732 ssock->saved = (char **)malloc(sizeof(char *) * ENCAP_BUFFERS); 732 733 ssock->bufavail = ENCAP_BUFFERS; 734 ssock->bufwaiting = 0; 733 735 ssock->startidle = 0; 734 736 … … 901 903 rt->missing_records += seq_cmp( 902 904 ntohl(encaphdr->seqno), ssock->expectedseq); 905 903 906 } 904 907 ssock->expectedseq = ntohl(encaphdr->seqno) + 1; … … 1121 1124 ssock = &(rt->sources[i]); 1122 1125 } 1123 /*1124 fprintf(stderr, "%d %d %lu %lu %lu\n", rt->threadindex,1125 i, currentts,1126 rt->sources[i].recordcount,1127 rt->missing_records);1128 */1129 1126 } 1130 1127 return ssock; … … 1133 1130 static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1134 1131 1135 int rem ;1132 int rem, ret; 1136 1133 streamsock_t *nextavail = NULL; 1137 1134 rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]), … … 1150 1147 * a libtrace ERF packet. */ 1151 1148 1152 ret urnndag_prepare_packet_stream(libtrace,1149 ret = ndag_prepare_packet_stream(libtrace, 1153 1150 &(FORMAT_DATA->receivers[0]), nextavail, 1154 1151 packet, TRACE_PREP_DO_NOT_OWN_BUFFER); 1152 nextavail->bufavail += nextavail->bufwaiting; 1153 nextavail->bufwaiting = 0; 1154 return ret; 1155 1155 } 1156 1156 … … 1159 1159 1160 1160 recvstream_t *rt; 1161 int rem ;1161 int rem, i; 1162 1162 size_t read_packets = 0; 1163 1163 streamsock_t *nextavail = NULL; … … 1199 1199 } while (1); 1200 1200 1201 1202 for (i = 0; i < rt->sourcecount; i++) { 1203 streamsock_t *src = &(rt->sources[i]); 1204 src->bufavail += src->bufwaiting; 1205 src->bufwaiting = 0; 1206 assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS); 1207 } 1208 1201 1209 return read_packets; 1202 1210 … … 1208 1216 1209 1217 libtrace_eventobj_t event = {0,0,0.0,0}; 1210 int rem ;1218 int rem, i; 1211 1219 streamsock_t *nextavail = NULL; 1212 1220 … … 1280 1288 break; 1281 1289 } while (1); 1290 1291 for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) { 1292 streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]); 1293 src->bufavail += src->bufwaiting; 1294 src->bufwaiting = 0; 1295 assert(src->bufavail >= 0 && src->bufavail <= ENCAP_BUFFERS); 1296 } 1282 1297 1283 1298 return event;
Note: See TracChangeset
for help on using the changeset viewer.