Changeset 9d5f464
- Timestamp:
- 11/08/18 17:15:19 (4 years ago)
- Branches:
- develop
- Children:
- 6984c96
- Parents:
- 93f4c64
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
examples/tutorial/ipdist-parallel.c
r93f4c64 r9d5f464 15 15 uint64_t src[256]; 16 16 uint64_t dst[256]; 17 uint64_t lastkey; 18 uint64_t packets; 17 19 }; 18 20 /* Structure to hold the result from a processing thread */ … … 20 22 uint64_t src[256]; 21 23 uint64_t dst[256]; 24 uint64_t packets; 22 25 }; 23 26 /* Structure to hold counters the report has one of these, it combines … … 27 30 uint64_t dst[256]; 28 31 uint64_t lastkey; 32 uint64_t packets; 29 33 }; 30 34 … … 40 44 }; 41 45 46 uint64_t tickrate; 47 42 48 /* Start callback function - This is run for each thread when it starts */ 43 49 static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) { … … 50 56 local->dst[i] = 0; 51 57 } 58 local->lastkey = 0; 59 local->packets = 0; 52 60 53 61 /* return the local storage so it is available for all other callbacks for the thread*/ … … 114 122 struct addr_local *local = (struct addr_local *)tls; 115 123 124 /* Store the timestamp of the last packet in erf format 125 * We use the timestamp in the packet for processing non live traces */ 126 local->lastkey = trace_get_erf_timestamp(packet); 127 /* Increment the packet count */ 128 local->packets += 1; 129 116 130 /* Regain access to excluded networks pointer */ 117 131 struct exclude_networks *exclude = (struct exclude_networks *)global; … … 138 152 } 139 153 140 /* Stopping callback function */154 /* Stopping callback function - When a thread closes */ 141 155 static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) { 142 156 143 /* create a structure to hold the result from the processing thread */144 struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));145 157 /* cast the local storage structure */ 146 158 struct addr_local *local = (struct addr_local *)tls; 147 148 /* Populate the result structure */ 159 /* Create structure to store the result */ 160 struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result)); 161 162 /* Populate the result */ 149 163 int i; 150 164 for(i=0;i<256;i++) { 151 165 result->src[i] = local->src[i]; 152 result->dst[i] = local->dst[i]; 153 } 154 155 /* Publish the result to the reporter thread */ 166 result->src[i] = local->src[i]; 167 } 168 result->packets = local->packets; 169 170 /* This will not cause the result to be printed but will atleast end up going into our tally 171 * The reporter thread can then deal with it when it closes */ 156 172 trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER); 157 173 … … 159 175 free(local); 160 176 } 161 162 163 164 177 165 178 /* Starting callback for reporter thread */ … … 175 188 } 176 189 tally->lastkey = 0; 190 tally->packets = 0; 177 191 178 192 return tally; 179 193 } 180 194 181 static void plot_results(struct addr_tally *tally ) {195 static void plot_results(struct addr_tally *tally, uint64_t tick) { 182 196 183 197 /* Get the current time */ 184 198 time_t current_time = time(NULL); 185 char* time_string = ctime(¤t_time); 186 187 /* Push all the data into a tmp file for gnuplot */ 199 188 200 char outputfile[255]; 189 201 char outputplot[255]; 190 snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", current_time); 191 snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", current_time); 192 FILE *tmp = fopen(outputfile, "w"); 202 snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick); 203 snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick); 204 205 /* Push all data into data file */ 206 FILE *tmp = fopen(outputfile, "w"); 193 207 int i; 194 208 for(i=0;i<255;i++) { … … 196 210 } 197 211 fclose(tmp); 212 printf("wrote out to file %s\n", outputfile); 198 213 199 214 /* Open pipe to gnuplot */ … … 204 219 fprintf(gnuplot, "set xrange[0:255]\n"); 205 220 fprintf(gnuplot, "set xlabel 'Prefix'\n"); 206 fprintf(gnuplot, "set xlabel 'Hits'\n");221 fprintf(gnuplot, "set ylabel 'Hits'\n"); 207 222 fprintf(gnuplot, "set xtics 0,10,255\n"); 208 223 fprintf(gnuplot, "set output '%s'\n", outputplot); 209 224 fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile); 225 fprintf(gnuplot, "replot"); 210 226 pclose(gnuplot); 211 227 } … … 217 233 218 234 struct addr_result *results; 219 uint64_t key;220 235 struct addr_tally *tally; 236 uint64_t key; 221 237 222 238 /* We only want to handle results containing our user-defined structure */ … … 233 249 /* Grab our tally out of thread local storage */ 234 250 tally = (struct addr_tally *)tls; 235 /* increment tally with the new results */ 236 int i; 237 for(i=0;i<256;i++) { 238 tally->src[i] += results->src[i]; 239 tally->dst[i] += results->dst[i]; 251 252 /* Add all the results to the tally */ 253 int i; 254 for(i=0;i<256;i++) { 255 tally->src[i] += results->src[i]; 256 tally->dst[i] += results->dst[i]; 257 } 258 tally->packets += results->packets; 259 260 /* If the current timestamp is greater than the last printed plus the interval, output a result */ 261 if((key >> 32) >= (tally->lastkey >> 32) + (tickrate/1000)) { 262 263 /* update last key */ 240 264 tally->lastkey = key; 241 } 242 243 /* Plot the result */ 244 plot_results(tally); 265 266 /* Plot the result with the key in epoch seconds*/ 267 plot_results(tally, key >> 32); 268 269 /* clear the tally */ 270 for(i=0;i<256;i++) { 271 tally->src[i] = 0; 272 tally->dst[i] = 0; 273 } 274 tally->packets = 0; 275 } 245 276 246 277 /* Cleanup the thread results */ … … 254 285 struct addr_tally *tally = (struct addr_tally *)tls; 255 286 256 //OUTPUT DATA 257 //plot_results(tally); 258 287 /* If there is any remaining data in the tally plot it */ 288 if(tally->packets > 0) { 289 plot_results(tally, (tally->lastkey >> 32) + tickrate); 290 } 259 291 /* Cleanup tally results*/ 260 292 free(tally); … … 264 296 265 297 struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result)); 298 /* Proccessing thread local storage */ 266 299 struct addr_local *local = (struct addr_local *)tls; 267 300 … … 276 309 local->dst[i] = 0; 277 310 } 278 279 printf("tick: %u\n", tick); 280 281 /* Push the result to the combiner */ 282 trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER); 311 result->packets = local->packets; 312 313 /* only use the tick timestamp if running against a live capture */ 314 uint64_t key; 315 if(trace_get_information(trace)->live) { 316 key = tick; 317 } else { 318 key = local->lastkey; 319 } 320 321 /* Push result to the combiner */ 322 trace_publish_result(trace, thread, key, (libtrace_generic_t){.ptr=result}, RESULT_USER); 283 323 } 284 324 … … 296 336 } 297 337 /* Convert tick into correct format */ 298 uint64_ttickrate = atoi(argv[2]);338 tickrate = atoi(argv[2]); 299 339 300 340 /* Create the trace */ … … 324 364 /* Try to balance the load across all processing threads */ 325 365 trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL); 366 326 367 /* Set the tick interval */ 327 368 trace_set_tick_interval(trace, tickrate); 328 369 /* Do not buffer the reports */ 370 trace_set_reporter_thold(trace, 1); 329 371 330 372
Note: See TracChangeset
for help on using the changeset viewer.