- Timestamp:
- 11/13/17 17:24:27 (3 years ago)
- Branches:
- cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
- Children:
- 5d8280a
- Parents:
- b34f924
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_ndag.c
rb34f924 r639d952 736 736 return 0; 737 737 } 738 /* 738 739 if (ssock->nextread - ssock->saved[ssock->nextreadind] >= 739 740 ssock->savedsize[ssock->nextreadind]) { 740 741 return 0; 741 742 } 743 */ 742 744 return 1; 743 745 … … 862 864 int avail, ret, ndagstat, i; 863 865 int toret = 0; 864 struct timespec ts;865 866 866 867 if (ssock->sock == -1) { … … 876 877 } 877 878 878 ts.tv_sec = 0;879 ts.tv_nsec = 10000;880 881 879 ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, 882 MSG_DONTWAIT, &ts); 883 884 /* 885 ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw], 886 ENCAP_BUFSIZE, MSG_DONTWAIT, 887 rt->sources[i].srcaddr->ai_addr, 888 &(rt->sources[i].srcaddr->ai_addrlen)); 889 */ 880 MSG_DONTWAIT, NULL); 881 890 882 if (ret < 0) { 891 883 /* Nothing to receive right now, but we should still … … 955 947 956 948 957 static int receive_encap_records (libtrace_t *libtrace, recvstream_t *rt,958 libtrace_packet_t *packet , int block) {949 static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, 950 libtrace_packet_t *packet) { 959 951 960 952 int iserr = 0; … … 972 964 973 965 /* Check for any messages from the control thread */ 974 if (block) { 975 iserr = receiver_read_messages(rt); 976 977 if (iserr <= 0) { 978 return iserr; 979 } 980 } 981 982 /* If non-blocking and there are no sources, just break */ 983 if (!block && rt->sourcecount == 0) { 984 iserr = 0; 985 break; 966 iserr = receiver_read_messages(rt); 967 968 if (iserr <= 0) { 969 return iserr; 986 970 } 987 971 … … 989 973 * checking for messages again. 990 974 */ 991 if ( block &&rt->sourcecount == 0) {975 if (rt->sourcecount == 0) { 992 976 usleep(10000); 993 977 continue; … … 1005 989 * a short break rather than immediately trying again. 1006 990 */ 1007 if ( block &&iserr == 0) {991 if (iserr == 0) { 1008 992 usleep(100); 1009 993 } 1010 994 1011 } while ( block);995 } while (1); 1012 996 1013 997 return iserr; 998 } 999 1000 static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt, 1001 libtrace_packet_t *packet) { 1002 1003 int iserr = 0; 1004 1005 if (packet->buf_control == TRACE_CTRL_PACKET) { 1006 free(packet->buffer); 1007 packet->buffer = NULL; 1008 } 1009 1010 /* Make sure we shouldn't be halting */ 1011 if ((iserr = is_halted(libtrace)) != -1) { 1012 return iserr; 1013 } 1014 1015 /* If non-blocking and there are no sources, just break */ 1016 if (rt->sourcecount == 0) { 1017 return 0; 1018 } 1019 1020 return receive_from_sockets(rt); 1014 1021 } 1015 1022 … … 1047 1054 int rem; 1048 1055 streamsock_t *nextavail = NULL; 1049 rem = receive_encap_records (libtrace, &(FORMAT_DATA->receivers[0]),1050 packet , 1);1056 rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]), 1057 packet); 1051 1058 1052 1059 if (rem <= 0) { … … 1077 1084 rt = (recvstream_t *)t->format_data; 1078 1085 1079 /* Only check for messages once per batch */1080 rem = receiver_read_messages(rt);1081 if (rem <= 0) {1082 return rem;1083 }1084 1086 1085 1087 do { 1086 rem = receive_encap_records(libtrace, rt, 1087 packets[read_packets], 1088 read_packets == 0 ? 1 : 0); 1088 /* Only check for messages once per batch */ 1089 if (read_packets == 0) { 1090 rem = receive_encap_records_block(libtrace, rt, 1091 packets[read_packets]); 1092 } else { 1093 rem = receive_encap_records_nonblock(libtrace, rt, 1094 packets[read_packets]); 1095 } 1096 1089 1097 if (rem < 0) { 1090 1098 return rem; … … 1122 1130 streamsock_t *nextavail = NULL; 1123 1131 1124 /* Only check for messages once per batch*/1132 /* Only check for messages once per call */ 1125 1133 rem = receiver_read_messages(&(FORMAT_DATA->receivers[0])); 1126 1134 if (rem <= 0) { … … 1130 1138 1131 1139 do { 1132 rem = receive_encap_records (libtrace,1133 &(FORMAT_DATA->receivers[0]), packet , 0);1140 rem = receive_encap_records_nonblock(libtrace, 1141 &(FORMAT_DATA->receivers[0]), packet); 1134 1142 1135 1143 if (rem < 0) {
Note: See TracChangeset
for help on using the changeset viewer.