source: examples/tutorial/ipdist-parallel.c @ dbd745c

develop
Last change on this file since dbd745c was dbd745c, checked in by Jacob Van Walraven <jcv9@…>, 4 years ago

Convert command arguments to use getargs

  • Property mode set to 100644
File size: 31.3 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10#include <getopt.h>
11
12/* Structure to hold the counters each thread has its own one of these */
13struct addr_local {
14        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
15        uint64_t src[4][256];
16        uint64_t dst[4][256];
17        /* Holds the results from the previous output */
18        uint64_t src_lastoutput[4][256];
19        uint64_t dst_lastoutput[4][256];
20        /* Holds the timestamp */
21        uint64_t lastkey;
22        /* Is the count of the number of packets processed, This is cleared after every output. */
23        uint64_t packets;
24        /* Total number an output has been generated */
25        uint64_t output_count;
26        /* Pointer to stats structure */
27        struct addr_stats *stats;
28};
29struct addr_stats {
30        /* Holds the percentage change compared to the previous output */
31        float src[4][256];
32        float dst[4][256];
33        /* Stats calculated independently per output */
34        double mode_src[4];
35        double mode_dst[4];
36        double mean_src[4];
37        double mean_dst[4];
38        double median_src[4];
39        double median_dst[4];
40        double stddev_src[4];
41        double stddev_dst[4];
42        double variance_src[4];
43        double variance_dst[4];
44        double skewness_src[4];
45        double skewness_dst[4];
46        /* Stats calculated over entire trace */
47        double total_skewness_src[4];
48        double total_skewness_dst[4];
49
50        struct addr_rank *rank_src[4];
51        struct addr_rank *rank_dst[4];
52};
53struct addr_rank {
54        uint8_t addr;
55        /* count is the priority */
56        uint64_t count;
57        /* pointer to next ranking item */
58        struct addr_rank* next;
59};
60
61/* Structure to hold excluded networks */
62struct exclude_networks {
63        int count;
64        struct network *networks;
65};
66struct network {
67        uint32_t address;
68        uint32_t mask;
69        uint32_t network;
70};
71
72uint64_t tickrate;
73char *stats_outputdir = "";
74
75/*************************************************************************
76Priority queue linked list */
77
78static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
79        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
80        tmp->addr = addr;
81        tmp->count = count;
82        tmp->next = NULL;
83
84        return tmp;
85}
86static uint8_t peak(struct addr_rank **head) {
87        return (*head)->addr;
88}
89static uint64_t peak_count(struct addr_rank **head) {
90        return (*head)->count;
91}
92static void pop(struct addr_rank **head) {
93        struct addr_rank* tmp = *head;
94        (*head) = (*head)->next;
95        free(tmp);
96}
97static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
98        struct addr_rank *curr = (*head);
99        struct addr_rank *tmp = rank_new(addr, count);
100
101        /* Check if the new node has a greater priority than the head */
102        if((*head)->count < count) {
103                tmp->next = *head;
104                (*head) = tmp;
105        } else {
106                /* Jump through the list until we find the correct position */
107                while (curr->next != NULL && curr->next->count > count) {
108                        curr = curr->next;
109                }
110
111                tmp->next = curr->next;
112                curr->next = tmp;
113        }
114}
115/*************************************************************************/
116
117
118static void compute_stats(struct addr_local *tally) {
119        int i, j, k;
120
121        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
122        for(i=0;i<4;i++) {
123                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
124                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
125                for(j=1;j<256;j++) {
126                        /* Push everything into the priority queue
127                         * each item will be popped off in the correct order */
128                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
129                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
130                }
131        }
132
133        /* Calculate mean, variance and standard deviation */
134        for(k=0;k<4;k++) {
135
136                double ex = 0;
137                double ex2 = 0;
138                double n = 0;
139                double m = 0;
140                for(i=0;i<256;i++) {
141                        for(j=0;j<tally->src[k][i];j++) {
142                                if(n == 0) {
143                                        m = i;
144                                }
145                                n += 1;
146                                ex += (i - m);
147                                ex2 += ((i - m) * (i - m));
148                        }
149                }
150
151                tally->stats->mean_src[k] = (k + (ex / n));
152                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
153                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
154
155                ex = 0;
156                ex2 = 0;
157                n = 0;
158                m = 0;
159                for(i=0;i<256;i++) {
160                        for(j=0;j<tally->dst[k][i];j++) {
161                                if(n == 0) {
162                                        m = i;
163                                }
164                                n += 1;
165                                ex += (i - m);
166                                ex2 += ((i - m) * (i - m));
167                        }
168                }
169                tally->stats->mean_dst[k] = (k + (ex / n));
170                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
171                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
172                /* Get the median */
173                int c = (n/2) - tally->src[k][0];
174                int c2 = 0;
175                while(c > 0) {
176                        c2 += 1;
177                        c -= tally->src[k][c2];
178                }
179                tally->stats->median_src[k] = c2;
180                c = (n/2) - tally->dst[k][0];
181                c2 = 0;
182                while(c > 0) {
183                        c2 += 1;
184                        c -= tally->dst[k][c2];
185                }
186                tally->stats->median_dst[k] = c2;
187                /* Get the mode which is the first item in the priority queue */
188                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
189                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
190                /* Calculate skewness */
191                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->median_src[k]) / tally->stats->stddev_src[k];
192                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->median_dst[k]) / tally->stats->stddev_dst[k];
193
194                /* Increment total skew */
195                tally->stats->total_skewness_src[k] += tally->stats->skewness_src[k];
196                tally->stats->total_skewness_dst[k] += tally->stats->skewness_dst[k];
197        }
198
199}
200
201static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
202
203        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
204        /* Proccessing thread local storage */
205        struct addr_local *local = (struct addr_local *)tls;
206
207        /* Populate the result structure from the threads local storage and clear threads local storage*/
208        int i, j;
209        for(i=0;i<4;i++) {
210                for(j=0;j<256;j++) {
211                        result->src[i][j] = local->src[i][j];
212                        result->dst[i][j] = local->dst[i][j];
213                        /* clear local storage */
214                        local->src[i][j] = 0;
215                        local->dst[i][j] = 0;
216                }
217        }
218        result->packets = local->packets;
219        local->packets = 0;
220
221        /* Push result to the combiner */
222        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
223}
224
225/* Start callback function - This is run for each thread when it starts */
226static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
227
228        /* Create and initialize the local counter struct */
229        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
230        int i, j;
231        for(i=0;i<4;i++) {
232                for(j=0;j<256;j++) {
233                        local->src[i][j] = 0;
234                        local->dst[i][j] = 0;
235                }
236        }
237        local->lastkey = 0;
238        local->packets = 0;
239
240        /* return the local storage so it is available for all other callbacks for the thread*/
241        return local;
242}
243
244/* Checks if address is part of a excluded subnet. */
245static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
246
247        int i;
248        for(i=0;i<exclude->count;i++) {
249                /* Convert address into a network address */
250                uint32_t net_addr = address & exclude->networks[i].mask;
251
252                /* If this matches the network address from the excluded list we need to exclude this
253                   address. */
254                if(net_addr == exclude->networks[i].network) {
255                        return 1;
256                }
257        }
258
259        /* If we got this far the address should not be excluded */
260        return 0;
261}
262
263static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
264
265        /* Checks if the ip is of type IPv4 */
266        if (ip->sa_family == AF_INET) {
267
268                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
269                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
270                /* Get in_addr from sockaddr */
271                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
272                /* Ensure the address is in network byte order */
273                uint32_t address = htonl(ip4.s_addr);
274
275                /* Check if the address is part of an excluded network. */
276                if(network_excluded(address, exclude) == 0) {
277
278                        /* Split the IPv4 address into each octet */
279                        uint8_t octet[4];
280                        octet[0] = (address & 0xff000000) >> 24;
281                        octet[1] = (address & 0x00ff0000) >> 16;
282                        octet[2] = (address & 0x0000ff00) >> 8;
283                        octet[3] = (address & 0x000000ff);
284
285                        /* check if the supplied address was a source or destination,
286                           increment the correct one */
287                        if(srcaddr) {
288                                int i;
289                                for(i=0;i<4;i++) {
290                                        local->src[i][octet[i]] += 1;
291                                }
292                        } else {
293                                int i;
294                                for(i=0;i<4;i++) {
295                                        local->dst[i][octet[i]] += 1;
296                                }
297                        }
298                }
299        }
300}
301
302/* Per packet callback function run by each thread */
303static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
304        libtrace_packet_t *packet) {
305
306        /* Regain access to the address counter structure */
307        struct addr_local *local = (struct addr_local *)tls;
308
309        /* If this is the first packet set the lastkey to the packets timestamp */
310        if(local->lastkey == 0) {
311                local->lastkey = trace_get_erf_timestamp(packet);
312        }
313
314        /* Increment the packet count */
315        local->packets += 1;
316
317        /* Regain access to excluded networks pointer */
318        struct exclude_networks *exclude = (struct exclude_networks *)global;
319
320        struct sockaddr_storage addr;
321        struct sockaddr *ip;
322
323        /* Get the source IP address */
324        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
325        /* If a source ip address was found */
326        if(ip != NULL) {
327                process_ip(ip, local, exclude, 1);
328        }
329
330        /* Get the destination IP address */
331        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
332        /* If a destination ip address was found */
333        if(ip != NULL) {
334                process_ip(ip, local, exclude, 0);
335        }
336
337        /* If this trace is not live we will manually call "per tick" */
338        if(!trace_get_information(trace)->live) {
339                /* get the current packets timestamp */
340                uint64_t timestamp = trace_get_erf_timestamp(packet);
341
342                /* We only want to call per_tick if we are due to output something
343                 * Right shifting these converts them to seconds, tickrate is in seconds */
344                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
345                        per_tick(trace, thread, global, local, timestamp);
346                        local->lastkey = timestamp;
347                }
348        }
349
350        /* Return the packet to libtrace */
351        return packet;
352}
353
354/* Stopping callback function - When a thread closes */
355static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
356
357        /* cast the local storage structure */
358        struct addr_local *local = (struct addr_local *)tls;
359        /* Create structure to store the result */
360        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
361
362        /* Populate the result */
363        int i, j;
364        for(i=0;i<4;i++) {
365                for(j=0;j<256;j++) {
366                        result->src[i][j] = local->src[i][j];
367                        result->dst[i][j] = local->dst[i][j];
368                }
369        }
370        result->packets = local->packets;
371
372        /* Send the final results to the combiner */
373        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
374
375        /* Cleanup the local storage */
376        free(local);
377}
378
379/* Starting callback for reporter thread */
380static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
381        /* Create tally structure */
382        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
383        tally->stats = malloc(sizeof(struct addr_stats));
384
385        /* Initialize the tally structure */
386        int i, j;
387        for(i=0;i<4;i++) {
388                for(j=0;j<256;j++) {
389                        tally->src[i][j] = 0;
390                        tally->dst[i][j] = 0;
391                        tally->src_lastoutput[i][j] = 0;
392                        tally->dst_lastoutput[i][j] = 0;
393                        tally->stats->src[i][j] = 0;
394                        tally->stats->dst[i][j] = 0;
395                }
396                /* Stats related varibles */
397                tally->stats->mode_src[i] = 0;
398                tally->stats->mode_dst[i] = 0;
399                tally->stats->mean_src[i] = 0;
400                tally->stats->mean_dst[i] = 0;
401                tally->stats->median_src[i] = 0;
402                tally->stats->median_dst[i] = 0;
403                tally->stats->stddev_src[i] = 0;
404                tally->stats->stddev_dst[i] = 0;
405                tally->stats->variance_src[i] = 0;
406                tally->stats->variance_dst[i] = 0;
407                tally->stats->skewness_src[i] = 0;
408                tally->stats->skewness_dst[i] = 0;
409
410                tally->stats->total_skewness_src[i] = 0;
411                tally->stats->total_skewness_dst[i] = 0;
412        }
413        tally->lastkey = 0;
414        tally->packets = 0;
415        tally->output_count = 0;
416
417        return tally;
418}
419
420static void plot_results(struct addr_local *tally, uint64_t tick) {
421
422        int i, j, k;
423
424        /* Calculations before reporting the results */
425        /* Need to initialise lastoutput values on first pass,
426         * this is so we have a base line for percentage changed */
427        if(tally->output_count == 0) {
428                for(i=0;i<4;i++) {
429                        for(j=0;j<256;j++) {
430                                tally->src_lastoutput[i][j] = tally->src[i][j];
431                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
432                        }
433                }
434         }
435        /* Compute the stats */
436        compute_stats(tally);
437
438        /* Finaly output the results */
439        printf("Generating output \"%sipdist-%lu\"\n", stats_outputdir, tick);
440
441        /* Output the results */
442        char outputfile[255];
443        snprintf(outputfile, sizeof(outputfile), "%sipdist-%lu.data", stats_outputdir, tick);
444        FILE *tmp = fopen(outputfile, "w");
445        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
446        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
447        for(i=0;i<256;i++) {
448                fprintf(tmp, "%lu\t%d", tick, i+1);
449                for(j=0;j<4;j++) {
450                        /* Get the highest ranking to lowest ranking octets */
451                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_src[j]));
452                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_src[j]));
453                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_dst[j]));
454                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_dst[j]));
455                        pop(&tally->stats->rank_src[j]);
456                        pop(&tally->stats->rank_dst[j]);
457                }
458                fprintf(tmp, "\n");
459        }
460        fclose(tmp);
461
462        char outputfile_stats[255];
463        snprintf(outputfile_stats, sizeof(outputfile_stats), "%sipdist-%lu.stats", stats_outputdir, tick);
464        tmp = fopen(outputfile_stats, "w");
465        /* append stats data to end of file */
466        fprintf(tmp, "#\tmean\tstddev\tvariance\tmedian\tmode\tskewness\n");
467        for(i=0;i<4;i++) {
468                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->median_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
469                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->median_src[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
470                fprintf(tmp, "\n\n");
471        }
472        fclose(tmp);
473
474        char outputfile_stats_timeseries[255];
475        snprintf(outputfile_stats_timeseries, sizeof(outputfile_stats_timeseries), "%sipdist-timeseries-skewness.stats", stats_outputdir);
476        if(tally->output_count == 0) {
477                tmp = fopen(outputfile_stats_timeseries, "w");
478                fprintf(tmp, "timestamp\tsrc1\t\tdst1\t\tsrc2\t\tdst2\t\tsrc3\t\tdst3\t\tsrc4\t\tdst4");
479        } else {
480                tmp = fopen(outputfile_stats_timeseries, "a");
481        }
482        fprintf(tmp, "\n%lu\t", tick);
483        for(k=0;k<4;k++) {
484                fprintf(tmp, "%f\t", tally->stats->total_skewness_src[k] / (tally->output_count+1));
485                fprintf(tmp, "%f\t", tally->stats->total_skewness_dst[k] / (tally->output_count+1));
486        }
487        fclose(tmp);
488
489        /* Puts data into timeseries files that gnuplot likes */
490        char outputfile2[255];
491        for(k=0;k<2;k++) {
492                for(j=0;j<4;j++) {
493                        /* If k is 0 we are doing src else dst */
494                        if(k) {
495                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-dst-octet%d.data", stats_outputdir, j+1);
496                        } else {
497                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-src-octet%d.data", stats_outputdir, j+1);
498                        }
499                        if(tally->output_count == 0) {
500                                tmp = fopen(outputfile2, "w");
501                                fprintf(tmp, "timestamp\t");
502                                for(i=0;i<256;i++) {
503                                        fprintf(tmp, "%d\t", i);
504                                }
505                                fprintf(tmp, "\n");
506                        } else {
507                                tmp = fopen(outputfile2, "a");
508                        }
509                        fprintf(tmp, "%lu\t", tick);
510                        for(i=0;i<256;i++) {
511                                if(k) {
512                                        fprintf(tmp, "%lu\t", tally->dst[j][i]);
513                                } else {
514                                        fprintf(tmp, "%lu\t", tally->src[j][i]);
515                                }
516                        }
517                        fprintf(tmp, "\n");
518                        fclose(tmp);
519                }
520        }
521
522        /* Plot the results */
523        for(i=0;i<4;i++) {
524                char outputplot[255];
525                snprintf(outputplot, sizeof(outputplot), "%sipdist-%lu-octet%d.png", stats_outputdir, tick, i+1);
526                /* Open pipe to gnuplot */
527                FILE *gnuplot = popen("gnuplot -persistent", "w");
528                /* send all commands to gnuplot */
529                fprintf(gnuplot, "set term pngcairo enhanced size 1280,960\n");
530                fprintf(gnuplot, "set output '%s'\n", outputplot);
531                fprintf(gnuplot, "set multiplot layout 2,1\n");
532                fprintf(gnuplot, "set title 'IP Distribution - %lu'\n", tick);
533                fprintf(gnuplot, "set xrange[0:255]\n");
534                fprintf(gnuplot, "set y2range[-1:1]\n");
535                fprintf(gnuplot, "set y2tics\n");
536                fprintf(gnuplot, "set xlabel 'Prefix'\n");
537                fprintf(gnuplot, "set ylabel 'Hits'\n");
538                fprintf(gnuplot, "set y2label 'Skewness'\n");
539                fprintf(gnuplot, "set xtics 0,10,255\n");
540                /* Setup labels that hold mean, standard deviation and variance */
541                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 2 name 'SOURCEMEAN' nooutput\n", outputfile_stats, i);
542                //fprintf(gnuplot, "set label 1 gprintf(\"Source Mean %u\", SOURCEMEAN_min) at graph 0.02, 0.95\n");
543                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 2 name 'DESTMEAN' nooutput\n", outputfile_stats, i);
544                //fprintf(gnuplot, "set label 2 gprintf(\"Destination Mean: %f\", DESTMEAN_min) at graph 0.02, 0.90\n");
545                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 3 name 'SOURCESTDDEV' nooutput\n", outputfile_stats, i);
546                //fprintf(gnuplot, "set label 3 sprintf('Source Standard Deviation: %f', SOURCESTDDEV_min) at graph 0.24, 0.95\n");
547                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 3 name 'DESTSTDDEV' nooutput\n", outputfile_stats, i);
548                //fprintf(gnuplot, "set label 4 sprintf('Destination Standard Deviation: %f', DESTSTDDEV_min) at graph 0.24, 0.90\n");
549                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 4 name 'SOURCEVAR' nooutput\n", outputfile_stats, i);
550                //fprintf(gnuplot, "set label 5 sprintf('Source Variance: %f', SOURCEVAR_min) at graph 0.55, 0.95\n");
551                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 4 name 'DESTVAR' nooutput\n", outputfile_stats, i);
552                //fprintf(gnuplot, "set label 6 sprintf('Destination Variance: %f', DESTVAR_min) at graph 0.55, 0.90\n");
553                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 7 name 'SOURCESKEW' nooutput\n", outputfile_stats, i);
554                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 7 name 'DESTSKEW' nooutput\n", outputfile_stats, i);
555                /* Plot the first graph of the multiplot */
556                fprintf(gnuplot, "set arrow from SOURCEMEAN_min, graph 0 to SOURCEMEAN_min, graph 1 nohead lt 1\n");
557                fprintf(gnuplot, "set arrow from DESTMEAN_min, graph 0 to DESTMEAN_min, graph 1 nohead lt 2\n");
558                fprintf(gnuplot, "plot '%s' using %d:%d index 0 title 'Source octet %d' smooth unique with boxes,", outputfile, (i*4)+3,(i*4)+4, i+1);
559                fprintf(gnuplot, "'%s' using %d:%d index 0 title 'Destination octet %d' smooth unique with boxes,", outputfile, (i*4)+5, (i*4)+6, i+1);
560                fprintf(gnuplot, "1/0 t 'Source mean' lt 1,");
561                fprintf(gnuplot, "1/0 t 'Destination mean' lt 2,");
562                fprintf(gnuplot, "SOURCESKEW_min title 'Source Skewness' axes x1y2,");
563                fprintf(gnuplot, "DESTSKEW_min title 'Destination Skewness' axes x1y2\n");
564                /* Unset labels for next plot */
565                fprintf(gnuplot, "unset y2tics\n");
566                fprintf(gnuplot, "unset y2label\n");
567                fprintf(gnuplot, "unset arrow\n");
568                fprintf(gnuplot, "unset label 1\nunset label 2\nunset label 3\nunset label 4\nunset label 5\nunset label 6\n");
569                fprintf(gnuplot, "set title 'Zipf Distribution'\n");
570                fprintf(gnuplot, "set xlabel 'Rank'\n");
571                fprintf(gnuplot, "set ylabel 'Frequency'\n");
572                fprintf(gnuplot, "set logscale xy 10\n");
573                fprintf(gnuplot, "set xrange[1:255]\n");
574                fprintf(gnuplot, "set xtics 0,10,255\n");
575                /* Plot the second graph of the multiplot */
576                fprintf(gnuplot, "plot '%s' using 2:%d index 0 title 'Source octet %d',", outputfile, (i*4)+4, i+1);
577                fprintf(gnuplot, "'%s' using 2:%d index 0 title 'Destination octet %d'\n", outputfile, (i*4)+6, i+1);
578                fprintf(gnuplot, "unset multiplot\n");
579                pclose(gnuplot);
580        }
581
582        /* Plot time series */
583        for(k=0;k<2;k++) {
584                for(i=0;i<4;i++) {
585                        char outputplot2[255];
586                        if(k) {
587                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-dst-octet%i.png", stats_outputdir, i+1);
588                        } else {
589                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-src-octet%i.png", stats_outputdir, i+1);
590                        }
591                        FILE *gnuplot = popen("gnuplot -persistent", "w");
592                        fprintf(gnuplot, "set term pngcairo size 1280,960 \n");
593                        fprintf(gnuplot, "set output '%s'\n", outputplot2);
594                        fprintf(gnuplot, "set multiplot layout 2,1\n");
595                        if(k) {
596                                fprintf(gnuplot, "set title 'Timeseries Dst Octet %i'\n", i+1);
597                        } else {
598                                fprintf(gnuplot, "set title 'Timeseries Src Octet %i'\n", i+1);
599                        }
600                        fprintf(gnuplot, "set xtics rotate\n");
601                        fprintf(gnuplot, "set y2tics\n");
602                        fprintf(gnuplot, "set xlabel 'Timestamp'\n");
603                        fprintf(gnuplot, "set ylabel 'Cumulative hits'\n");
604                        //fprintf(gnuplot, "set key out vert\n");
605                        fprintf(gnuplot, "set key off\n");
606                        //fprintf(gnuplot, "set xdata time\n");
607                        //fprintf(gnuplot, "set timefmt '%%s'\n");
608                        //fprintf(gnuplot, "set format x '%%m/%%d/%%Y %%H:%%M:%%S'\n");
609                        fprintf(gnuplot, "set autoscale xy\n");
610                        if(k) {
611                                fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
612                        } else {
613                                fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
614                        }
615                        /* Draw the mean skewness line */
616                        fprintf(gnuplot, "set title 'Timeseries mean skewness\n");
617                        fprintf(gnuplot, "set yrange[-1:1]\n");
618                        fprintf(gnuplot, "set xlabel 'Timestamp'\n");
619                        fprintf(gnuplot, "set ylabel 'Skewness'\n");
620                        fprintf(gnuplot, "plot '%sipdist-timeseries-skewness.stats' using %d:xtic(1) with lines title columnheader(%d)\n", stats_outputdir, (i*2)+2+k);
621                        fprintf(gnuplot, "unset multiplot");
622                        pclose(gnuplot);
623                }
624        }
625}
626
627
628/* Callback when a result is given to the reporter thread */
629static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
630        void *tls, libtrace_result_t *result) {
631
632        struct addr_local *results;
633        struct addr_local *tally;
634        uint64_t key;
635
636        /* We only want to handle results containing our user-defined structure  */
637        if(result->type != RESULT_USER) {
638                return;
639        }
640
641        /* This key is the key that was passed into trace_publish_results
642         * this will contain the erf timestamp for the packet */
643        key = result->key;
644
645        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
646        results = (struct addr_local *)result->value.ptr;
647
648        /* Grab our tally out of thread local storage */
649        tally = (struct addr_local *)tls;
650
651        /* Add all the results to the tally */
652        int i, j;
653        for(i=0;i<4;i++) {
654                for(j=0;j<256;j++) {
655                        tally->src[i][j] += results->src[i][j];
656                        tally->dst[i][j] += results->dst[i][j];
657                }
658        }
659        tally->packets += results->packets;
660
661        /* If the current timestamp is greater than the last printed plus the interval, output a result */
662        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
663
664                /* update last key */
665                tally->lastkey = key;
666
667                /* Plot the result with the key in epoch seconds*/
668                plot_results(tally, key >> 32);
669
670                /* increment the output counter */
671                tally->output_count++;
672
673                /* clear the tally but copy old values over first*/
674                for(i=0;i<4;i++) {
675                        for(j=0;j<256;j++) {
676                                tally->src_lastoutput[i][j] = tally->src[i][j];
677                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
678                                tally->src[i][j] = 0;
679                                tally->dst[i][j] = 0;
680                        }
681                        /* Clear all the stats data */
682                        tally->stats->mode_src[i] = 0;
683                        tally->stats->mode_dst[i] = 0;
684                        tally->stats->mean_src[i] = 0;
685                        tally->stats->mean_dst[i] = 0;
686                        tally->stats->median_src[i] = 0;
687                        tally->stats->median_dst[i] = 0;
688                        tally->stats->stddev_src[i] = 0;
689                        tally->stats->stddev_dst[i] = 0;
690                        tally->stats->variance_src[i] = 0;
691                        tally->stats->variance_dst[i] = 0;
692                        tally->stats->skewness_src[i] = 0;
693                        tally->stats->skewness_dst[i] = 0;
694                }
695                /* free the priority queue */
696                for(i=0;i<4;i++) {
697                        free(tally->stats->rank_src[i]);
698                        free(tally->stats->rank_dst[i]);
699                }
700
701                tally->packets = 0;
702
703        }
704
705        /* Cleanup the thread results */
706        free(results);
707}
708
709/* Callback when the reporter thread stops (essentially when the program ends) */
710static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
711
712        /* Get the tally from the thread local storage */
713        struct addr_local *tally = (struct addr_local *)tls;
714
715        /* If there is any remaining data in the tally plot it */
716        if(tally->packets > 0) {
717                /* Then plot the results */
718                plot_results(tally, (tally->lastkey >> 32) + 1);
719        }
720        /* Cleanup tally results*/
721        free(tally);
722}
723
724static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
725        libtrace_callback_set_t *reporting) {
726        /* Only destroy if the structure exists */
727        if(trace) {
728                trace_destroy(trace);
729        }
730        if(processing) {
731                trace_destroy_callback_set(processing);
732        }
733        if(reporting) {
734                trace_destroy_callback_set(reporting);
735        }
736}
737
738/* Converts a string representation eg 1.2.3.4/24 into a network structure */
739static int get_network(char *network_string, struct network *network) {
740
741        char delim[] = "/";
742        /* Split the address and mask portion of the string */
743        char *address = strtok(network_string, delim);
744        char *mask = strtok(NULL, delim);
745
746        /* Check the subnet mask is valid */
747        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
748                return 1;
749        }
750        /* right shift so netmask is in network byte order */
751        network->mask = 0xffffffff << (32 - atoi(mask));
752
753        struct in_addr addr;
754        /* Convert address string into uint32_t and check its valid */
755        if(inet_aton(address, &addr) == 0) {
756                return 2;
757        }
758        /* Ensure its saved in network byte order */
759        network->address = htonl(addr.s_addr);
760
761        /* Calculate the network address */
762        network->network = network->address & network->mask;
763
764        return 0;
765}
766
767static void usage(char *argv0) {
768        fprintf(stderr, "Usage:\n"
769        "%s inputURI output-interval\n"
770        "-i [inputURI] --set-uri [inputURI]\n"
771        "-o [output-interval] --output-interval [output-interval]\n"
772        "       Output statistical information every x seconds\n"
773        "-t [threads] --threads [threads]\n"
774        "-e [excluded-network] --exclude-network [excluded-network]\n"
775        "       Network to exclude from results\n"
776        "       e.g. -e 192.168.0.0/16 -e 10.0.0.0/8\n"
777        "-d [output-directory] --output-dir [output-directory]\n"
778        , argv0);
779        exit(1);
780}
781
782int main(int argc, char *argv[]) {
783
784        char *inputURI = NULL;
785        int threads = 4;
786        tickrate = 300;
787        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
788        exclude->count = 0;
789
790        while(1) {
791                int option_index;
792                struct option long_options[] = {
793                        { "set-uri",            1, 0, 'i' },
794                        { "output-interval",    1, 0, 'o' },
795                        { "threads",            1, 0, 't' },
796                        { "exclude-network",    1, 0, 'e' },
797                        { "output-dir",         1, 0, 'd' },
798                        { NULL,                 0, 0,  0  }
799                };
800
801                int c = getopt_long(argc, argv, "i:o:t:e:d:", long_options, &option_index);
802
803                if(c==-1) {
804                        break;
805                }
806
807                switch(c) {
808                        case 'i':
809                                inputURI = optarg;
810                                break;
811                        case 'o':
812                                tickrate = atoi(optarg);
813                                break;
814                        case 't':
815                                threads = atoi(optarg);
816                                break;
817                        case 'e':
818                                exclude->count += 1;
819                                if(exclude->count > 1) {
820                                        exclude->networks = realloc(exclude->networks, sizeof(struct network)*exclude->count);
821                                } else {
822                                        exclude->networks = malloc(sizeof(struct network));
823                                }
824                                if(get_network(optarg, &exclude->networks[exclude->count-1])) {
825                                        fprintf(stderr, "Error excluding network %s\n", optarg);
826                                        return 1;
827                                }
828                                break;
829                        case 'd':
830                                stats_outputdir = optarg;
831                                strcat(stats_outputdir, "/");
832                        case '?':
833                                break;
834                        default:
835                                fprintf(stderr, "Unknown option: %c\n", c);
836                                usage(argv[0]);
837                }
838        }
839
840        libtrace_t *trace;
841        /* Callbacks for processing and reporting threads */
842        libtrace_callback_set_t *processing, *reporter;
843
844        /* Ensure the input URI was supplied */
845        if(inputURI == NULL) {
846                usage(argv[0]);
847        }
848
849        /* Create the trace */
850        trace = trace_create(inputURI);
851        /* Ensure no error has occured creating the trace */
852        if(trace_is_err(trace)) {
853                trace_perror(trace, "Creating trace");
854                return 1;
855        }
856
857        /* Setup the processing threads */
858        processing = trace_create_callback_set();
859        trace_set_starting_cb(processing, start_callback);
860        trace_set_packet_cb(processing, per_packet);
861        trace_set_stopping_cb(processing, stop_processing);
862        trace_set_tick_interval_cb(processing, per_tick);
863        /* Setup the reporter threads */
864        reporter = trace_create_callback_set();
865        trace_set_starting_cb(reporter, start_reporter);
866        trace_set_result_cb(reporter, per_result);
867        trace_set_stopping_cb(reporter, stop_reporter);
868
869        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
870        trace_set_perpkt_threads(trace, threads);
871        /* Order the results by timestamp */
872        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
873        /* Try to balance the load across all processing threads */
874        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
875
876        /* Set the tick interval only if this is a live capture */
877        if(trace_get_information(trace)->live) {
878                /* tickrate is in seconds but tick_interval expects milliseconds */
879                trace_set_tick_interval(trace, tickrate*1000);
880        }
881        /* Do not buffer the reports */
882        trace_set_reporter_thold(trace, 1);
883
884        /* Start the trace, if it errors return */
885        if(trace_pstart(trace, exclude, processing, reporter)) {
886                trace_perror(trace, "Starting parallel trace");
887                libtrace_cleanup(trace, processing, reporter);
888                return 1;
889        }
890
891        /* This will wait for all threads to complete */
892        trace_join(trace);
893
894        /* Clean up everything */
895        free(exclude->networks);
896        free(exclude);
897        libtrace_cleanup(trace, processing, reporter);
898
899        return 0;
900}
Note: See TracBrowser for help on using the repository browser.