Changeset c3cb9f9 for tools/tracertstats/tracertstats_single.c
- Timestamp:
- 09/10/15 16:39:46 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 9346e4a
- Parents:
- 9e48735
- File:
-
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
tools/tracertstats/tracertstats_single.c
r9e48735 rc3cb9f9 52 52 #include <lt_inttypes.h> 53 53 54 #include "libtrace _parallel.h"54 #include "libtrace.h" 55 55 #include "output.h" 56 56 #include "rt_protocol.h" … … 67 67 68 68 int merge_inputs = 0; 69 int threadcount = 4;70 69 71 70 struct filter_t { … … 121 120 } 122 121 123 uint64_t count; 124 uint64_t bytes; 125 126 typedef struct statistic { 127 uint64_t count; 128 uint64_t bytes; 129 } statistic_t; 130 131 typedef struct result { 132 struct statistic total; 133 struct statistic filters[0]; 134 } result_t; 135 136 static uint64_t glob_last_ts = 0; 137 static void process_result(libtrace_t *trace UNUSED, int mesg, 138 libtrace_generic_t data, 139 libtrace_thread_t *sender UNUSED) { 140 static uint64_t ts = 0; 141 int j; 142 result_t *res; 143 144 switch (mesg) { 145 case MESSAGE_RESULT: 146 ts = data.res->key; 147 res = data.res->value.ptr; 148 if (glob_last_ts == 0) 149 glob_last_ts = ts; 150 while ((glob_last_ts >> 32) < (ts >> 32)) { 151 report_results(glob_last_ts >> 32, count, bytes); 152 count = 0; 153 bytes = 0; 154 for (j = 0; j < filter_count; j++) 155 filters[j].count = filters[j].bytes = 0; 156 glob_last_ts = ts; 157 } 158 count += res->total.count; 159 bytes += res->total.bytes; 160 for (j = 0; j < filter_count; j++) { 161 filters[j].count += res->filters[j].count; 162 filters[j].bytes += res->filters[j].bytes; 163 } 164 free(res); 165 } 166 } 167 168 typedef struct timestamp_sync { 169 int64_t difference_usecs; 170 uint64_t first_interval_number; 171 } timestamp_sync_t; 172 173 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 174 int mesg, libtrace_generic_t data, 175 libtrace_thread_t *sender UNUSED) 122 /* Process a trace, counting packets that match filter(s) */ 123 static void run_trace(char *uri) 176 124 { 125 struct libtrace_packet_t *packet = trace_create_packet(); 177 126 int i; 178 static __thread result_t * results = NULL; 179 uint64_t key; 180 static __thread uint64_t last_key = 0; 181 182 switch(mesg) { 183 case MESSAGE_PACKET: 184 key = trace_get_erf_timestamp(data.pkt); 185 if ((key >> 32) > (last_key >> 32) + packet_interval) { 186 libtrace_generic_t tmp = {.ptr = results}; 187 trace_publish_result(trace, t, key, 188 tmp, RESULT_USER); 189 trace_post_reporter(trace); 190 last_key = key; 191 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 192 193 } 194 195 for(i=0;i<filter_count;++i) { 196 if(trace_apply_filter(filters[i].filter, data.pkt)) { 197 results->filters[i].count++; 198 results->filters[i].bytes+=trace_get_wire_length(data.pkt); 199 } 200 } 201 202 results->total.count++; 203 results->total.bytes +=trace_get_wire_length(data.pkt); 204 return data.pkt; 205 206 case MESSAGE_STARTING: 207 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 208 break; 209 210 case MESSAGE_STOPPING: 211 // Should we always post this? 212 if (results->total.count) { 213 libtrace_generic_t tmp = {.ptr = results}; 214 trace_publish_result(trace, t, last_key, tmp, RESULT_USER); 215 trace_post_reporter(trace); 216 free(results); 217 results = NULL; 218 } 219 break; 220 221 case MESSAGE_TICK_INTERVAL: 222 case MESSAGE_TICK_COUNT: 223 { 224 if (data.uint64 > last_key) { 225 libtrace_generic_t tmp = {.ptr = results}; 226 trace_publish_result(trace, t, data.uint64, 227 tmp, RESULT_USER); 228 trace_post_reporter(trace); 229 last_key = data.uint64; 230 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 231 } 232 break; 233 } 234 } 235 return NULL; 236 } 237 238 /* Process a trace, counting packets that match filter(s) */ 239 static void run_trace(char *uri) 240 { 127 uint64_t count = 0; 128 uint64_t bytes = 0; 129 double last_ts = 0; 130 double ts = 0; 131 241 132 if (!merge_inputs) 242 133 create_output(uri); … … 245 136 return; 246 137 247 138 trace = trace_create(uri); 248 139 if (trace_is_err(trace)) { 249 140 trace_perror(trace,"trace_create"); … … 251 142 if (!merge_inputs) 252 143 output_destroy(output); 253 return; 254 } 255 /* 144 return; 145 } 256 146 if (trace_start(trace)==-1) { 257 147 trace_perror(trace,"trace_start"); … … 260 150 output_destroy(output); 261 151 return; 262 }*/ 263 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); 264 trace_set_tracetime(trace, true); 265 trace_set_perpkt_threads(trace, threadcount); 266 267 //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL); 268 269 if (trace_get_information(trace)->live) { 270 trace_set_tick_interval(trace, (int) (packet_interval * 1000)); 271 } 272 273 if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) { 274 trace_perror(trace,"Failed to start trace"); 275 trace_destroy(trace); 276 if (!merge_inputs) 277 output_destroy(output); 278 return; 279 } 280 281 282 // Wait for all threads to stop 283 trace_join(trace); 284 285 // Flush the last one out 286 report_results((glob_last_ts >> 32), count, bytes); 152 } 153 154 for (;;) { 155 int psize; 156 if ((psize = trace_read_packet(trace, packet)) <1) { 157 break; 158 } 159 160 if (trace_get_packet_buffer(packet,NULL,NULL) == NULL) { 161 continue; 162 } 163 164 ts = trace_get_seconds(packet); 165 166 if (last_ts == 0) 167 last_ts = ts; 168 169 while (packet_interval != UINT64_MAX && last_ts<ts) { 170 report_results(last_ts,count,bytes); 171 count=0; 172 bytes=0; 173 last_ts+=packet_interval; 174 } 175 for(i=0;i<filter_count;++i) { 176 if(trace_apply_filter(filters[i].filter,packet)) { 177 ++filters[i].count; 178 filters[i].bytes+=trace_get_wire_length(packet); 179 } 180 } 181 182 ++count; 183 bytes+=trace_get_wire_length(packet); 184 185 186 if (count >= packet_count) { 187 report_results(ts,count,bytes); 188 count=0; 189 bytes=0; 190 } 191 } 192 report_results(ts,count,bytes); 193 287 194 if (trace_is_err(trace)) 288 195 trace_perror(trace,"%s",uri); … … 292 199 if (!merge_inputs) 293 200 output_destroy(output); 294 295 } 296 // TODO Decide what to do with -c option 201 202 trace_destroy_packet(packet); 203 } 204 297 205 static void usage(char *argv0) 298 206 { … … 301 209 "-i --interval=seconds Duration of reporting interval in seconds\n" 302 210 "-c --count=packets Exit after count packets received\n" 303 "-t --threads=max Create 'max' processing threads (default: 4)\n"304 211 "-o --output-format=txt|csv|html|png Reporting output format\n" 305 212 "-f --filter=bpf Apply BPF filter. Can be specified multiple times\n" … … 322 229 { "libtrace-help", 0, 0, 'H' }, 323 230 { "merge-inputs", 0, 0, 'm' }, 324 { "threads", 1, 0, 't' },325 231 { NULL, 0, 0, 0 }, 326 232 }; 327 233 328 int c=getopt_long(argc, argv, "c:f:i:o: t:Hm",234 int c=getopt_long(argc, argv, "c:f:i:o:Hm", 329 235 long_options, &option_index); 330 236 … … 341 247 filters[filter_count-1].bytes=0; 342 248 break; 343 case 't':344 threadcount = atoi(optarg);345 if (threadcount <= 0)346 threadcount = 1;347 break;348 249 case 'i': 349 250 packet_interval=atof(optarg); … … 383 284 384 285 286 385 287 if (merge_inputs) { 386 288 /* If we're merging the inputs, we only want to create all … … 393 295 if (output == NULL) 394 296 return 0; 297 395 298 } 396 299 … … 405 308 406 309 407 408 } 310 return 0; 311 }
Note: See TracChangeset
for help on using the changeset viewer.