source: examples/tutorial/ipdist-parallel.c @ 49a047f

develop
Last change on this file since 49a047f was 49a047f, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Added Zipf's Distribution to plot

  • Property mode set to 100644
File size: 25.5 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10
11/* Structure to hold the counters each thread has its own one of these */
12struct addr_local {
13        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
14        uint64_t src[4][256];
15        uint64_t dst[4][256];
16        /* Holds the results from the previous output */
17        uint64_t src_lastoutput[4][256];
18        uint64_t dst_lastoutput[4][256];
19        /* Holds the timestamp */
20        uint64_t lastkey;
21        /* Is the count of the number of packets processed, This is cleared after every output. */
22        uint64_t packets;
23        uint64_t output_count;
24        /* Pointer to stats structure */
25        struct addr_stats *stats;
26};
27struct addr_stats {
28        /* Holds the percentage change compared to the previous output */
29        float src[4][256];
30        float dst[4][256];
31        //double mean_src[4];
32        //double mean_dst[4];
33        //double stdev_src[4];
34        //double stdev_dst[4];
35        //double variance_src[4];
36        //double variance_dst[4];
37        //double variation_src[4];
38        //double variation_dst[4];
39        struct addr_rank *rank_src[4];
40        struct addr_rank *rank_dst[4];
41};
42struct addr_rank {
43        uint8_t addr;
44        /* count is the priority */
45        uint64_t count;
46        /* pointer to next ranking item */
47        struct addr_rank* next;
48};
49
50/* Structure to hold excluded networks */
51struct exclude_networks {
52        int count;
53        struct network *networks;
54};
55struct network {
56        uint32_t address;
57        uint32_t mask;
58        uint32_t network;
59};
60
61/* interval between outputs in seconds */
62uint64_t tickrate;
63
64char *stats_outputdir = "/home/jcv9/output-spectre/";
65/* Calculate and plot the percentage change from the previous plot */
66int stats_percentage_change = 0;
67
68/*************************************************************************
69Priority queue linked list */
70
71static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
72        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
73        tmp->addr = addr;
74        tmp->count = count;
75        tmp->next = NULL;
76
77        return tmp;
78}
79static uint8_t peak(struct addr_rank **head) {
80        return (*head)->addr;
81}
82static uint64_t peak_count(struct addr_rank **head) {
83        return (*head)->count;
84}
85static void pop(struct addr_rank **head) {
86        struct addr_rank* tmp = *head;
87        (*head) = (*head)->next;
88        free(tmp);
89}
90static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
91        struct addr_rank *curr = (*head);
92        struct addr_rank *tmp = rank_new(addr, count);
93
94        /* Check if the new node has a greater priority than the head */
95        if((*head)->count < count) {
96                tmp->next = *head;
97                (*head) = tmp;
98        } else {
99                /* Jump through the list until we find the correct position */
100                while (curr->next != NULL && curr->next->count > count) {
101                        curr = curr->next;
102                }
103
104                tmp->next = curr->next;
105                curr->next = tmp;
106        }
107}
108/*************************************************************************/
109
110
111static void compute_stats(struct addr_local *tally) {
112        int i, j, k;
113
114        /* Calculates the percentage change from the last output. NEED TO MAKE THIS WEIGHTED */
115        if(stats_percentage_change) {
116                for(i=0;i<256;i++) {
117                        for(j=0;j<4;j++) {
118                                tally->stats->src[j][i] = 0;
119                                tally->stats->dst[j][i] = 0;
120                                if(tally->src[j][i] != 0) {
121                                        tally->stats->src[j][i] = (((float)tally->src[j][i] - (float)tally->src_lastoutput[j][i]) / (float)tally->src[j][i]) * 100;
122                                }
123                                if(tally->dst[j][i] != 0) {
124                                        tally->stats->dst[j][i] = (((float)tally->dst[j][i] - (float)tally->dst_lastoutput[j][i]) / (float)tally->dst[j][i]) * 100;
125                                }
126                        }
127                }
128        }
129
130        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
131        for(i=0;i<4;i++) {
132                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
133                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
134                for(j=1;j<256;j++) {
135                        /* Push everything into the priority queue
136                         * each item will be popped off in the correct order */
137                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
138                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
139                }
140        }
141
142
143
144        /* This will all result in overflows, needs to be a rolling average?? stdev etc */
145        /* Calculate mean */
146//      for(i=0;i<4;i++) {
147//              uint64_t count_src = 0;
148//              uint64_t count_dst = 0;
149//              uint64_t tmp_src = 0;
150//              uint64_t tmp_dst = 0;
151//              for(j=0;j<256;j++) {
152//                      tmp_src += (j * tally->src[i][j]);
153//                      count_src += tally->src[i][j];
154//                      tmp_dst += (j * tally->dst[i][j]);
155//                      count_dst += tally->dst[i][j];
156//              }
157//
158//              tally->stats->mean_src[i] = tmp_src / count_src;
159//              tally->stats->mean_dst[i] = tmp_dst / count_dst;
160//      }
161//      printf("mean: %f\n", tally->stats->mean_src[0]);
162//
163//
164//      /* Calculate variance, standard deviation and variation*/
165//      for(i=0;i<4;i++) {
166//              uint64_t count_src = 0;
167//              uint64_t count_dst = 0;
168//              uint64_t tmp_src = 0;
169//              uint64_t tmp_dst = 0;
170//              for(j=0;j<256;j++) {
171//                      tmp_src += (j * pow((tally->src[i][j] - tally->stats->mean_src[i]), 2));
172//                      count_src += tally->src[i][j];
173//                      tmp_dst += (j * pow((tally->dst[i][j] - tally->stats->mean_dst[i]), 2));
174//                      count_dst += tally->dst[i][j];
175//              }
176//              //printf("total: %u count: %u dd: %f\n", tmp_src, count_src, tmp_src/count_src);
177//              tally->stats->variance_src[i] = (double)tmp_src / (double)count_src;
178//              tally->stats->variance_dst[i] = (double)tmp_dst / (double)count_dst;
179//              tally->stats->stdev_src[i] = sqrt(tally->stats->variance_src[i]);
180//              tally->stats->stdev_dst[i] = sqrt(tally->stats->variance_dst[i]);
181//
182//              /* Calculate coefficient of variation */
183//              tally->stats->variation_src[i] = tally->stats->stdev_src[i] / tally->stats->mean_src[i];
184//              tally->stats->variation_dst[i] = tally->stats->stdev_dst[i] / tally->stats->mean_dst[i];
185//      }
186//
187//
188//      printf("stdev: %f variance: %f variation: %f\n", tally->stats->stdev_src[0], tally->stats->variance_src[0], tally->stats->variation_src[0]);
189
190}
191
192static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
193
194        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
195        /* Proccessing thread local storage */
196        struct addr_local *local = (struct addr_local *)tls;
197
198        /* Populate the result structure from the threads local storage and clear threads local storage*/
199        int i, j;
200        for(i=0;i<4;i++) {
201                for(j=0;j<256;j++) {
202                        result->src[i][j] = local->src[i][j];
203                        result->dst[i][j] = local->dst[i][j];
204                        /* clear local storage */
205                        local->src[i][j] = 0;
206                        local->dst[i][j] = 0;
207                }
208        }
209        result->packets = local->packets;
210        local->packets = 0;
211
212        /* Push result to the combiner */
213        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
214}
215
216/* Start callback function - This is run for each thread when it starts */
217static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
218
219        /* Create and initialize the local counter struct */
220        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
221        int i, j;
222        for(i=0;i<4;i++) {
223                for(j=0;j<256;j++) {
224                        local->src[i][j] = 0;
225                        local->dst[i][j] = 0;
226                }
227        }
228        local->lastkey = 0;
229        local->packets = 0;
230
231        /* return the local storage so it is available for all other callbacks for the thread*/
232        return local;
233}
234
235/* Checks if address is part of a excluded subnet. */
236static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
237
238        int i;
239        for(i=0;i<exclude->count;i++) {
240                /* Convert address into a network address */
241                uint32_t net_addr = address & exclude->networks[i].mask;
242
243                /* If this matches the network address from the excluded list we need to exclude this
244                   address. */
245                if(net_addr == exclude->networks[i].network) {
246                        return 1;
247                }
248        }
249
250        /* If we got this far the address should not be excluded */
251        return 0;
252}
253
254static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
255
256        /* Checks if the ip is of type IPv4 */
257        if (ip->sa_family == AF_INET) {
258
259                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
260                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
261                /* Get in_addr from sockaddr */
262                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
263                /* Ensure the address is in network byte order */
264                uint32_t address = htonl(ip4.s_addr);
265
266                /* Check if the address is part of an excluded network. */
267                if(network_excluded(address, exclude) == 0) {
268
269                        /* Split the IPv4 address into each octet */
270                        uint8_t octet[4];
271                        octet[0] = (address & 0xff000000) >> 24;
272                        octet[1] = (address & 0x00ff0000) >> 16;
273                        octet[2] = (address & 0x0000ff00) >> 8;
274                        octet[3] = (address & 0x000000ff);
275
276                        /* check if the supplied address was a source or destination,
277                           increment the correct one */
278                        if(srcaddr) {
279                                int i;
280                                for(i=0;i<4;i++) {
281                                        local->src[i][octet[i]] += 1;
282                                }
283                        } else {
284                                int i;
285                                for(i=0;i<4;i++) {
286                                        local->dst[i][octet[i]] += 1;
287                                }
288                        }
289                }
290        }
291}
292
293/* Per packet callback function run by each thread */
294static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
295        libtrace_packet_t *packet) {
296
297        /* Regain access to the address counter structure */
298        struct addr_local *local = (struct addr_local *)tls;
299
300        /* If this is the first packet set the lastkey to the packets timestamp */
301        if(local->lastkey == 0) {
302                local->lastkey = trace_get_erf_timestamp(packet);
303        }
304
305        /* Increment the packet count */
306        local->packets += 1;
307
308        /* Regain access to excluded networks pointer */
309        struct exclude_networks *exclude = (struct exclude_networks *)global;
310
311        struct sockaddr_storage addr;
312        struct sockaddr *ip;
313
314        /* Get the source IP address */
315        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
316        /* If a source ip address was found */
317        if(ip != NULL) {
318                process_ip(ip, local, exclude, 1);
319        }
320
321        /* Get the destination IP address */
322        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
323        /* If a destination ip address was found */
324        if(ip != NULL) {
325                process_ip(ip, local, exclude, 0);
326        }
327
328        /* If this trace is not live we will manually call "per tick" */
329        if(!trace_get_information(trace)->live) {
330                /* get the current packets timestamp */
331                uint64_t timestamp = trace_get_erf_timestamp(packet);
332
333                /* We only want to call per_tick if we are due to output something
334                 * Right shifting these converts them to seconds, tickrate is in seconds */
335                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
336                        per_tick(trace, thread, global, local, timestamp);
337                        local->lastkey = timestamp;
338                }
339        }
340
341        /* Return the packet to libtrace */
342        return packet;
343}
344
345/* Stopping callback function - When a thread closes */
346static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
347
348        /* cast the local storage structure */
349        struct addr_local *local = (struct addr_local *)tls;
350        /* Create structure to store the result */
351        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
352
353        /* Populate the result */
354        int i, j;
355        for(i=0;i<4;i++) {
356                for(j=0;j<256;j++) {
357                        result->src[i][j] = local->src[i][j];
358                        result->dst[i][j] = local->dst[i][j];
359                }
360        }
361        result->packets = local->packets;
362
363        /* Send the final results to the combiner */
364        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
365
366        /* Cleanup the local storage */
367        free(local);
368}
369
370/* Starting callback for reporter thread */
371static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
372        /* Create tally structure */
373        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
374        tally->stats = malloc(sizeof(struct addr_stats));
375
376        /* Initialize the tally structure */
377        int i, j;
378        for(i=0;i<4;i++) {
379                for(j=0;j<256;j++) {
380                        tally->src[i][j] = 0;
381                        tally->dst[i][j] = 0;
382                        tally->src_lastoutput[i][j] = 0;
383                        tally->dst_lastoutput[i][j] = 0;
384                        tally->stats->src[i][j] = 0;
385                        tally->stats->dst[i][j] = 0;
386                }
387                /* Stats related varibles */
388                //tally->stats->mean_src[i] = 0;
389                //tally->stats->mean_dst[i] = 0;
390                //tally->stats->stdev_src[i] = 0;
391                //tally->stats->stdev_dst[i] = 0;
392                //tally->stats->variance_src[i] = 0;
393                //tally->stats->variance_dst[i] = 0;
394                //tally->stats->variation_src[i] = 0;
395                //tally->stats->variation_dst[i] = 0;
396        }
397        tally->lastkey = 0;
398        tally->packets = 0;
399        tally->output_count = 0;
400
401        return tally;
402}
403
404static void plot_results(struct addr_local *tally, uint64_t tick) {
405
406        int i, j, k;
407
408        /* Calculations before reporting the results */
409        /* Need to initialise lastoutput values on first pass,
410         * this is so we have a base line for percentage changed */
411        if(tally->output_count == 0) {
412                for(i=0;i<4;i++) {
413                        for(j=0;j<256;j++) {
414                                tally->src_lastoutput[i][j] = tally->src[i][j];
415                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
416                        }
417                }
418         }
419        /* Compute the stats */
420        compute_stats(tally);
421
422        /* Finaly output the results */
423        printf("Generating output \"%sipdist-%u\"\n", stats_outputdir, tick);
424
425        /* Output the results */
426        char outputfile[255];
427        snprintf(outputfile, sizeof(outputfile), "%sipdist-%u.data", stats_outputdir, tick);
428        FILE *tmp = fopen(outputfile, "w");
429        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
430        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
431        for(i=0;i<256;i++) {
432                fprintf(tmp, "%d\t%d", tick, i+1);
433                for(j=0;j<4;j++) {
434                        /* Get the highest ranking to lowest ranking octets */
435                        fprintf(tmp, "\t%d", peak(&tally->stats->rank_src[j]));
436                        fprintf(tmp, "\t%d", peak_count(&tally->stats->rank_src[j]));
437                        fprintf(tmp, "\t%d", peak(&tally->stats->rank_dst[j]));
438                        fprintf(tmp, "\t%d", peak_count(&tally->stats->rank_dst[j]));
439                        pop(&tally->stats->rank_src[j]);
440                        pop(&tally->stats->rank_dst[j]);
441                }
442                fprintf(tmp, "\n");
443        }
444        fclose(tmp);
445
446        /* Puts data into timeseries files that gnuplot likes */
447        char outputfile2[255];
448        for(k=0;k<2;k++) {
449                for(j=0;j<4;j++) {
450                        /* If k is 0 we are doing src else dst */
451                        if(k) {
452                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-dst-octet%d.data", stats_outputdir, j+1);
453                        } else {
454                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-src-octet%d.data", stats_outputdir, j+1);
455                        }
456                        if(tally->output_count == 0) {
457                                tmp = fopen(outputfile2, "w");
458                                fprintf(tmp, "prefix\t");
459                                for(i=0;i<256;i++) {
460                                        fprintf(tmp, "%d\t", i);
461                                }
462                                fprintf(tmp, "\n");
463                        } else {
464                                tmp = fopen(outputfile2, "a");
465                        }
466                        fprintf(tmp, "%d\t", tick);
467                        for(i=0;i<256;i++) {
468                                if(k) {
469                                        fprintf(tmp, "%d\t", tally->dst[j][i]);
470                                } else {
471                                        fprintf(tmp, "%d\t", tally->src[j][i]);
472                                }
473                        }
474                        fprintf(tmp, "\n");
475                        fclose(tmp);
476                }
477        }
478
479        /* Plot the results */
480        for(i=0;i<4;i++) {
481                char outputplot[255];
482                snprintf(outputplot, sizeof(outputplot), "%sipdist-%u-octet%d.png", stats_outputdir, tick, i+1);
483                /* Open pipe to gnuplot */
484                FILE *gnuplot = popen("gnuplot -persistent", "w");
485                /* send all commands to gnuplot */
486                fprintf(gnuplot, "set term pngcairo dashed enhanced size 1280,960\n");
487                fprintf(gnuplot, "set output '%s'\n", outputplot);
488                fprintf(gnuplot, "set multiplot layout 2,1\n");
489                fprintf(gnuplot, "set title 'IP Distribution'\n");
490                fprintf(gnuplot, "set xrange[0:255]\n");
491                fprintf(gnuplot, "set xlabel 'Prefix'\n");
492                fprintf(gnuplot, "set ylabel 'Hits'\n");
493                fprintf(gnuplot, "set xtics 0,10,255\n");
494                fprintf(gnuplot, "plot '%s' using %d:%d title 'Source octet %d' smooth unique with boxes,", outputfile, (i*4)+3,(i*4)+4, i+1);
495                fprintf(gnuplot, "'%s' using %d:%d title 'Destination octet %d' smooth unique with boxes\n", outputfile, (i*4)+5, (i*4)+6, i+1);
496                fprintf(gnuplot, "set title 'Zipf Distribution'\n");
497                fprintf(gnuplot, "set xlabel 'Rank'\n");
498                fprintf(gnuplot, "set ylabel 'Frequency'\n");
499                fprintf(gnuplot, "set xrange[1:255]\n");
500                fprintf(gnuplot, "set logscale xy 10\n");
501                fprintf(gnuplot, "plot '%s' using 2:%d title 'Source octet %d',", outputfile, (i*4)+4, i+1);
502                fprintf(gnuplot, "'%s' using 2:%d title 'Destination octet %d'\n", outputfile, (i*4)+6, i+1);
503                fprintf(gnuplot, "replot");
504                pclose(gnuplot);
505        }
506
507        /* Plot time series */
508        for(k=0;k<2;k++) {
509                for(i=0;i<4;i++) {
510                        char outputplot2[255];
511                        if(k) {
512                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-dst-octet%i.png", stats_outputdir, i+1);
513                        } else {
514                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-src-octet%i.png", stats_outputdir, i+1);
515                        }
516                        FILE *gnuplot = popen("gnuplot -persistent", "w");
517                        fprintf(gnuplot, "set term pngcairo size 1280,960 \n");
518                        fprintf(gnuplot, "set output '%s'\n", outputplot2);
519                        if(k) {
520                                fprintf(gnuplot, "set title 'Timeseries Dst Octet %i'\n", i+1);
521                        } else {
522                                fprintf(gnuplot, "set title 'Timeseries Src Octet %i'\n", i+1);
523                        }
524                        fprintf(gnuplot, "set xtics rotate\n");
525                        fprintf(gnuplot, "set key out vert\n");
526                        fprintf(gnuplot, "set key right\n");
527                        //fprintf(gnuplot, "set xdata time\n");
528                        //fprintf(gnuplot, "set timefmt '%%s'\n");
529                        //fprintf(gnuplot, "set format x '%%m/%%d/%%Y %%H:%%M:%%S'\n");
530                        fprintf(gnuplot, "set autoscale xy\n");
531                        if(k) {
532                                fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2), for[i=3:257] '' using i with lines title columnheader(i)\n", stats_outputdir, i+1);
533                        } else {
534                                fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2), for[i=3:257] '' using i with lines title columnheader(i)\n", stats_outputdir, i+1);
535                        }
536                        fprintf(gnuplot, "replot");
537                        pclose(gnuplot);
538                }
539        }
540}
541
542
543/* Callback when a result is given to the reporter thread */
544static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
545        void *tls, libtrace_result_t *result) {
546
547        struct addr_local *results;
548        struct addr_local *tally;
549        uint64_t key;
550
551        /* We only want to handle results containing our user-defined structure  */
552        if(result->type != RESULT_USER) {
553                return;
554        }
555
556        /* This key is the key that was passed into trace_publish_results
557         * this will contain the erf timestamp for the packet */
558        key = result->key;
559
560        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
561        results = (struct addr_local *)result->value.ptr;
562
563        /* Grab our tally out of thread local storage */
564        tally = (struct addr_local *)tls;
565
566        /* Add all the results to the tally */
567        int i, j;
568        for(i=0;i<4;i++) {
569                for(j=0;j<256;j++) {
570                        tally->src[i][j] += results->src[i][j];
571                        tally->dst[i][j] += results->dst[i][j];
572                }
573        }
574        tally->packets += results->packets;
575
576        /* If the current timestamp is greater than the last printed plus the interval, output a result */
577        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
578
579                /* update last key */
580                tally->lastkey = key;
581
582                /* Plot the result with the key in epoch seconds*/
583                plot_results(tally, key >> 32);
584
585                /* increment the output counter */
586                tally->output_count++;
587
588                /* clear the tally but copy old values over first*/
589                for(i=0;i<4;i++) {
590                        for(j=0;j<256;j++) {
591                                tally->src_lastoutput[i][j] = tally->src[i][j];
592                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
593                                tally->src[i][j] = 0;
594                                tally->dst[i][j] = 0;
595                        }
596                        /* Clear all stats related data */
597                        //tally->stats->mean_src[i] = 0;
598                        //tally->stats->mean_dst[i] = 0;
599                        //tally->stats->stdev_src[i] = 0;
600                        //tally->stats->stdev_dst[i] = 0;
601                        //tally->stats->variance_src[i] = 0;
602                        //tally->stats->variance_dst[i] = 0;
603                        //tally->stats->variation_src[i] = 0;
604                        //tally->stats->variation_dst[i] = 0;
605
606                }
607                /* free the priority queue */
608                for(i=0;i<4;i++) {
609                        free(tally->stats->rank_src[i]);
610                        free(tally->stats->rank_dst[i]);
611                }
612
613                tally->packets = 0;
614
615        }
616
617        /* Cleanup the thread results */
618        free(results);
619}
620
621/* Callback when the reporter thread stops (essentially when the program ends) */
622static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
623
624        /* Get the tally from the thread local storage */
625        struct addr_local *tally = (struct addr_local *)tls;
626
627        /* If there is any remaining data in the tally plot it */
628        if(tally->packets > 0) {
629                /* Then plot the results */
630                plot_results(tally, (tally->lastkey >> 32) + 1);
631        }
632        /* Cleanup tally results*/
633        free(tally);
634}
635
636static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
637        libtrace_callback_set_t *reporting) {
638        /* Only destroy if the structure exists */
639        if(trace) {
640                trace_destroy(trace);
641        }
642        if(processing) {
643                trace_destroy_callback_set(processing);
644        }
645        if(reporting) {
646                trace_destroy_callback_set(reporting);
647        }
648}
649
650/* Converts a string representation eg 1.2.3.4/24 into a network structure */
651static int get_network(char *network_string, struct network *network) {
652
653        char delim[] = "/";
654        /* Split the address and mask portion of the string */
655        char *address = strtok(network_string, delim);
656        char *mask = strtok(NULL, delim);
657
658        /* Check the subnet mask is valid */
659        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
660                return 1;
661        }
662        /* right shift so netmask is in network byte order */
663        network->mask = 0xffffffff << (32 - atoi(mask));
664
665        struct in_addr addr;
666        /* Convert address string into uint32_t and check its valid */
667        if(inet_aton(address, &addr) == 0) {
668                return 2;
669        }
670        /* Ensure its saved in network byte order */
671        network->address = htonl(addr.s_addr);
672
673        /* Calculate the network address */
674        network->network = network->address & network->mask;
675
676        return 0;
677}
678
679int main(int argc, char *argv[]) {
680
681        libtrace_t *trace;
682        /* Callbacks for processing and reporting threads */
683        libtrace_callback_set_t *processing, *reporter;
684
685
686        /* Ensure the input URI was supplied */
687        if(argc < 3) {
688                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
689                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
690                return 1;
691        }
692        /* Convert tick into an int */
693        tickrate = atoi(argv[2]);
694
695
696        /* Create the trace */
697        trace = trace_create(argv[1]);
698        /* Ensure no error has occured creating the trace */
699        if(trace_is_err(trace)) {
700                trace_perror(trace, "Creating trace");
701                return 1;
702        }
703
704        /* Setup the processing threads */
705        processing = trace_create_callback_set();
706        trace_set_starting_cb(processing, start_callback);
707        trace_set_packet_cb(processing, per_packet);
708        trace_set_stopping_cb(processing, stop_processing);
709        trace_set_tick_interval_cb(processing, per_tick);
710        /* Setup the reporter threads */
711        reporter = trace_create_callback_set();
712        trace_set_starting_cb(reporter, start_reporter);
713        trace_set_result_cb(reporter, per_result);
714        trace_set_stopping_cb(reporter, stop_reporter);
715
716        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
717        trace_set_perpkt_threads(trace, 4);
718        /* Order the results by timestamp */
719        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
720        /* Try to balance the load across all processing threads */
721        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
722
723        /* Set the tick interval only if this is a live capture */
724        if(trace_get_information(trace)->live) {
725                /* tickrate is in seconds but tick_interval expects milliseconds */
726                trace_set_tick_interval(trace, tickrate*1000);
727        }
728        /* Do not buffer the reports */
729        trace_set_reporter_thold(trace, 1);
730
731
732        /* Setup excluded networks if any were supplied */
733        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
734        exclude->networks = malloc(sizeof(struct network)*(argc-3));
735        if(exclude == NULL || exclude->networks == NULL) {
736                fprintf(stderr, "Unable to allocate memory");
737                libtrace_cleanup(trace, processing, reporter);
738                return 1;
739        }
740        exclude->count = 0;
741        int i;
742        for(i=0;i<argc-3;i++) {
743                /* convert the network string into a network structure */
744                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
745                        fprintf(stderr, "Error creating excluded network");
746                        return 1;
747                }
748                /* increment the count of excluded networks */
749                exclude->count += 1;
750        }
751
752
753        /* Start the trace, if it errors return */
754        if(trace_pstart(trace, exclude, processing, reporter)) {
755                trace_perror(trace, "Starting parallel trace");
756                libtrace_cleanup(trace, processing, reporter);
757                return 1;
758        }
759
760        /* This will wait for all threads to complete */
761        trace_join(trace);
762
763        /* Clean up everything */
764        free(exclude->networks);
765        free(exclude);
766        libtrace_cleanup(trace, processing, reporter);
767
768        return 0;
769}
Note: See TracBrowser for help on using the repository browser.