source: examples/tutorial/ipdist-parallel.c @ 1f79936

develop
Last change on this file since 1f79936 was 1f79936, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Make timeseries graph cumulative

  • Property mode set to 100644
File size: 30.1 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10
11/* Structure to hold the counters each thread has its own one of these */
12struct addr_local {
13        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
14        uint64_t src[4][256];
15        uint64_t dst[4][256];
16        /* Holds the results from the previous output */
17        uint64_t src_lastoutput[4][256];
18        uint64_t dst_lastoutput[4][256];
19        /* Holds the timestamp */
20        uint64_t lastkey;
21        /* Is the count of the number of packets processed, This is cleared after every output. */
22        uint64_t packets;
23        uint64_t output_count;
24        /* Pointer to stats structure */
25        struct addr_stats *stats;
26};
27struct addr_stats {
28        /* Holds the percentage change compared to the previous output */
29        float src[4][256];
30        float dst[4][256];
31        double mode_src[4];
32        double mode_dst[4];
33        double mean_src[4];
34        double mean_dst[4];
35        double median_src[4];
36        double median_dst[4];
37        double stddev_src[4];
38        double stddev_dst[4];
39        double variance_src[4];
40        double variance_dst[4];
41        double skewness_src[4];
42        double skewness_dst[4];
43        struct addr_rank *rank_src[4];
44        struct addr_rank *rank_dst[4];
45};
46struct addr_rank {
47        uint8_t addr;
48        /* count is the priority */
49        uint64_t count;
50        /* pointer to next ranking item */
51        struct addr_rank* next;
52};
53
54/* Structure to hold excluded networks */
55struct exclude_networks {
56        int count;
57        struct network *networks;
58};
59struct network {
60        uint32_t address;
61        uint32_t mask;
62        uint32_t network;
63};
64
65/* interval between outputs in seconds */
66uint64_t tickrate;
67
68char *stats_outputdir = "/home/jcv9/output-spectre/";
69/* Calculate and plot the percentage change from the previous plot */
70int stats_percentage_change = 0;
71
72/*************************************************************************
73Priority queue linked list */
74
75static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
76        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
77        tmp->addr = addr;
78        tmp->count = count;
79        tmp->next = NULL;
80
81        return tmp;
82}
83static uint8_t peak(struct addr_rank **head) {
84        return (*head)->addr;
85}
86static uint64_t peak_count(struct addr_rank **head) {
87        return (*head)->count;
88}
89static void pop(struct addr_rank **head) {
90        struct addr_rank* tmp = *head;
91        (*head) = (*head)->next;
92        free(tmp);
93}
94static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
95        struct addr_rank *curr = (*head);
96        struct addr_rank *tmp = rank_new(addr, count);
97
98        /* Check if the new node has a greater priority than the head */
99        if((*head)->count < count) {
100                tmp->next = *head;
101                (*head) = tmp;
102        } else {
103                /* Jump through the list until we find the correct position */
104                while (curr->next != NULL && curr->next->count > count) {
105                        curr = curr->next;
106                }
107
108                tmp->next = curr->next;
109                curr->next = tmp;
110        }
111}
112/*************************************************************************/
113
114
115static void compute_stats(struct addr_local *tally) {
116        int i, j, k;
117
118        /* Calculates the percentage change from the last output. NEED TO MAKE THIS WEIGHTED */
119        if(stats_percentage_change) {
120                for(i=0;i<256;i++) {
121                        for(j=0;j<4;j++) {
122                                tally->stats->src[j][i] = 0;
123                                tally->stats->dst[j][i] = 0;
124                                if(tally->src[j][i] != 0) {
125                                        tally->stats->src[j][i] = (((float)tally->src[j][i] - (float)tally->src_lastoutput[j][i]) / (float)tally->src[j][i]) * 100;
126                                }
127                                if(tally->dst[j][i] != 0) {
128                                        tally->stats->dst[j][i] = (((float)tally->dst[j][i] - (float)tally->dst_lastoutput[j][i]) / (float)tally->dst[j][i]) * 100;
129                                }
130                        }
131                }
132        }
133
134        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
135        for(i=0;i<4;i++) {
136                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
137                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
138                for(j=1;j<256;j++) {
139                        /* Push everything into the priority queue
140                         * each item will be popped off in the correct order */
141                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
142                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
143                }
144        }
145
146        /* Calculate mean, variance and standard deviation */
147        for(k=0;k<4;k++) {
148
149                double ex = 0;
150                double ex2 = 0;
151                double n = 0;
152                double m = 0;
153                for(i=0;i<256;i++) {
154                        for(j=0;j<tally->src[k][i];j++) {
155                                if(n == 0) {
156                                        m = i;
157                                }
158                                n += 1;
159                                ex += (i - m);
160                                ex2 += ((i - m) * (i - m));
161                        }
162                }
163
164                tally->stats->mean_src[k] = (k + (ex / n));
165                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
166                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
167
168                ex = 0;
169                ex2 = 0;
170                n = 0;
171                m = 0;
172                for(i=0;i<256;i++) {
173                        for(j=0;j<tally->dst[k][i];j++) {
174                                if(n == 0) {
175                                        m = i;
176                                }
177                                n += 1;
178                                ex += (i - m);
179                                ex2 += ((i - m) * (i - m));
180                        }
181                }
182                tally->stats->mean_dst[k] = (k + (ex / n));
183                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
184                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
185                /* Get the median */
186                int c = (n/2) - tally->src[k][0];
187                int c2 = 0;
188                while(c > 0) {
189                        c2 += 1;
190                        c -= tally->src[k][c2];
191                }
192                tally->stats->median_src[k] = c2;
193                c = (n/2) - tally->dst[k][0];
194                c2 = 0;
195                while(c > 0) {
196                        c2 += 1;
197                        c -= tally->dst[k][c2];
198                }
199                tally->stats->median_dst[k] = c2;
200                /* Get the mode which is the first item in the priority queue */
201                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
202                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
203                /* Calculate skewness */
204                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->median_src[k]) / tally->stats->stddev_src[k];
205                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->median_dst[k]) / tally->stats->stddev_dst[k];
206        }
207
208}
209
210static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
211
212        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
213        /* Proccessing thread local storage */
214        struct addr_local *local = (struct addr_local *)tls;
215
216        /* Populate the result structure from the threads local storage and clear threads local storage*/
217        int i, j;
218        for(i=0;i<4;i++) {
219                for(j=0;j<256;j++) {
220                        result->src[i][j] = local->src[i][j];
221                        result->dst[i][j] = local->dst[i][j];
222                        /* clear local storage */
223                        local->src[i][j] = 0;
224                        local->dst[i][j] = 0;
225                }
226        }
227        result->packets = local->packets;
228        local->packets = 0;
229
230        /* Push result to the combiner */
231        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
232}
233
234/* Start callback function - This is run for each thread when it starts */
235static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
236
237        /* Create and initialize the local counter struct */
238        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
239        int i, j;
240        for(i=0;i<4;i++) {
241                for(j=0;j<256;j++) {
242                        local->src[i][j] = 0;
243                        local->dst[i][j] = 0;
244                }
245        }
246        local->lastkey = 0;
247        local->packets = 0;
248
249        /* return the local storage so it is available for all other callbacks for the thread*/
250        return local;
251}
252
253/* Checks if address is part of a excluded subnet. */
254static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
255
256        int i;
257        for(i=0;i<exclude->count;i++) {
258                /* Convert address into a network address */
259                uint32_t net_addr = address & exclude->networks[i].mask;
260
261                /* If this matches the network address from the excluded list we need to exclude this
262                   address. */
263                if(net_addr == exclude->networks[i].network) {
264                        return 1;
265                }
266        }
267
268        /* If we got this far the address should not be excluded */
269        return 0;
270}
271
272static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
273
274        /* Checks if the ip is of type IPv4 */
275        if (ip->sa_family == AF_INET) {
276
277                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
278                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
279                /* Get in_addr from sockaddr */
280                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
281                /* Ensure the address is in network byte order */
282                uint32_t address = htonl(ip4.s_addr);
283
284                /* Check if the address is part of an excluded network. */
285                if(network_excluded(address, exclude) == 0) {
286
287                        /* Split the IPv4 address into each octet */
288                        uint8_t octet[4];
289                        octet[0] = (address & 0xff000000) >> 24;
290                        octet[1] = (address & 0x00ff0000) >> 16;
291                        octet[2] = (address & 0x0000ff00) >> 8;
292                        octet[3] = (address & 0x000000ff);
293
294                        /* check if the supplied address was a source or destination,
295                           increment the correct one */
296                        if(srcaddr) {
297                                int i;
298                                for(i=0;i<4;i++) {
299                                        local->src[i][octet[i]] += 1;
300                                }
301                        } else {
302                                int i;
303                                for(i=0;i<4;i++) {
304                                        local->dst[i][octet[i]] += 1;
305                                }
306                        }
307                }
308        }
309}
310
311/* Per packet callback function run by each thread */
312static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
313        libtrace_packet_t *packet) {
314
315        /* Regain access to the address counter structure */
316        struct addr_local *local = (struct addr_local *)tls;
317
318        /* If this is the first packet set the lastkey to the packets timestamp */
319        if(local->lastkey == 0) {
320                local->lastkey = trace_get_erf_timestamp(packet);
321        }
322
323        /* Increment the packet count */
324        local->packets += 1;
325
326        /* Regain access to excluded networks pointer */
327        struct exclude_networks *exclude = (struct exclude_networks *)global;
328
329        struct sockaddr_storage addr;
330        struct sockaddr *ip;
331
332        /* Get the source IP address */
333        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
334        /* If a source ip address was found */
335        if(ip != NULL) {
336                process_ip(ip, local, exclude, 1);
337        }
338
339        /* Get the destination IP address */
340        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
341        /* If a destination ip address was found */
342        if(ip != NULL) {
343                process_ip(ip, local, exclude, 0);
344        }
345
346        /* If this trace is not live we will manually call "per tick" */
347        if(!trace_get_information(trace)->live) {
348                /* get the current packets timestamp */
349                uint64_t timestamp = trace_get_erf_timestamp(packet);
350
351                /* We only want to call per_tick if we are due to output something
352                 * Right shifting these converts them to seconds, tickrate is in seconds */
353                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
354                        per_tick(trace, thread, global, local, timestamp);
355                        local->lastkey = timestamp;
356                }
357        }
358
359        /* Return the packet to libtrace */
360        return packet;
361}
362
363/* Stopping callback function - When a thread closes */
364static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
365
366        /* cast the local storage structure */
367        struct addr_local *local = (struct addr_local *)tls;
368        /* Create structure to store the result */
369        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
370
371        /* Populate the result */
372        int i, j;
373        for(i=0;i<4;i++) {
374                for(j=0;j<256;j++) {
375                        result->src[i][j] = local->src[i][j];
376                        result->dst[i][j] = local->dst[i][j];
377                }
378        }
379        result->packets = local->packets;
380
381        /* Send the final results to the combiner */
382        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
383
384        /* Cleanup the local storage */
385        free(local);
386}
387
388/* Starting callback for reporter thread */
389static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
390        /* Create tally structure */
391        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
392        tally->stats = malloc(sizeof(struct addr_stats));
393
394        /* Initialize the tally structure */
395        int i, j;
396        for(i=0;i<4;i++) {
397                for(j=0;j<256;j++) {
398                        tally->src[i][j] = 0;
399                        tally->dst[i][j] = 0;
400                        tally->src_lastoutput[i][j] = 0;
401                        tally->dst_lastoutput[i][j] = 0;
402                        tally->stats->src[i][j] = 0;
403                        tally->stats->dst[i][j] = 0;
404                }
405                /* Stats related varibles */
406                tally->stats->mode_src[i] = 0;
407                tally->stats->mode_dst[i] = 0;
408                tally->stats->mean_src[i] = 0;
409                tally->stats->mean_dst[i] = 0;
410                tally->stats->median_src[i] = 0;
411                tally->stats->median_dst[i] = 0;
412                tally->stats->stddev_src[i] = 0;
413                tally->stats->stddev_dst[i] = 0;
414                tally->stats->variance_src[i] = 0;
415                tally->stats->variance_dst[i] = 0;
416                tally->stats->skewness_src[i] = 0;
417                tally->stats->skewness_dst[i] = 0;
418        }
419        tally->lastkey = 0;
420        tally->packets = 0;
421        tally->output_count = 0;
422
423        return tally;
424}
425
426static void plot_results(struct addr_local *tally, uint64_t tick) {
427
428        int i, j, k;
429
430        /* Calculations before reporting the results */
431        /* Need to initialise lastoutput values on first pass,
432         * this is so we have a base line for percentage changed */
433        if(tally->output_count == 0) {
434                for(i=0;i<4;i++) {
435                        for(j=0;j<256;j++) {
436                                tally->src_lastoutput[i][j] = tally->src[i][j];
437                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
438                        }
439                }
440         }
441        /* Compute the stats */
442        compute_stats(tally);
443
444        /* Finaly output the results */
445        printf("Generating output \"%sipdist-%lu\"\n", stats_outputdir, tick);
446
447        /* Output the results */
448        char outputfile[255];
449        snprintf(outputfile, sizeof(outputfile), "%sipdist-%lu.data", stats_outputdir, tick);
450        FILE *tmp = fopen(outputfile, "w");
451        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
452        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
453        for(i=0;i<256;i++) {
454                fprintf(tmp, "%lu\t%d", tick, i+1);
455                for(j=0;j<4;j++) {
456                        /* Get the highest ranking to lowest ranking octets */
457                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_src[j]));
458                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_src[j]));
459                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_dst[j]));
460                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_dst[j]));
461                        pop(&tally->stats->rank_src[j]);
462                        pop(&tally->stats->rank_dst[j]);
463                }
464                fprintf(tmp, "\n");
465        }
466        fclose(tmp);
467
468        char outputfile_stats[255];
469        snprintf(outputfile_stats, sizeof(outputfile_stats), "%sipdist-%lu.stats", stats_outputdir, tick);
470        tmp = fopen(outputfile_stats, "w");
471        /* append stats data to end of file */
472        fprintf(tmp, "#\tmean\tstddev\tvariance\tmedian\tmode\tskewness\n");
473        for(i=0;i<4;i++) {
474                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->median_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
475                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->median_src[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
476                fprintf(tmp, "\n\n");
477        }
478        fclose(tmp);
479
480        /* Puts data into timeseries files that gnuplot likes */
481        char outputfile2[255];
482        for(k=0;k<2;k++) {
483                for(j=0;j<4;j++) {
484                        /* If k is 0 we are doing src else dst */
485                        if(k) {
486                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-dst-octet%d.data", stats_outputdir, j+1);
487                        } else {
488                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-src-octet%d.data", stats_outputdir, j+1);
489                        }
490                        if(tally->output_count == 0) {
491                                tmp = fopen(outputfile2, "w");
492                                fprintf(tmp, "timestamp\t");
493                                for(i=0;i<256;i++) {
494                                        fprintf(tmp, "%d\t", i);
495                                }
496                                fprintf(tmp, "\n");
497                        } else {
498                                tmp = fopen(outputfile2, "a");
499                        }
500                        fprintf(tmp, "%lu\t", tick);
501                        for(i=0;i<256;i++) {
502                                if(k) {
503                                        fprintf(tmp, "%lu\t", tally->dst[j][i]);
504                                } else {
505                                        fprintf(tmp, "%lu\t", tally->src[j][i]);
506                                }
507                        }
508                        fprintf(tmp, "\n");
509                        fclose(tmp);
510                }
511        }
512
513
514        /* Get the version of gnuplot */
515        //char delim[] = " ";
516        //char gnuplot_result[256];
517        //double gnuplot_version = 0;
518        //FILE *gnuplot = popen("gnuplot --version", "r");
519        //fgets(gnuplot_result, sizeof(gnuplot_result)-1, gnuplot);
520        //strtok(gnuplot_result, delim);
521        //gnuplot_version = atof(strtok(NULL, delim));
522        //pclose(gnuplot);
523
524        /* Plot the results */
525        for(i=0;i<4;i++) {
526                char outputplot[255];
527                snprintf(outputplot, sizeof(outputplot), "%sipdist-%lu-octet%d.png", stats_outputdir, tick, i+1);
528                /* Open pipe to gnuplot */
529                FILE *gnuplot = popen("gnuplot -persistent", "w");
530                /* send all commands to gnuplot */
531                fprintf(gnuplot, "set term pngcairo enhanced size 1280,960\n");
532                fprintf(gnuplot, "set output '%s'\n", outputplot);
533                fprintf(gnuplot, "set multiplot layout 2,1\n");
534                fprintf(gnuplot, "set title 'IP Distribution'\n");
535                fprintf(gnuplot, "set xrange[0:255]\n");
536                fprintf(gnuplot, "set xlabel 'Prefix'\n");
537                fprintf(gnuplot, "set ylabel 'Hits'\n");
538                fprintf(gnuplot, "set xtics 0,10,255\n");
539                /* Setup labels that hold mean, standard deviation and variance */
540                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 2 name 'SOURCEMEAN' nooutput\n", outputfile_stats, i);
541                //fprintf(gnuplot, "set label 1 gprintf(\"Source Mean %u\", SOURCEMEAN_min) at graph 0.02, 0.95\n");
542                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 2 name 'DESTMEAN' nooutput\n", outputfile_stats, i);
543                //fprintf(gnuplot, "set label 2 gprintf(\"Destination Mean: %f\", DESTMEAN_min) at graph 0.02, 0.90\n");
544                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 3 name 'SOURCESTDDEV' nooutput\n", outputfile_stats, i);
545                //fprintf(gnuplot, "set label 3 sprintf('Source Standard Deviation: %f', SOURCESTDDEV_min) at graph 0.24, 0.95\n");
546                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 3 name 'DESTSTDDEV' nooutput\n", outputfile_stats, i);
547                //fprintf(gnuplot, "set label 4 sprintf('Destination Standard Deviation: %f', DESTSTDDEV_min) at graph 0.24, 0.90\n");
548                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 4 name 'SOURCEVAR' nooutput\n", outputfile_stats, i);
549                //fprintf(gnuplot, "set label 5 sprintf('Source Variance: %f', SOURCEVAR_min) at graph 0.55, 0.95\n");
550                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 4 name 'DESTVAR' nooutput\n", outputfile_stats, i);
551                //fprintf(gnuplot, "set label 6 sprintf('Destination Variance: %f', DESTVAR_min) at graph 0.55, 0.90\n");
552                /* Plot the first graph of the multiplot */
553                fprintf(gnuplot, "set arrow from SOURCEMEAN_min, graph 0 to SOURCEMEAN_min, graph 1 nohead lt 1\n");
554                fprintf(gnuplot, "set arrow from DESTMEAN_min, graph 0 to DESTMEAN_min, graph 1 nohead lt 2\n");
555                fprintf(gnuplot, "plot '%s' using %d:%d index 0 title 'Source octet %d' smooth unique with boxes,", outputfile, (i*4)+3,(i*4)+4, i+1);
556                fprintf(gnuplot, "'%s' using %d:%d index 0 title 'Destination octet %d' smooth unique with boxes,", outputfile, (i*4)+5, (i*4)+6, i+1);
557                fprintf(gnuplot, "1/0 t 'Source mean' lt 1,");
558                fprintf(gnuplot, "1/0 t 'Destination mean' lt 2\n");
559                /* Unset labels for next plot */
560                fprintf(gnuplot, "unset arrow\n");
561                fprintf(gnuplot, "unset label 1\nunset label 2\nunset label 3\nunset label 4\nunset label 5\nunset label 6\n");
562                fprintf(gnuplot, "set title 'Zipf Distribution'\n");
563                fprintf(gnuplot, "set xlabel 'Rank'\n");
564                fprintf(gnuplot, "set ylabel 'Frequency'\n");
565                fprintf(gnuplot, "set logscale xy 10\n");
566                fprintf(gnuplot, "set xrange[1:255]\n");
567                fprintf(gnuplot, "set xtics 0,10,255\n");
568                /* Plot the second graph of the multiplot */
569                fprintf(gnuplot, "plot '%s' using 2:%d index 0 title 'Source octet %d',", outputfile, (i*4)+4, i+1);
570                fprintf(gnuplot, "'%s' using 2:%d index 0 title 'Destination octet %d'\n", outputfile, (i*4)+6, i+1);
571                fprintf(gnuplot, "unset multiplot\n");
572                pclose(gnuplot);
573        }
574
575        /* Plot time series */
576        for(k=0;k<2;k++) {
577                for(i=0;i<4;i++) {
578                        char outputplot2[255];
579                        if(k) {
580                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-dst-octet%i.png", stats_outputdir, i+1);
581                        } else {
582                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-src-octet%i.png", stats_outputdir, i+1);
583                        }
584                        FILE *gnuplot = popen("gnuplot -persistent", "w");
585                        fprintf(gnuplot, "set term pngcairo size 1280,960 \n");
586                        fprintf(gnuplot, "set output '%s'\n", outputplot2);
587                        if(k) {
588                                fprintf(gnuplot, "set title 'Timeseries Dst Octet %i - Cumulative'\n", i+1);
589                        } else {
590                                fprintf(gnuplot, "set title 'Timeseries Src Octet %i - Cumulative'\n", i+1);
591                        }
592                        fprintf(gnuplot, "set xtics rotate\n");
593                        //fprintf(gnuplot, "set key out vert\n");
594                        fprintf(gnuplot, "set key off\n");
595                        //fprintf(gnuplot, "set xdata time\n");
596                        //fprintf(gnuplot, "set timefmt '%%s'\n");
597                        //fprintf(gnuplot, "set format x '%%m/%%d/%%Y %%H:%%M:%%S'\n");
598                        fprintf(gnuplot, "set autoscale xy\n");
599                        if(k) {
600                                //if(gnuplot_version < 5) {
601                                        fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
602                                //} else {
603                                //      fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2) at end smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) at end smooth cumulative\n", stats_outputdir, i+1);
604                                //}
605                        } else {
606                                //if(gnuplot_version < 5) {
607                                        fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2) smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) smooth cumulative\n", stats_outputdir, i+1);
608                                //} else {
609                                //      fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2) at end smooth cumulative, for[i=3:257] '' using i with lines title columnheader(i) at end smooth cumulative\n", stats_outputdir, i+1);
610                                //}
611                        }
612                        fprintf(gnuplot, "replot");
613                        pclose(gnuplot);
614                }
615        }
616}
617
618
619/* Callback when a result is given to the reporter thread */
620static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
621        void *tls, libtrace_result_t *result) {
622
623        struct addr_local *results;
624        struct addr_local *tally;
625        uint64_t key;
626
627        /* We only want to handle results containing our user-defined structure  */
628        if(result->type != RESULT_USER) {
629                return;
630        }
631
632        /* This key is the key that was passed into trace_publish_results
633         * this will contain the erf timestamp for the packet */
634        key = result->key;
635
636        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
637        results = (struct addr_local *)result->value.ptr;
638
639        /* Grab our tally out of thread local storage */
640        tally = (struct addr_local *)tls;
641
642        /* Add all the results to the tally */
643        int i, j;
644        for(i=0;i<4;i++) {
645                for(j=0;j<256;j++) {
646                        tally->src[i][j] += results->src[i][j];
647                        tally->dst[i][j] += results->dst[i][j];
648                }
649        }
650        tally->packets += results->packets;
651
652        /* If the current timestamp is greater than the last printed plus the interval, output a result */
653        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
654
655                /* update last key */
656                tally->lastkey = key;
657
658                /* Plot the result with the key in epoch seconds*/
659                plot_results(tally, key >> 32);
660
661                /* increment the output counter */
662                tally->output_count++;
663
664                /* clear the tally but copy old values over first*/
665                for(i=0;i<4;i++) {
666                        for(j=0;j<256;j++) {
667                                tally->src_lastoutput[i][j] = tally->src[i][j];
668                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
669                                tally->src[i][j] = 0;
670                                tally->dst[i][j] = 0;
671                        }
672                        /* Clear all the stats data */
673                        tally->stats->mode_src[i] = 0;
674                        tally->stats->mode_dst[i] = 0;
675                        tally->stats->mean_src[i] = 0;
676                        tally->stats->mean_dst[i] = 0;
677                        tally->stats->median_src[i] = 0;
678                        tally->stats->median_dst[i] = 0;
679                        tally->stats->stddev_src[i] = 0;
680                        tally->stats->stddev_dst[i] = 0;
681                        tally->stats->variance_src[i] = 0;
682                        tally->stats->variance_dst[i] = 0;
683                        tally->stats->skewness_src[i] = 0;
684                        tally->stats->skewness_dst[i] = 0;
685                }
686                /* free the priority queue */
687                for(i=0;i<4;i++) {
688                        free(tally->stats->rank_src[i]);
689                        free(tally->stats->rank_dst[i]);
690                }
691
692                tally->packets = 0;
693
694        }
695
696        /* Cleanup the thread results */
697        free(results);
698}
699
700/* Callback when the reporter thread stops (essentially when the program ends) */
701static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
702
703        /* Get the tally from the thread local storage */
704        struct addr_local *tally = (struct addr_local *)tls;
705
706        /* If there is any remaining data in the tally plot it */
707        if(tally->packets > 0) {
708                /* Then plot the results */
709                plot_results(tally, (tally->lastkey >> 32) + 1);
710        }
711        /* Cleanup tally results*/
712        free(tally);
713}
714
715static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
716        libtrace_callback_set_t *reporting) {
717        /* Only destroy if the structure exists */
718        if(trace) {
719                trace_destroy(trace);
720        }
721        if(processing) {
722                trace_destroy_callback_set(processing);
723        }
724        if(reporting) {
725                trace_destroy_callback_set(reporting);
726        }
727}
728
729/* Converts a string representation eg 1.2.3.4/24 into a network structure */
730static int get_network(char *network_string, struct network *network) {
731
732        char delim[] = "/";
733        /* Split the address and mask portion of the string */
734        char *address = strtok(network_string, delim);
735        char *mask = strtok(NULL, delim);
736
737        /* Check the subnet mask is valid */
738        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
739                return 1;
740        }
741        /* right shift so netmask is in network byte order */
742        network->mask = 0xffffffff << (32 - atoi(mask));
743
744        struct in_addr addr;
745        /* Convert address string into uint32_t and check its valid */
746        if(inet_aton(address, &addr) == 0) {
747                return 2;
748        }
749        /* Ensure its saved in network byte order */
750        network->address = htonl(addr.s_addr);
751
752        /* Calculate the network address */
753        network->network = network->address & network->mask;
754
755        return 0;
756}
757
758int main(int argc, char *argv[]) {
759
760        libtrace_t *trace;
761        /* Callbacks for processing and reporting threads */
762        libtrace_callback_set_t *processing, *reporter;
763
764
765        /* Ensure the input URI was supplied */
766        if(argc < 3) {
767                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
768                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
769                return 1;
770        }
771        /* Convert tick into an int */
772        tickrate = atoi(argv[2]);
773
774
775        /* Create the trace */
776        trace = trace_create(argv[1]);
777        /* Ensure no error has occured creating the trace */
778        if(trace_is_err(trace)) {
779                trace_perror(trace, "Creating trace");
780                return 1;
781        }
782
783        /* Setup the processing threads */
784        processing = trace_create_callback_set();
785        trace_set_starting_cb(processing, start_callback);
786        trace_set_packet_cb(processing, per_packet);
787        trace_set_stopping_cb(processing, stop_processing);
788        trace_set_tick_interval_cb(processing, per_tick);
789        /* Setup the reporter threads */
790        reporter = trace_create_callback_set();
791        trace_set_starting_cb(reporter, start_reporter);
792        trace_set_result_cb(reporter, per_result);
793        trace_set_stopping_cb(reporter, stop_reporter);
794
795        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
796        trace_set_perpkt_threads(trace, 4);
797        /* Order the results by timestamp */
798        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
799        /* Try to balance the load across all processing threads */
800        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
801
802        /* Set the tick interval only if this is a live capture */
803        if(trace_get_information(trace)->live) {
804                /* tickrate is in seconds but tick_interval expects milliseconds */
805                trace_set_tick_interval(trace, tickrate*1000);
806        }
807        /* Do not buffer the reports */
808        trace_set_reporter_thold(trace, 1);
809
810
811        /* Setup excluded networks if any were supplied */
812        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
813        exclude->networks = malloc(sizeof(struct network)*(argc-3));
814        if(exclude == NULL || exclude->networks == NULL) {
815                fprintf(stderr, "Unable to allocate memory");
816                libtrace_cleanup(trace, processing, reporter);
817                return 1;
818        }
819        exclude->count = 0;
820        int i;
821        for(i=0;i<argc-3;i++) {
822                /* convert the network string into a network structure */
823                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
824                        fprintf(stderr, "Error creating excluded network");
825                        return 1;
826                }
827                /* increment the count of excluded networks */
828                exclude->count += 1;
829        }
830
831
832        /* Start the trace, if it errors return */
833        if(trace_pstart(trace, exclude, processing, reporter)) {
834                trace_perror(trace, "Starting parallel trace");
835                libtrace_cleanup(trace, processing, reporter);
836                return 1;
837        }
838
839        /* This will wait for all threads to complete */
840        trace_join(trace);
841
842        /* Clean up everything */
843        free(exclude->networks);
844        free(exclude);
845        libtrace_cleanup(trace, processing, reporter);
846
847        return 0;
848}
Note: See TracBrowser for help on using the repository browser.