- Timestamp:
- 09/10/15 14:10:46 (5 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 9e48735
- Parents:
- a2dcdad
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/tracertstats/tracertstats_parallel.c
r58bfabf r49b0537 133 133 } result_t; 134 134 135 static uint64_t last_ts = 0;135 static uint64_t glob_last_ts = 0; 136 136 static void process_result(libtrace_t *trace UNUSED, int mesg, 137 137 libtrace_generic_t data, … … 145 145 ts = data.res->key; 146 146 res = data.res->value.ptr; 147 if ( last_ts == 0)148 last_ts = ts;149 while ( last_ts < ts) {150 report_results( (double) last_ts * (double) packet_interval, count, bytes);147 if (glob_last_ts == 0) 148 glob_last_ts = ts; 149 while ((glob_last_ts >> 32) < (ts >> 32)) { 150 report_results(glob_last_ts >> 32, count, bytes); 151 151 count = 0; 152 152 bytes = 0; 153 153 for (j = 0; j < filter_count; j++) 154 154 filters[j].count = filters[j].bytes = 0; 155 last_ts++;155 glob_last_ts = ts; 156 156 } 157 157 count += res->total.count; … … 175 175 { 176 176 int i; 177 static __thread uint64_t last_ts = 0, ts = 0;178 177 static __thread result_t * results = NULL; 178 uint64_t key; 179 static __thread uint64_t last_key = 0; 179 180 180 181 switch(mesg) { 181 182 case MESSAGE_PACKET: 182 // Unsure when we would hit this case but the old code had it, I 183 // guess we should keep it 184 assert(trace_get_packet_buffer(data.pkt,NULL,NULL) != NULL); 185 ts = trace_get_seconds(data.pkt) / packet_interval; 186 if (last_ts == 0) 187 last_ts = ts; 188 189 while (packet_interval != UINT64_MAX && last_ts<ts) { 190 libtrace_generic_t tmp = {.ptr = results}; 191 // Publish and make a new one new 192 //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts); 193 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER); 194 trace_post_reporter(trace); 195 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 196 last_ts++; 197 } 183 key = trace_get_erf_timestamp(data.pkt); 184 if ((key >> 32) > (last_key >> 32) + packet_interval) { 185 libtrace_generic_t tmp = {.ptr = results}; 186 trace_publish_result(trace, t, key, 187 tmp, RESULT_USER); 188 trace_post_reporter(trace); 189 last_key = key; 190 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 191 192 } 198 193 199 194 for(i=0;i<filter_count;++i) { … … 206 201 results->total.count++; 207 202 results->total.bytes +=trace_get_wire_length(data.pkt); 208 /*if (count >= packet_count) {209 report_results(ts,count,bytes);210 count=0;211 bytes=0;212 }*/ // TODO what was happening here doesn't match up with any of the documentations!!!213 203 return data.pkt; 214 204 … … 221 211 if (results->total.count) { 222 212 libtrace_generic_t tmp = {.ptr = results}; 223 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);213 trace_publish_result(trace, t, last_key, tmp, RESULT_USER); 224 214 trace_post_reporter(trace); 215 free(results); 225 216 results = NULL; 226 217 } … … 230 221 case MESSAGE_TICK_COUNT: 231 222 { 232 int64_t offset; 233 const struct timeval *tv; 234 struct timeval tv_real; 235 const libtrace_packet_t *first_packet = NULL; 236 trace_get_first_packet(trace, NULL, &first_packet, &tv); 237 if (first_packet != NULL) { 238 // So figure out our running offset 239 tv_real = trace_get_timeval(first_packet); 240 offset = tv_to_usec(tv) - tv_to_usec(&tv_real); 241 // Get time of day and do this stuff 242 uint64_t next_update_time; 243 next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset; 244 if (next_update_time <= data.uint64) { 245 libtrace_generic_t tmp = {.ptr = results}; 246 //fprintf(stderr, "Got a tick and publishing early!!\n"); 247 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER); 248 trace_post_reporter(trace); 249 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 250 last_ts++; 251 } else { 252 //fprintf(stderr, "Got a tick but no publish ...\n"); 253 } 254 } else { 255 //fprintf(stderr, "Got a tick but no packets seen yet!!!\n"); 256 } 223 if (data.uint64 > last_key) { 224 libtrace_generic_t tmp = {.ptr = results}; 225 trace_publish_result(trace, t, data.uint64, 226 tmp, RESULT_USER); 227 trace_post_reporter(trace); 228 last_key = data.uint64; 229 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 230 } 231 break; 257 232 } 258 233 } 259 234 return NULL; 260 }261 262 static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {263 return 0;264 235 } 265 236 … … 267 238 static void run_trace(char *uri) 268 239 { 269 int j;270 271 240 if (!merge_inputs) 272 241 create_output(uri); … … 297 266 298 267 if (trace_get_information(trace)->live) { 299 268 trace_set_tick_interval(trace, (int) (packet_interval * 1000)); 300 269 } 301 270 … … 313 282 314 283 // Flush the last one out 315 report_results((double) last_ts * (double) packet_interval, count, bytes); 316 //count = 0; 317 //bytes = 0; 318 for (j = 0; j < filter_count; j++) 319 filters[j].count = filters[j].bytes = 0; 320 (last_ts)++; 321 284 report_results((glob_last_ts >> 32), count, bytes); 322 285 if (trace_is_err(trace)) 323 286 trace_perror(trace,"%s",uri);
Note: See TracChangeset
for help on using the changeset viewer.