- Timestamp:
- 03/30/15 14:22:09 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 8370482
- Parents:
- 773a2a3
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/tracestats/tracestats_parallel.c
r7c95027 rdfbdda7a 59 59 struct libtrace_t *trace = NULL; 60 60 61 static void cleanup_signal(int signal )61 static void cleanup_signal(int signal UNUSED) 62 62 { 63 static int s = 0; 64 (void)signal; 65 // trace_interrupt(); 66 // trace_pstop isn't really signal safe because its got lots of locks in it 67 trace_pstop(trace); 68 /*if (s == 0) { 69 if (trace_ppause(trace) == -1) 70 trace_perror(trace, "Pause failed"); 71 } 72 else { 73 if (trace_pstart(trace, NULL, NULL, NULL) == -1) 74 trace_perror(trace, "Start failed"); 75 }*/ 76 s = !s; 63 if (trace) 64 trace_pstop(trace); 77 65 } 78 66 … … 99 87 100 88 101 //libtrace_message_t mesg102 89 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 103 90 int mesg, libtrace_generic_t data, 104 91 libtrace_thread_t *sender UNUSED) 105 92 { 106 / / Using first entry as total and those after for filter counts93 /* Using first entry as total and those after for filter counts */ 107 94 static __thread statistics_t * results = NULL; 108 95 int i, wlen; 109 96 libtrace_stat_t *stats; 110 111 112 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 97 libtrace_generic_t gen; 98 113 99 switch (mesg) { 114 100 case MESSAGE_PACKET: 101 /* Apply filters to every packet note the result */ 115 102 wlen = trace_get_wire_length(data.pkt); 116 103 for(i=0;i<filter_count;++i) { … … 124 111 trace_perror(trace, "trace_apply_filter"); 125 112 fprintf(stderr, "Removing filter from filterlist\n"); 126 / / XXX might be a problem across threads below113 /* This is a race, but will be atomic */ 127 114 filters[i].filter = NULL; 128 115 } … … 131 118 results[0].bytes +=wlen; 132 119 return data.pkt; 133 case MESSAGE_STOPPING:134 stats = trace_create_statistics();135 trace_get_thread_statistics(trace, t, stats);136 trace_print_statistics(stats, stderr, NULL);137 free(stats);138 trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_USER); // Only ever using a single key 0139 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");140 break;141 120 case MESSAGE_STARTING: 121 /* Allocate space to hold a total count and one for each filter */ 142 122 results = calloc(1, sizeof(statistics_t) * (filter_count + 1)); 143 123 break; 144 case MESSAGE_DO_PAUSE: 145 assert(!"GOT Asked to pause!!!\n"); 124 case MESSAGE_STOPPING: 125 /* We only output one result per thread with the key 0 when the 126 * trace is over. */ 127 gen.ptr = results; 128 trace_publish_result(trace, t, 0, gen, RESULT_USER); 146 129 break; 147 case MESSAGE_PAUSING: 148 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n"); 130 default: 149 131 break; 150 case MESSAGE_RESUMING:151 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");152 break;153 132 } 154 133 return NULL; 155 134 } 156 135 157 static void report_result(libtrace_t *trace UNUSED, int mesg,136 static void report_result(libtrace_t *trace, int mesg, 158 137 libtrace_generic_t data, 159 138 libtrace_thread_t *sender UNUSED) { … … 176 155 break; 177 156 case MESSAGE_STOPPING: 157 /* We are done, print out results */ 178 158 stats = trace_get_statistics(trace, NULL); 179 159 printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%"); … … 204 184 } 205 185 206 static uint64_t rand_hash(libtrace_packet_t * pkt UNUSED, void *data UNUSED) {207 return rand();208 }209 210 static uint64_t bad_hash(libtrace_packet_t * pkt UNUSED, void *data UNUSED) {211 return 0;212 }213 214 186 /* Process a trace, counting packets that match filter(s) */ 215 187 static void run_trace(char *uri, char *config, char *config_file) … … 224 196 return; 225 197 } 226 227 //libtrace_filter_t *f = trace_create_filter("udp");228 //trace_config(trace, TRACE_OPTION_FILTER, f);229 230 //trace_config(trace, TRACE_OPTION_META_FREQ, &option);231 //option = 10000;232 trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);233 //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);234 198 235 199 /* Apply config */ … … 249 213 } 250 214 251 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); 252 253 //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option); 254 255 /* OPTIONALLY SETUP CORES HERE BUT WE DON'T CARE ABOUT THAT YET XXX */ 256 257 /*if (trace_start(trace)==-1) { 258 trace_perror(trace,"Failed to start trace"); 259 return; 260 }*/ 261 global_blob_t blob; 262 263 264 if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) { 215 /* Start the trace as a parallel trace */ 216 if (trace_pstart(trace, NULL, &per_packet, report_result)==-1) { 265 217 trace_perror(trace,"Failed to start trace"); 266 218 return; 267 219 } 268 220 269 / / Wait for all threads to stop221 /* Wait for all threads to stop */ 270 222 trace_join(trace); 271 223 272 //map_pair_iterator_t * results = NULL;273 //trace_get_results(trace, &results);274 275 //if (results != NULL) {276 // reduce(trace, global_blob, results);277 //}278 224 if (trace_is_err(trace)) 279 225 trace_perror(trace,"%s",uri);
Note: See TracChangeset
for help on using the changeset viewer.