Changeset 3004d6c
- Timestamp:
- 02/14/18 17:11:08 (3 years ago)
- Branches:
- cachetimestamps, develop, etsilive, master, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- a857389
- Parents:
- 7ebf768
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
r7ebf768 r3004d6c 113 113 uint64_t missing_records; 114 114 uint64_t received_packets; 115 116 fd_set allsocks; 117 int maxfd; 115 118 } recvstream_t; 116 119 … … 319 322 320 323 } 321 324 322 325 static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf, 323 326 int msgsize, uint16_t *ptmap) { … … 485 488 FORMAT_DATA->receivers[i].received_packets = 0; 486 489 FORMAT_DATA->receivers[i].missing_records = 0; 490 FD_ZERO(&(FORMAT_DATA->receivers[i].allsocks)); 491 FORMAT_DATA->receivers[i].maxfd = -1; 487 492 488 493 libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue), … … 766 771 ssock->recordcount = 0; 767 772 rt->sourcecount += 1; 773 FD_SET(ssock->sock, &(rt->allsocks)); 774 if (ssock->sock > rt->maxfd) { 775 rt->maxfd = ssock->sock; 776 } 768 777 769 778 fprintf(stderr, "Added new stream %s:%u to thread %d\n", … … 872 881 fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", 873 882 ssock->groupaddr, ssock->port); 883 FD_CLR(ssock->sock, &(rt->allsocks)); 874 884 close(ssock->sock); 875 885 ssock->sock = -1; … … 977 987 ssock->port); 978 988 989 FD_CLR(ssock->sock, &(rt->allsocks)); 979 990 close(ssock->sock); 980 991 ssock->sock = -1; … … 986 997 ssock->groupaddr, ssock->port, 987 998 strerror(errno)); 999 FD_CLR(ssock->sock, &(rt->allsocks)); 988 1000 close(ssock->sock); 989 1001 ssock->sock = -1; … … 1022 1034 int i, readybufs, gottime; 1023 1035 struct timeval tv; 1036 fd_set fds; 1037 struct timeval zerotv; 1024 1038 1025 1039 readybufs = 0; 1026 1040 gottime = 0; 1027 1041 1028 for (i = 0; i < rt->sourcecount; i ++) { 1042 fds = rt->allsocks; 1043 1044 if (rt->maxfd == -1) { 1045 return 0; 1046 } 1047 1048 zerotv.tv_sec = 0; 1049 zerotv.tv_usec = 0; 1050 1051 if (select(rt->maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { 1052 /* log the error? XXX */ 1053 return -1; 1054 } 1055 1056 for (i = 0; i < rt->sourcecount; i++) { 1057 if (!FD_ISSET(rt->sources[i].sock, &fds)) { 1058 continue; 1059 } 1029 1060 readybufs += receive_from_single_socket(&(rt->sources[i]), 1030 1061 &tv, &gottime, rt); … … 1125 1156 } 1126 1157 return NULL; 1127 } 1158 } 1128 1159 1129 1160 … … 1220 1251 } while (1); 1221 1252 1222 1223 1253 for (i = 0; i < rt->sourcecount; i++) { 1224 1254 streamsock_t *src = &(rt->sources[i]);
Note: See TracChangeset
for help on using the changeset viewer.