Changeset 2498008 for tools/tracertstats/tracertstats_parallel.c
- Timestamp:
- 09/16/14 02:35:10 (8 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- d994324
- Parents:
- 50b1bee
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/tracertstats/tracertstats_parallel.c
rf051c1b r2498008 136 136 } result_t; 137 137 138 139 static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts) 140 { 141 int i,j; 142 //uint64_t count=0, bytes=0; 138 static uint64_t last_ts = 0; 139 static void process_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg UNUSED) { 143 140 static uint64_t ts = 0; 144 libtrace_vector_t results; 145 libtrace_vector_init(&results, sizeof(libtrace_result_t)); 146 trace_get_results(trace, &results); 147 //uint64_t packets; 148 149 /* Get the results from each core and sum 'em up */ 150 for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) { 151 libtrace_result_t result; 152 153 assert(libtrace_vector_get(&results, i, (void *) &result) == 1); 154 ts = libtrace_result_get_key(&result); 155 if (*last_ts == 0) 156 *last_ts = ts; 157 158 result_t * res = libtrace_result_get_value(&result); 159 static result_t * last_res = NULL; 160 // Memory manager might falsely trigger this 161 assert(res != last_res); 162 last_res = res; 163 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count); 164 while (*last_ts < ts) { 165 report_results((double) *last_ts * (double) packet_interval, count, bytes); 141 142 if (result) { 143 int j; 144 result_t *res; 145 ts = libtrace_result_get_key(result); 146 res = libtrace_result_get_value(result); 147 if (last_ts == 0) 148 last_ts = ts; 149 while (last_ts < ts) { 150 report_results((double) last_ts * (double) packet_interval, count, bytes); 166 151 count = 0; 167 152 bytes = 0; 168 153 for (j = 0; j < filter_count; j++) 169 154 filters[j].count = filters[j].bytes = 0; 170 (*last_ts)++; 171 } 172 155 last_ts++; 156 } 173 157 count += res->total.count; 174 158 bytes += res->total.bytes; … … 179 163 free(res); 180 164 } 181 // Done with these results - Free internally and externally182 libtrace_vector_destroy(&results);183 184 return 0;185 165 } 186 166 … … 276 256 } 277 257 278 static uint64_t bad_hash(const libtrace_packet_t * pkt , void *data) {258 static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) { 279 259 return 0; 280 260 } … … 284 264 { 285 265 int j; 286 uint64_t last_ts = 0;287 266 288 267 if (!merge_inputs) … … 318 297 } 319 298 320 if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {299 if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) { 321 300 trace_perror(trace,"Failed to start trace"); 322 301 trace_destroy(trace); … … 327 306 328 307 329 // reduce330 while (!trace_finished(trace)) {331 // Read messages332 libtrace_message_t message;333 334 // We just release and do work currently, maybe if something335 // interesting comes through we'd deal with that336 libtrace_thread_get_message(trace, &message);337 338 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }339 reduce(trace, NULL, &last_ts);340 }341 342 308 // Wait for all threads to stop 343 309 trace_join(trace); 344 310 345 reduce(trace, NULL, &last_ts);346 311 // Flush the last one out 347 312 report_results((double) last_ts * (double) packet_interval, count, bytes);
Note: See TracChangeset
for help on using the changeset viewer.