- Timestamp:
- 02/27/15 17:31:03 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 98dc1ba
- Parents:
- 7718e54
- Location:
- tools
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
tools/traceanon/traceanon_parallel.c
rd994324 r0ec8a7c 154 154 155 155 156 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t) 157 { 158 if (pkt) { 159 struct libtrace_ip *ipptr; 160 libtrace_udp_t *udp = NULL; 161 libtrace_tcp_t *tcp = NULL; 162 163 ipptr = trace_get_ip(pkt); 156 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 157 int mesg, libtrace_generic_t data, 158 libtrace_thread_t *sender UNUSED) 159 { 160 struct libtrace_ip *ipptr; 161 libtrace_udp_t *udp = NULL; 162 libtrace_tcp_t *tcp = NULL; 163 164 switch (mesg) { 165 case MESSAGE_PACKET: 166 ipptr = trace_get_ip(data.pkt); 164 167 165 168 if (ipptr && (enc_source || enc_dest)) { … … 173 176 /* XXX replace with nice use of trace_get_transport() */ 174 177 175 udp = trace_get_udp( pkt);178 udp = trace_get_udp(data.pkt); 176 179 if (udp && (enc_source || enc_dest)) { 177 180 udp->check = 0; 178 181 } 179 182 180 tcp = trace_get_tcp( pkt);183 tcp = trace_get_tcp(data.pkt); 181 184 if (tcp && (enc_source || enc_dest)) { 182 185 tcp->check = 0; … … 191 194 //trace_publish_result(trace, trace_packet_get_order(pkt), pkt); 192 195 193 trace_publish_result(trace, t, trace_packet_get_order(pkt), (libtrace_generic_types_t){.pkt=pkt}, RESULT_PACKET); 194 //return ; 195 } 196 if (mesg) { 197 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 198 switch (mesg->code) { 199 case MESSAGE_STARTING: 200 enc_init(enc_type,key); 201 break; 202 case MESSAGE_TICK: 203 trace_publish_result(trace, t, mesg->additional.uint64, (libtrace_generic_types_t){.pkt=NULL}, RESULT_TICK); 204 } 196 trace_publish_result(trace, t, trace_packet_get_order(data.pkt), data, RESULT_PACKET); 197 break; 198 case MESSAGE_STARTING: 199 enc_init(enc_type,key); 200 break; 201 case MESSAGE_TICK: 202 trace_publish_result(trace, t, data.uint64, (libtrace_generic_t){0}, RESULT_TICK); 203 break; 205 204 } 206 205 return NULL; … … 423 422 424 423 int i = 1; 425 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t ypes_t){0});424 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); 426 425 trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc); 427 426 -
tools/tracertstats/tracertstats_parallel.c
r2e22196d r0ec8a7c 171 171 } timestamp_sync_t; 172 172 173 static void* per_packet(libtrace_t *trace, libtrace_ packet_t *pkt,174 libtrace_message_t *mesg,175 libtrace_thread_t *t)173 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 174 int mesg, libtrace_generic_t data, 175 libtrace_thread_t *sender UNUSED) 176 176 { 177 177 int i; 178 178 static __thread uint64_t last_ts = 0, ts = 0; 179 179 static __thread result_t * results = NULL; 180 // Unsure when we would hit this case but the old code had it, I 181 // guess we should keep it 182 if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) { 183 //fprintf(stderr, "Got packet t=%x\n", t); 184 ts = trace_get_seconds(pkt) / packet_interval; 180 181 switch(mesg) { 182 case MESSAGE_PACKET: 183 // Unsure when we would hit this case but the old code had it, I 184 // guess we should keep it 185 assert(trace_get_packet_buffer(pkt,NULL,NULL) != NULL); 186 ts = trace_get_seconds(data.pkt) / packet_interval; 185 187 if (last_ts == 0) 186 188 last_ts = ts; 187 189 188 190 while (packet_interval != UINT64_MAX && last_ts<ts) { 191 libtrace_generic_t tmp = {.ptr = results}; 189 192 // Publish and make a new one new 190 193 //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts); 191 trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL);194 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL); 192 195 trace_post_reporter(trace); 193 196 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 194 197 last_ts++; 195 198 } 196 199 197 200 for(i=0;i<filter_count;++i) { 198 if(trace_apply_filter(filters[i].filter, pkt)) {201 if(trace_apply_filter(filters[i].filter, data.pkt)) { 199 202 results->filters[i].count++; 200 results->filters[i].bytes+=trace_get_wire_length( pkt);203 results->filters[i].bytes+=trace_get_wire_length(data.pkt); 201 204 } 202 205 } 203 206 204 207 results->total.count++; 205 results->total.bytes +=trace_get_wire_length( pkt);208 results->total.bytes +=trace_get_wire_length(data.pkt); 206 209 /*if (count >= packet_count) { 207 210 report_results(ts,count,bytes); … … 209 212 bytes=0; 210 213 }*/ // TODO what was happening here doesn't match up with any of the documentations!!! 211 } 212 213 if (mesg) { 214 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 215 switch (mesg->code) { 216 case MESSAGE_STARTING: 217 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 218 break; 219 case MESSAGE_STOPPING: 220 // Should we always post this? 221 if (results->total.count) { 222 trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); 214 return data.pkt; 215 216 case MESSAGE_STARTING: 217 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 218 break; 219 220 case MESSAGE_STOPPING: 221 // Should we always post this? 222 if (results->total.count) { 223 libtrace_generic_t tmp = {.ptr = results}; 224 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL); 225 trace_post_reporter(trace); 226 results = NULL; 227 } 228 break; 229 230 case MESSAGE_TICK: 231 { 232 int64_t offset; 233 struct timeval *tv, tv_real; 234 libtrace_packet_t *first_packet = NULL; 235 retrive_first_packet(trace, &first_packet, &tv); 236 if (first_packet != NULL) { 237 // So figure out our running offset 238 tv_real = trace_get_timeval(first_packet); 239 offset = tv_to_usec(tv) - tv_to_usec(&tv_real); 240 // Get time of day and do this stuff 241 uint64_t next_update_time; 242 next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset; 243 if (next_update_time <= data.uint64) { 244 libtrace_generic_t tmp = {.ptr = results}; 245 //fprintf(stderr, "Got a tick and publishing early!!\n"); 246 trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_NORMAL); 223 247 trace_post_reporter(trace); 224 results = NULL; 248 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 249 last_ts++; 250 } else { 251 //fprintf(stderr, "Got a tick but no publish ...\n"); 225 252 } 226 break; 227 case MESSAGE_TICK: 228 { 229 int64_t offset; 230 struct timeval *tv, tv_real; 231 libtrace_packet_t *first_packet = NULL; 232 retrive_first_packet(trace, &first_packet, &tv); 233 if (first_packet != NULL) { 234 // So figure out our running offset 235 tv_real = trace_get_timeval(first_packet); 236 offset = tv_to_usec(tv) - tv_to_usec(&tv_real); 237 // Get time of day and do this stuff 238 uint64_t next_update_time; 239 next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset; 240 if (next_update_time <= mesg->additional.uint64) { 241 //fprintf(stderr, "Got a tick and publishing early!!\n"); 242 trace_publish_result(trace, t, (uint64_t) last_ts, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); 243 trace_post_reporter(trace); 244 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count); 245 last_ts++; 246 } else { 247 //fprintf(stderr, "Got a tick but no publish ...\n"); 248 } 249 } else { 250 //fprintf(stderr, "Got a tick but no packets seen yet!!!\n"); 251 } 253 } else { 254 //fprintf(stderr, "Got a tick but no packets seen yet!!!\n"); 252 255 } 253 256 } 254 257 } 255 return pkt;258 return NULL; 256 259 } 257 260 -
tools/tracestats/tracestats_parallel.c
r2adc1d0 r0ec8a7c 102 102 103 103 104 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t) 104 //libtrace_message_t mesg 105 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 106 int mesg, libtrace_generic_t data, 107 libtrace_thread_t *sender) 105 108 { 106 109 // Using first entry as total and those after for filter counts 107 110 static __thread statistics_t * results = NULL; 108 int i; 109 110 if (pkt) { 111 int wlen = trace_get_wire_length(pkt); 111 int i, wlen; 112 libtrace_stat_t *stats; 113 114 115 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet)); 116 switch (mesg) { 117 case MESSAGE_PACKET: 118 wlen = trace_get_wire_length(data.pkt); 112 119 for(i=0;i<filter_count;++i) { 113 120 if (filters[i].filter == NULL) 114 121 continue; 115 if(trace_apply_filter(filters[i].filter, pkt) > 0) {122 if(trace_apply_filter(filters[i].filter,data.pkt) > 0) { 116 123 results[i+1].count++; 117 124 results[i+1].bytes+=wlen; … … 126 133 results[0].count++; 127 134 results[0].bytes +=wlen; 128 }129 if (mesg) {130 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));131 switch (mesg->code) {132 case MESSAGE_STOPPING:133 trace_publish_result(trace, t, 0, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0134 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");135 break;136 case MESSAGE_STARTING:137 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));138 break;139 case MESSAGE_DO_PAUSE:140 assert(!"GOT Asked to pause!!!\n");141 break;142 case MESSAGE_PAUSING:143 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");144 break;145 case MESSAGE_RESUMING:146 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");147 break;148 }149 } 150 return pkt;135 return data.pkt; 136 case MESSAGE_STOPPING: 137 stats = trace_create_statistics(); 138 trace_get_thread_statistics(trace, t, stats); 139 trace_print_statistics(stats, stderr, NULL); 140 free(stats); 141 trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0 142 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n"); 143 break; 144 case MESSAGE_STARTING: 145 results = calloc(1, sizeof(statistics_t) * (filter_count + 1)); 146 break; 147 case MESSAGE_DO_PAUSE: 148 assert(!"GOT Asked to pause!!!\n"); 149 break; 150 case MESSAGE_PAUSING: 151 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n"); 152 break; 153 case MESSAGE_RESUMING: 154 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n"); 155 break; 156 } 157 return NULL; 151 158 } 152 159
Note: See TracChangeset
for help on using the changeset viewer.