source: examples/tutorial/ipdist-parallel.c @ 36fb135

develop
Last change on this file since 36fb135 was 36fb135, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Added more skewness plots

  • Property mode set to 100644
File size: 31.7 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10
11/* Structure to hold the counters each thread has its own one of these */
12struct addr_local {
13        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
14        uint64_t src[4][256];
15        uint64_t dst[4][256];
16        /* Holds the results from the previous output */
17        uint64_t src_lastoutput[4][256];
18        uint64_t dst_lastoutput[4][256];
19        /* Holds the timestamp */
20        uint64_t lastkey;
21        /* Is the count of the number of packets processed, This is cleared after every output. */
22        uint64_t packets;
23        /* Total number an output has been generated */
24        uint64_t output_count;
25        /* Pointer to stats structure */
26        struct addr_stats *stats;
27};
28struct addr_stats {
29        /* Holds the percentage change compared to the previous output */
30        float src[4][256];
31        float dst[4][256];
32        /* Stats calculated independently per output */
33        double mode_src[4];
34        double mode_dst[4];
35        double mean_src[4];
36        double mean_dst[4];
37        double median_src[4];
38        double median_dst[4];
39        double stddev_src[4];
40        double stddev_dst[4];
41        double variance_src[4];
42        double variance_dst[4];
43        double skewness_src[4];
44        double skewness_dst[4];
45        /* Stats calculated over entire trace */
46        double total_skewness_src[4];
47        double total_skewness_dst[4];
48
49        struct addr_rank *rank_src[4];
50        struct addr_rank *rank_dst[4];
51};
52struct addr_rank {
53        uint8_t addr;
54        /* count is the priority */
55        uint64_t count;
56        /* pointer to next ranking item */
57        struct addr_rank* next;
58};
59
60/* Structure to hold excluded networks */
61struct exclude_networks {
62        int count;
63        struct network *networks;
64};
65struct network {
66        uint32_t address;
67        uint32_t mask;
68        uint32_t network;
69};
70
71/* interval between outputs in seconds */
72uint64_t tickrate;
73
74char *stats_outputdir = "/home/jcv9/output-spectre/";
75/* Calculate and plot the percentage change from the previous plot */
76int stats_percentage_change = 0;
77
78/*************************************************************************
79Priority queue linked list */
80
81static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
82        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
83        tmp->addr = addr;
84        tmp->count = count;
85        tmp->next = NULL;
86
87        return tmp;
88}
89static uint8_t peak(struct addr_rank **head) {
90        return (*head)->addr;
91}
92static uint64_t peak_count(struct addr_rank **head) {
93        return (*head)->count;
94}
95static void pop(struct addr_rank **head) {
96        struct addr_rank* tmp = *head;
97        (*head) = (*head)->next;
98        free(tmp);
99}
100static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
101        struct addr_rank *curr = (*head);
102        struct addr_rank *tmp = rank_new(addr, count);
103
104        /* Check if the new node has a greater priority than the head */
105        if((*head)->count < count) {
106                tmp->next = *head;
107                (*head) = tmp;
108        } else {
109                /* Jump through the list until we find the correct position */
110                while (curr->next != NULL && curr->next->count > count) {
111                        curr = curr->next;
112                }
113
114                tmp->next = curr->next;
115                curr->next = tmp;
116        }
117}
118/*************************************************************************/
119
120
121static void compute_stats(struct addr_local *tally) {
122        int i, j, k;
123
124        /* Calculates the percentage change from the last output. NEED TO MAKE THIS WEIGHTED */
125        if(stats_percentage_change) {
126                for(i=0;i<256;i++) {
127                        for(j=0;j<4;j++) {
128                                tally->stats->src[j][i] = 0;
129                                tally->stats->dst[j][i] = 0;
130                                if(tally->src[j][i] != 0) {
131                                        tally->stats->src[j][i] = (((float)tally->src[j][i] - (float)tally->src_lastoutput[j][i]) / (float)tally->src[j][i]) * 100;
132                                }
133                                if(tally->dst[j][i] != 0) {
134                                        tally->stats->dst[j][i] = (((float)tally->dst[j][i] - (float)tally->dst_lastoutput[j][i]) / (float)tally->dst[j][i]) * 100;
135                                }
136                        }
137                }
138        }
139
140        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
141        for(i=0;i<4;i++) {
142                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
143                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
144                for(j=1;j<256;j++) {
145                        /* Push everything into the priority queue
146                         * each item will be popped off in the correct order */
147                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
148                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
149                }
150        }
151
152        /* Calculate mean, variance and standard deviation */
153        for(k=0;k<4;k++) {
154
155                double ex = 0;
156                double ex2 = 0;
157                double n = 0;
158                double m = 0;
159                for(i=0;i<256;i++) {
160                        for(j=0;j<tally->src[k][i];j++) {
161                                if(n == 0) {
162                                        m = i;
163                                }
164                                n += 1;
165                                ex += (i - m);
166                                ex2 += ((i - m) * (i - m));
167                        }
168                }
169
170                tally->stats->mean_src[k] = (k + (ex / n));
171                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
172                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
173
174                ex = 0;
175                ex2 = 0;
176                n = 0;
177                m = 0;
178                for(i=0;i<256;i++) {
179                        for(j=0;j<tally->dst[k][i];j++) {
180                                if(n == 0) {
181                                        m = i;
182                                }
183                                n += 1;
184                                ex += (i - m);
185                                ex2 += ((i - m) * (i - m));
186                        }
187                }
188                tally->stats->mean_dst[k] = (k + (ex / n));
189                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
190                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
191                /* Get the median */
192                int c = (n/2) - tally->src[k][0];
193                int c2 = 0;
194                while(c > 0) {
195                        c2 += 1;
196                        c -= tally->src[k][c2];
197                }
198                tally->stats->median_src[k] = c2;
199                c = (n/2) - tally->dst[k][0];
200                c2 = 0;
201                while(c > 0) {
202                        c2 += 1;
203                        c -= tally->dst[k][c2];
204                }
205                tally->stats->median_dst[k] = c2;
206                /* Get the mode which is the first item in the priority queue */
207                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
208                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
209                /* Calculate skewness */
210                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->median_src[k]) / tally->stats->stddev_src[k];
211                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->median_dst[k]) / tally->stats->stddev_dst[k];
212
213                /* Increment total skew */
214                tally->stats->total_skewness_src[k] += tally->stats->skewness_src[k];
215                tally->stats->total_skewness_dst[k] += tally->stats->skewness_dst[k];
216        }
217
218}
219
220static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
221
222        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
223        /* Proccessing thread local storage */
224        struct addr_local *local = (struct addr_local *)tls;
225
226        /* Populate the result structure from the threads local storage and clear threads local storage*/
227        int i, j;
228        for(i=0;i<4;i++) {
229                for(j=0;j<256;j++) {
230                        result->src[i][j] = local->src[i][j];
231                        result->dst[i][j] = local->dst[i][j];
232                        /* clear local storage */
233                        local->src[i][j] = 0;
234                        local->dst[i][j] = 0;
235                }
236        }
237        result->packets = local->packets;
238        local->packets = 0;
239
240        /* Push result to the combiner */
241        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
242}
243
244/* Start callback function - This is run for each thread when it starts */
245static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
246
247        /* Create and initialize the local counter struct */
248        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
249        int i, j;
250        for(i=0;i<4;i++) {
251                for(j=0;j<256;j++) {
252                        local->src[i][j] = 0;
253                        local->dst[i][j] = 0;
254                }
255        }
256        local->lastkey = 0;
257        local->packets = 0;
258
259        /* return the local storage so it is available for all other callbacks for the thread*/
260        return local;
261}
262
263/* Checks if address is part of a excluded subnet. */
264static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
265
266        int i;
267        for(i=0;i<exclude->count;i++) {
268                /* Convert address into a network address */
269                uint32_t net_addr = address & exclude->networks[i].mask;
270
271                /* If this matches the network address from the excluded list we need to exclude this
272                   address. */
273                if(net_addr == exclude->networks[i].network) {
274                        return 1;
275                }
276        }
277
278        /* If we got this far the address should not be excluded */
279        return 0;
280}
281
282static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
283
284        /* Checks if the ip is of type IPv4 */
285        if (ip->sa_family == AF_INET) {
286
287                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
288                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
289                /* Get in_addr from sockaddr */
290                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
291                /* Ensure the address is in network byte order */
292                uint32_t address = htonl(ip4.s_addr);
293
294                /* Check if the address is part of an excluded network. */
295                if(network_excluded(address, exclude) == 0) {
296
297                        /* Split the IPv4 address into each octet */
298                        uint8_t octet[4];
299                        octet[0] = (address & 0xff000000) >> 24;
300                        octet[1] = (address & 0x00ff0000) >> 16;
301                        octet[2] = (address & 0x0000ff00) >> 8;
302                        octet[3] = (address & 0x000000ff);
303
304                        /* check if the supplied address was a source or destination,
305                           increment the correct one */
306                        if(srcaddr) {
307                                int i;
308                                for(i=0;i<4;i++) {
309                                        local->src[i][octet[i]] += 1;
310                                }
311                        } else {
312                                int i;
313                                for(i=0;i<4;i++) {
314                                        local->dst[i][octet[i]] += 1;
315                                }
316                        }
317                }
318        }
319}
320
321/* Per packet callback function run by each thread */
322static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
323        libtrace_packet_t *packet) {
324
325        /* Regain access to the address counter structure */
326        struct addr_local *local = (struct addr_local *)tls;
327
328        /* If this is the first packet set the lastkey to the packets timestamp */
329        if(local->lastkey == 0) {
330                local->lastkey = trace_get_erf_timestamp(packet);
331        }
332
333        /* Increment the packet count */
334        local->packets += 1;
335
336        /* Regain access to excluded networks pointer */
337        struct exclude_networks *exclude = (struct exclude_networks *)global;
338
339        struct sockaddr_storage addr;
340        struct sockaddr *ip;
341
342        /* Get the source IP address */
343        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
344        /* If a source ip address was found */
345        if(ip != NULL) {
346                process_ip(ip, local, exclude, 1);
347        }
348
349        /* Get the destination IP address */
350        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
351        /* If a destination ip address was found */
352        if(ip != NULL) {
353                process_ip(ip, local, exclude, 0);
354        }
355
356        /* If this trace is not live we will manually call "per tick" */
357        if(!trace_get_information(trace)->live) {
358                /* get the current packets timestamp */
359                uint64_t timestamp = trace_get_erf_timestamp(packet);
360
361                /* We only want to call per_tick if we are due to output something
362                 * Right shifting these converts them to seconds, tickrate is in seconds */
363                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
364                        per_tick(trace, thread, global, local, timestamp);
365                        local->lastkey = timestamp;
366                }
367        }
368
369        /* Return the packet to libtrace */
370        return packet;
371}
372
373/* Stopping callback function - When a thread closes */
374static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
375
376        /* cast the local storage structure */
377        struct addr_local *local = (struct addr_local *)tls;
378        /* Create structure to store the result */
379        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
380
381        /* Populate the result */
382        int i, j;
383        for(i=0;i<4;i++) {
384                for(j=0;j<256;j++) {
385                        result->src[i][j] = local->src[i][j];
386                        result->dst[i][j] = local->dst[i][j];
387                }
388        }
389        result->packets = local->packets;
390
391        /* Send the final results to the combiner */
392        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
393
394        /* Cleanup the local storage */
395        free(local);
396}
397
398/* Starting callback for reporter thread */
399static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
400        /* Create tally structure */
401        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
402        tally->stats = malloc(sizeof(struct addr_stats));
403
404        /* Initialize the tally structure */
405        int i, j;
406        for(i=0;i<4;i++) {
407                for(j=0;j<256;j++) {
408                        tally->src[i][j] = 0;
409                        tally->dst[i][j] = 0;
410                        tally->src_lastoutput[i][j] = 0;
411                        tally->dst_lastoutput[i][j] = 0;
412                        tally->stats->src[i][j] = 0;
413                        tally->stats->dst[i][j] = 0;
414                }
415                /* Stats related varibles */
416                tally->stats->mode_src[i] = 0;
417                tally->stats->mode_dst[i] = 0;
418                tally->stats->mean_src[i] = 0;
419                tally->stats->mean_dst[i] = 0;
420                tally->stats->median_src[i] = 0;
421                tally->stats->median_dst[i] = 0;
422                tally->stats->stddev_src[i] = 0;
423                tally->stats->stddev_dst[i] = 0;
424                tally->stats->variance_src[i] = 0;
425                tally->stats->variance_dst[i] = 0;
426                tally->stats->skewness_src[i] = 0;
427                tally->stats->skewness_dst[i] = 0;
428
429                tally->stats->total_skewness_src[i] = 0;
430                tally->stats->total_skewness_dst[i] = 0;
431        }
432        tally->lastkey = 0;
433        tally->packets = 0;
434        tally->output_count = 0;
435
436        return tally;
437}
438
439static void plot_results(struct addr_local *tally, uint64_t tick) {
440
441        int i, j, k;
442
443        /* Calculations before reporting the results */
444        /* Need to initialise lastoutput values on first pass,
445         * this is so we have a base line for percentage changed */
446        if(tally->output_count == 0) {
447                for(i=0;i<4;i++) {
448                        for(j=0;j<256;j++) {
449                                tally->src_lastoutput[i][j] = tally->src[i][j];
450                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
451                        }
452                }
453         }
454        /* Compute the stats */
455        compute_stats(tally);
456
457        /* Finaly output the results */
458        printf("Generating output \"%sipdist-%lu\"\n", stats_outputdir, tick);
459
460        /* Output the results */
461        char outputfile[255];
462        snprintf(outputfile, sizeof(outputfile), "%sipdist-%lu.data", stats_outputdir, tick);
463        FILE *tmp = fopen(outputfile, "w");
464        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
465        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
466        for(i=0;i<256;i++) {
467                fprintf(tmp, "%lu\t%d", tick, i+1);
468                for(j=0;j<4;j++) {
469                        /* Get the highest ranking to lowest ranking octets */
470                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_src[j]));
471                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_src[j]));
472                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_dst[j]));
473                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_dst[j]));
474                        pop(&tally->stats->rank_src[j]);
475                        pop(&tally->stats->rank_dst[j]);
476                }
477                fprintf(tmp, "\n");
478        }
479        fclose(tmp);
480
481        char outputfile_stats[255];
482        snprintf(outputfile_stats, sizeof(outputfile_stats), "%sipdist-%lu.stats", stats_outputdir, tick);
483        tmp = fopen(outputfile_stats, "w");
484        /* append stats data to end of file */
485        fprintf(tmp, "#\tmean\tstddev\tvariance\tmedian\tmode\tskewness\n");
486        for(i=0;i<4;i++) {
487                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->median_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
488                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->median_src[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
489                fprintf(tmp, "\n\n");
490        }
491        fclose(tmp);
492
493        char outputfile_stats_timeseries[255];
494        snprintf(outputfile_stats_timeseries, sizeof(outputfile_stats_timeseries), "%sipdist-timeseries-skewness.stats", stats_outputdir);
495        if(tally->output_count == 0) {
496                tmp = fopen(outputfile_stats_timeseries, "w");
497                fprintf(tmp, "timestamp\tsrc1\t\tdst1\t\tsrc2\t\tdst2\t\tsrc3\t\tdst3\t\tsrc4\t\tdst4");
498        } else {
499                tmp = fopen(outputfile_stats_timeseries, "a");
500        }
501        fprintf(tmp, "\n%lu\t", tick);
502        for(k=0;k<4;k++) {
503                fprintf(tmp, "%f\t", tally->stats->total_skewness_src[k] / (tally->output_count+1));
504                fprintf(tmp, "%f\t", tally->stats->total_skewness_dst[k] / (tally->output_count+1));
505        }
506        fclose(tmp);
507
508        /* Puts data into timeseries files that gnuplot likes */
509        char outputfile2[255];
510        for(k=0;k<2;k++) {
511                for(j=0;j<4;j++) {
512                        /* If k is 0 we are doing src else dst */
513                        if(k) {
514                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-dst-octet%d.data", stats_outputdir, j+1);
515                        } else {
516                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-src-octet%d.data", stats_outputdir, j+1);
517                        }
518                        if(tally->output_count == 0) {
519                                tmp = fopen(outputfile2, "w");
520                                fprintf(tmp, "timestamp\t");
521                                for(i=0;i<256;i++) {
522                                        fprintf(tmp, "%d\t", i);
523                                }
524                                fprintf(tmp, "\n");
525                        } else {
526                                tmp = fopen(outputfile2, "a");
527                        }
528                        fprintf(tmp, "%lu\t", tick);
529                        for(i=0;i<256;i++) {
530                                if(k) {
531                                        fprintf(tmp, "%lu\t", tally->dst[j][i]);
532                                } else {
533                                        fprintf(tmp, "%lu\t", tally->src[j][i]);
534                                }
535                        }
536                        fprintf(tmp, "\n");
537                        fclose(tmp);
538                }
539        }
540
541
542        /* Get the version of gnuplot */
543        //char delim[] = " ";
544        //char gnuplot_result[256];
545        //double gnuplot_version = 0;
546        //FILE *gnuplot = popen("gnuplot --version", "r");
547        //fgets(gnuplot_result, sizeof(gnuplot_result)-1, gnuplot);
548        //strtok(gnuplot_result, delim);
549        //gnuplot_version = atof(strtok(NULL, delim));
550        //pclose(gnuplot);
551
552        /* Plot the results */
553        for(i=0;i<4;i++) {
554                char outputplot[255];
555                snprintf(outputplot, sizeof(outputplot), "%sipdist-%lu-octet%d.png", stats_outputdir, tick, i+1);
556                /* Open pipe to gnuplot */
557                FILE *gnuplot = popen("gnuplot -persistent", "w");
558                /* send all commands to gnuplot */
559                fprintf(gnuplot, "set term pngcairo enhanced size 1280,960\n");
560                fprintf(gnuplot, "set output '%s'\n", outputplot);
561                fprintf(gnuplot, "set multiplot layout 2,1\n");
562                fprintf(gnuplot, "set title 'IP Distribution - %lu'\n", tick);
563                fprintf(gnuplot, "set xrange[0:255]\n");
564                fprintf(gnuplot, "set y2range[-1:1]\n");
565                fprintf(gnuplot, "set y2tics\n");
566                fprintf(gnuplot, "set xlabel 'Prefix'\n");
567                fprintf(gnuplot, "set ylabel 'Hits'\n");
568                fprintf(gnuplot, "set y2label 'Skewness'\n");
569                fprintf(gnuplot, "set xtics 0,10,255\n");
570                /* Setup labels that hold mean, standard deviation and variance */
571                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 2 name 'SOURCEMEAN' nooutput\n", outputfile_stats, i);
572                //fprintf(gnuplot, "set label 1 gprintf(\"Source Mean %u\", SOURCEMEAN_min) at graph 0.02, 0.95\n");
573                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 2 name 'DESTMEAN' nooutput\n", outputfile_stats, i);
574                //fprintf(gnuplot, "set label 2 gprintf(\"Destination Mean: %f\", DESTMEAN_min) at graph 0.02, 0.90\n");
575                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 3 name 'SOURCESTDDEV' nooutput\n", outputfile_stats, i);
576                //fprintf(gnuplot, "set label 3 sprintf('Source Standard Deviation: %f', SOURCESTDDEV_min) at graph 0.24, 0.95\n");
577                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 3 name 'DESTSTDDEV' nooutput\n", outputfile_stats, i);
578                //fprintf(gnuplot, "set label 4 sprintf('Destination Standard Deviation: %f', DESTSTDDEV_min) at graph 0.24, 0.90\n");
579                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 4 name 'SOURCEVAR' nooutput\n", outputfile_stats, i);
580                //fprintf(gnuplot, "set label 5 sprintf('Source Variance: %f', SOURCEVAR_min) at graph 0.55, 0.95\n");
581                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 4 name 'DESTVAR' nooutput\n", outputfile_stats, i);
582                //fprintf(gnuplot, "set label 6 sprintf('Destination Variance: %f', DESTVAR_min) at graph 0.55, 0.90\n");
583                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 7 name 'SOURCESKEW' nooutput\n", outputfile_stats, i);
584                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 7 name 'DESTSKEW' nooutput\n", outputfile_stats, i);
585                /* Plot the first graph of the multiplot */
586                fprintf(gnuplot, "set arrow from SOURCEMEAN_min, graph 0 to SOURCEMEAN_min, graph 1 nohead lt 1\n");
587                fprintf(gnuplot, "set arrow from DESTMEAN_min, graph 0 to DESTMEAN_min, graph 1 nohead lt 2\n");
588                fprintf(gnuplot, "plot '%s' using %d:%d index 0 title 'Source octet %d' smooth unique with boxes,", outputfile, (i*4)+3,(i*4)+4, i+1);
589                fprintf(gnuplot, "'%s' using %d:%d index 0 title 'Destination octet %d' smooth unique with boxes,", outputfile, (i*4)+5, (i*4)+6, i+1);
590                fprintf(gnuplot, "1/0 t 'Source mean' lt 1,");
591                fprintf(gnuplot, "1/0 t 'Destination mean' lt 2,");
592                fprintf(gnuplot, "SOURCESKEW_min title 'Source Skewness' axes x1y2,");
593                fprintf(gnuplot, "DESTSKEW_min title 'Destination Skewness' axes x1y2\n");
594                /* Unset labels for next plot */
595                fprintf(gnuplot, "unset y2tics\n");
596                fprintf(gnuplot, "unset y2label\n");
597                fprintf(gnuplot, "unset arrow\n");
598                fprintf(gnuplot, "unset label 1\nunset label 2\nunset label 3\nunset label 4\nunset label 5\nunset label 6\n");
599                fprintf(gnuplot, "set title 'Zipf Distribution'\n");
600                fprintf(gnuplot, "set xlabel 'Rank'\n");
601                fprintf(gnuplot, "set ylabel 'Frequency'\n");
602                fprintf(gnuplot, "set logscale xy 10\n");
603                fprintf(gnuplot, "set xrange[1:255]\n");
604                fprintf(gnuplot, "set xtics 0,10,255\n");
605                /* Plot the second graph of the multiplot */
606                fprintf(gnuplot, "plot '%s' using 2:%d index 0 title 'Source octet %d',", outputfile, (i*4)+4, i+1);
607                fprintf(gnuplot, "'%s' using 2:%d index 0 title 'Destination octet %d'\n", outputfile, (i*4)+6, i+1);
608                fprintf(gnuplot, "unset multiplot\n");
609                pclose(gnuplot);
610        }
611
612        /* Plot time series */
613        for(k=0;k<2;k++) {
614                for(i=0;i<4;i++) {
615                        char outputplot2[255];
616                        if(k) {
617                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-dst-octet%i.png", stats_outputdir, i+1);
618                        } else {
619                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-src-octet%i.png", stats_outputdir, i+1);
620                        }
621                        FILE *gnuplot = popen("gnuplot -persistent", "w");
622                        fprintf(gnuplot, "set term pngcairo size 1280,960 \n");
623                        fprintf(gnuplot, "set output '%s'\n", outputplot2);
624                        fprintf(gnuplot, "set multiplot layout 2,1\n");
625                        if(k) {
626                                fprintf(gnuplot, "set title 'Timeseries Dst Octet %i'\n", i+1);
627                        } else {
628                                fprintf(gnuplot, "set title 'Timeseries Src Octet %i'\n", i+1);
629                        }
630                        fprintf(gnuplot, "set xtics rotate\n");
631                        fprintf(gnuplot, "set y2tics\n");
632                        fprintf(gnuplot, "set xlabel 'Timestamp'\n");
633                        fprintf(gnuplot, "set ylabel 'Cumulative hits'\n");
634                        //fprintf(gnuplot, "set key out vert\n");
635                        fprintf(gnuplot, "set key off\n");
636                        //fprintf(gnuplot, "set xdata time\n");
637                        //fprintf(gnuplot, "set timefmt '%%s'\n");
638                        //fprintf(gnuplot, "set format x '%%m/%%d/%%Y %%H:%%M:%%S'\n");
639                        fprintf(gnuplot, "set autoscale xy\n");
640                        if(k) {
641                                fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
642                        } else {
643                                fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
644                        }
645                        /* Draw the mean skewness line */
646                        fprintf(gnuplot, "set title 'Timeseries mean skewness\n");
647                        fprintf(gnuplot, "set yrange[-1:1]\n");
648                        fprintf(gnuplot, "set xlabel 'Timestamp'\n");
649                        fprintf(gnuplot, "set ylabel 'Skewness'\n");
650                        fprintf(gnuplot, "plot '%sipdist-timeseries-skewness.stats' using %d:xtic(1) with lines title columnheader(%d)\n", stats_outputdir, (i*2)+2+k);
651                        fprintf(gnuplot, "unset multiplot");
652                        pclose(gnuplot);
653                }
654        }
655}
656
657
658/* Callback when a result is given to the reporter thread */
659static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
660        void *tls, libtrace_result_t *result) {
661
662        struct addr_local *results;
663        struct addr_local *tally;
664        uint64_t key;
665
666        /* We only want to handle results containing our user-defined structure  */
667        if(result->type != RESULT_USER) {
668                return;
669        }
670
671        /* This key is the key that was passed into trace_publish_results
672         * this will contain the erf timestamp for the packet */
673        key = result->key;
674
675        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
676        results = (struct addr_local *)result->value.ptr;
677
678        /* Grab our tally out of thread local storage */
679        tally = (struct addr_local *)tls;
680
681        /* Add all the results to the tally */
682        int i, j;
683        for(i=0;i<4;i++) {
684                for(j=0;j<256;j++) {
685                        tally->src[i][j] += results->src[i][j];
686                        tally->dst[i][j] += results->dst[i][j];
687                }
688        }
689        tally->packets += results->packets;
690
691        /* If the current timestamp is greater than the last printed plus the interval, output a result */
692        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
693
694                /* update last key */
695                tally->lastkey = key;
696
697                /* Plot the result with the key in epoch seconds*/
698                plot_results(tally, key >> 32);
699
700                /* increment the output counter */
701                tally->output_count++;
702
703                /* clear the tally but copy old values over first*/
704                for(i=0;i<4;i++) {
705                        for(j=0;j<256;j++) {
706                                tally->src_lastoutput[i][j] = tally->src[i][j];
707                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
708                                tally->src[i][j] = 0;
709                                tally->dst[i][j] = 0;
710                        }
711                        /* Clear all the stats data */
712                        tally->stats->mode_src[i] = 0;
713                        tally->stats->mode_dst[i] = 0;
714                        tally->stats->mean_src[i] = 0;
715                        tally->stats->mean_dst[i] = 0;
716                        tally->stats->median_src[i] = 0;
717                        tally->stats->median_dst[i] = 0;
718                        tally->stats->stddev_src[i] = 0;
719                        tally->stats->stddev_dst[i] = 0;
720                        tally->stats->variance_src[i] = 0;
721                        tally->stats->variance_dst[i] = 0;
722                        tally->stats->skewness_src[i] = 0;
723                        tally->stats->skewness_dst[i] = 0;
724                }
725                /* free the priority queue */
726                for(i=0;i<4;i++) {
727                        free(tally->stats->rank_src[i]);
728                        free(tally->stats->rank_dst[i]);
729                }
730
731                tally->packets = 0;
732
733        }
734
735        /* Cleanup the thread results */
736        free(results);
737}
738
739/* Callback when the reporter thread stops (essentially when the program ends) */
740static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
741
742        /* Get the tally from the thread local storage */
743        struct addr_local *tally = (struct addr_local *)tls;
744
745        /* If there is any remaining data in the tally plot it */
746        if(tally->packets > 0) {
747                /* Then plot the results */
748                plot_results(tally, (tally->lastkey >> 32) + 1);
749        }
750        /* Cleanup tally results*/
751        free(tally);
752}
753
754static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
755        libtrace_callback_set_t *reporting) {
756        /* Only destroy if the structure exists */
757        if(trace) {
758                trace_destroy(trace);
759        }
760        if(processing) {
761                trace_destroy_callback_set(processing);
762        }
763        if(reporting) {
764                trace_destroy_callback_set(reporting);
765        }
766}
767
768/* Converts a string representation eg 1.2.3.4/24 into a network structure */
769static int get_network(char *network_string, struct network *network) {
770
771        char delim[] = "/";
772        /* Split the address and mask portion of the string */
773        char *address = strtok(network_string, delim);
774        char *mask = strtok(NULL, delim);
775
776        /* Check the subnet mask is valid */
777        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
778                return 1;
779        }
780        /* right shift so netmask is in network byte order */
781        network->mask = 0xffffffff << (32 - atoi(mask));
782
783        struct in_addr addr;
784        /* Convert address string into uint32_t and check its valid */
785        if(inet_aton(address, &addr) == 0) {
786                return 2;
787        }
788        /* Ensure its saved in network byte order */
789        network->address = htonl(addr.s_addr);
790
791        /* Calculate the network address */
792        network->network = network->address & network->mask;
793
794        return 0;
795}
796
797int main(int argc, char *argv[]) {
798
799        libtrace_t *trace;
800        /* Callbacks for processing and reporting threads */
801        libtrace_callback_set_t *processing, *reporter;
802
803
804        /* Ensure the input URI was supplied */
805        if(argc < 3) {
806                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
807                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
808                return 1;
809        }
810        /* Convert tick into an int */
811        tickrate = atoi(argv[2]);
812
813
814        /* Create the trace */
815        trace = trace_create(argv[1]);
816        /* Ensure no error has occured creating the trace */
817        if(trace_is_err(trace)) {
818                trace_perror(trace, "Creating trace");
819                return 1;
820        }
821
822        /* Setup the processing threads */
823        processing = trace_create_callback_set();
824        trace_set_starting_cb(processing, start_callback);
825        trace_set_packet_cb(processing, per_packet);
826        trace_set_stopping_cb(processing, stop_processing);
827        trace_set_tick_interval_cb(processing, per_tick);
828        /* Setup the reporter threads */
829        reporter = trace_create_callback_set();
830        trace_set_starting_cb(reporter, start_reporter);
831        trace_set_result_cb(reporter, per_result);
832        trace_set_stopping_cb(reporter, stop_reporter);
833
834        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
835        trace_set_perpkt_threads(trace, 4);
836        /* Order the results by timestamp */
837        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
838        /* Try to balance the load across all processing threads */
839        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
840
841        /* Set the tick interval only if this is a live capture */
842        if(trace_get_information(trace)->live) {
843                /* tickrate is in seconds but tick_interval expects milliseconds */
844                trace_set_tick_interval(trace, tickrate*1000);
845        }
846        /* Do not buffer the reports */
847        trace_set_reporter_thold(trace, 1);
848
849
850        /* Setup excluded networks if any were supplied */
851        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
852        exclude->networks = malloc(sizeof(struct network)*(argc-3));
853        if(exclude == NULL || exclude->networks == NULL) {
854                fprintf(stderr, "Unable to allocate memory");
855                libtrace_cleanup(trace, processing, reporter);
856                return 1;
857        }
858        exclude->count = 0;
859        int i;
860        for(i=0;i<argc-3;i++) {
861                /* convert the network string into a network structure */
862                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
863                        fprintf(stderr, "Error creating excluded network");
864                        return 1;
865                }
866                /* increment the count of excluded networks */
867                exclude->count += 1;
868        }
869
870
871        /* Start the trace, if it errors return */
872        if(trace_pstart(trace, exclude, processing, reporter)) {
873                trace_perror(trace, "Starting parallel trace");
874                libtrace_cleanup(trace, processing, reporter);
875                return 1;
876        }
877
878        /* This will wait for all threads to complete */
879        trace_join(trace);
880
881        /* Clean up everything */
882        free(exclude->networks);
883        free(exclude);
884        libtrace_cleanup(trace, processing, reporter);
885
886        return 0;
887}
Note: See TracBrowser for help on using the repository browser.