source: examples/tutorial/ipdist-parallel.c @ 96ec511

develop
Last change on this file since 96ec511 was 96ec511, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Added more metrics

  • Property mode set to 100644
File size: 27.9 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10
11/* Structure to hold the counters each thread has its own one of these */
12struct addr_local {
13        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
14        uint64_t src[4][256];
15        uint64_t dst[4][256];
16        /* Holds the results from the previous output */
17        uint64_t src_lastoutput[4][256];
18        uint64_t dst_lastoutput[4][256];
19        /* Holds the timestamp */
20        uint64_t lastkey;
21        /* Is the count of the number of packets processed, This is cleared after every output. */
22        uint64_t packets;
23        uint64_t output_count;
24        /* Pointer to stats structure */
25        struct addr_stats *stats;
26};
27struct addr_stats {
28        /* Holds the percentage change compared to the previous output */
29        float src[4][256];
30        float dst[4][256];
31        double mode_src[4];
32        double mode_dst[4];
33        double mean_src[4];
34        double mean_dst[4];
35        double stddev_src[4];
36        double stddev_dst[4];
37        double variance_src[4];
38        double variance_dst[4];
39        double skewness_src[4];
40        double skewness_dst[4];
41        struct addr_rank *rank_src[4];
42        struct addr_rank *rank_dst[4];
43};
44struct addr_rank {
45        uint8_t addr;
46        /* count is the priority */
47        uint64_t count;
48        /* pointer to next ranking item */
49        struct addr_rank* next;
50};
51
52/* Structure to hold excluded networks */
53struct exclude_networks {
54        int count;
55        struct network *networks;
56};
57struct network {
58        uint32_t address;
59        uint32_t mask;
60        uint32_t network;
61};
62
63/* interval between outputs in seconds */
64uint64_t tickrate;
65
66char *stats_outputdir = "/home/jcv9/output-spectre/";
67/* Calculate and plot the percentage change from the previous plot */
68int stats_percentage_change = 0;
69
70/*************************************************************************
71Priority queue linked list */
72
73static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
74        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
75        tmp->addr = addr;
76        tmp->count = count;
77        tmp->next = NULL;
78
79        return tmp;
80}
81static uint8_t peak(struct addr_rank **head) {
82        return (*head)->addr;
83}
84static uint64_t peak_count(struct addr_rank **head) {
85        return (*head)->count;
86}
87static void pop(struct addr_rank **head) {
88        struct addr_rank* tmp = *head;
89        (*head) = (*head)->next;
90        free(tmp);
91}
92static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
93        struct addr_rank *curr = (*head);
94        struct addr_rank *tmp = rank_new(addr, count);
95
96        /* Check if the new node has a greater priority than the head */
97        if((*head)->count < count) {
98                tmp->next = *head;
99                (*head) = tmp;
100        } else {
101                /* Jump through the list until we find the correct position */
102                while (curr->next != NULL && curr->next->count > count) {
103                        curr = curr->next;
104                }
105
106                tmp->next = curr->next;
107                curr->next = tmp;
108        }
109}
110/*************************************************************************/
111
112
113static void compute_stats(struct addr_local *tally) {
114        int i, j, k;
115
116        /* Calculates the percentage change from the last output. NEED TO MAKE THIS WEIGHTED */
117        if(stats_percentage_change) {
118                for(i=0;i<256;i++) {
119                        for(j=0;j<4;j++) {
120                                tally->stats->src[j][i] = 0;
121                                tally->stats->dst[j][i] = 0;
122                                if(tally->src[j][i] != 0) {
123                                        tally->stats->src[j][i] = (((float)tally->src[j][i] - (float)tally->src_lastoutput[j][i]) / (float)tally->src[j][i]) * 100;
124                                }
125                                if(tally->dst[j][i] != 0) {
126                                        tally->stats->dst[j][i] = (((float)tally->dst[j][i] - (float)tally->dst_lastoutput[j][i]) / (float)tally->dst[j][i]) * 100;
127                                }
128                        }
129                }
130        }
131
132        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
133        for(i=0;i<4;i++) {
134                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
135                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
136                for(j=1;j<256;j++) {
137                        /* Push everything into the priority queue
138                         * each item will be popped off in the correct order */
139                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
140                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
141                }
142        }
143
144        /* Calculate mean, variance and standard deviation */
145        for(k=0;k<4;k++) {
146
147                double ex = 0;
148                double ex2 = 0;
149                double n = 0;
150                double m = 0;
151                for(i=0;i<256;i++) {
152                        for(j=0;j<tally->src[k][i];j++) {
153                                if(n == 0) {
154                                        m = i;
155                                }
156                                n += 1;
157                                ex += (i - m);
158                                ex2 += ((i - m) * (i - m));
159                        }
160                }
161                tally->stats->mean_src[k] = (k + (ex / n));
162                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
163                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
164
165                ex = 0;
166                ex2 = 0;
167                n = 0;
168                m = 0;
169                for(i=0;i<256;i++) {
170                        for(j=0;j<tally->dst[k][i];j++) {
171                                if(n == 0) {
172                                        m = i;
173                                }
174                                n += 1;
175                                ex += (i - m);
176                                ex2 += ((i - m) * (i - m));
177                        }
178                }
179                tally->stats->mean_dst[k] = (k + (ex / n));
180                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
181                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
182
183                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
184                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
185                /* Calculate skewness using pearsons mode method. This is accurate with large amounts of data */
186                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->mode_src[k]) / tally->stats->stddev_src[k];
187                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->mode_dst[k]) / tally->stats->stddev_dst[k];
188        }
189
190        printf("skewnesss: %f\n", tally->stats->skewness_dst[1]);
191
192}
193
194static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
195
196        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
197        /* Proccessing thread local storage */
198        struct addr_local *local = (struct addr_local *)tls;
199
200        /* Populate the result structure from the threads local storage and clear threads local storage*/
201        int i, j;
202        for(i=0;i<4;i++) {
203                for(j=0;j<256;j++) {
204                        result->src[i][j] = local->src[i][j];
205                        result->dst[i][j] = local->dst[i][j];
206                        /* clear local storage */
207                        local->src[i][j] = 0;
208                        local->dst[i][j] = 0;
209                }
210        }
211        result->packets = local->packets;
212        local->packets = 0;
213
214        /* Push result to the combiner */
215        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
216}
217
218/* Start callback function - This is run for each thread when it starts */
219static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
220
221        /* Create and initialize the local counter struct */
222        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
223        int i, j;
224        for(i=0;i<4;i++) {
225                for(j=0;j<256;j++) {
226                        local->src[i][j] = 0;
227                        local->dst[i][j] = 0;
228                }
229        }
230        local->lastkey = 0;
231        local->packets = 0;
232
233        /* return the local storage so it is available for all other callbacks for the thread*/
234        return local;
235}
236
237/* Checks if address is part of a excluded subnet. */
238static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
239
240        int i;
241        for(i=0;i<exclude->count;i++) {
242                /* Convert address into a network address */
243                uint32_t net_addr = address & exclude->networks[i].mask;
244
245                /* If this matches the network address from the excluded list we need to exclude this
246                   address. */
247                if(net_addr == exclude->networks[i].network) {
248                        return 1;
249                }
250        }
251
252        /* If we got this far the address should not be excluded */
253        return 0;
254}
255
256static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
257
258        /* Checks if the ip is of type IPv4 */
259        if (ip->sa_family == AF_INET) {
260
261                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
262                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
263                /* Get in_addr from sockaddr */
264                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
265                /* Ensure the address is in network byte order */
266                uint32_t address = htonl(ip4.s_addr);
267
268                /* Check if the address is part of an excluded network. */
269                if(network_excluded(address, exclude) == 0) {
270
271                        /* Split the IPv4 address into each octet */
272                        uint8_t octet[4];
273                        octet[0] = (address & 0xff000000) >> 24;
274                        octet[1] = (address & 0x00ff0000) >> 16;
275                        octet[2] = (address & 0x0000ff00) >> 8;
276                        octet[3] = (address & 0x000000ff);
277
278                        /* check if the supplied address was a source or destination,
279                           increment the correct one */
280                        if(srcaddr) {
281                                int i;
282                                for(i=0;i<4;i++) {
283                                        local->src[i][octet[i]] += 1;
284                                }
285                        } else {
286                                int i;
287                                for(i=0;i<4;i++) {
288                                        local->dst[i][octet[i]] += 1;
289                                }
290                        }
291                }
292        }
293}
294
295/* Per packet callback function run by each thread */
296static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
297        libtrace_packet_t *packet) {
298
299        /* Regain access to the address counter structure */
300        struct addr_local *local = (struct addr_local *)tls;
301
302        /* If this is the first packet set the lastkey to the packets timestamp */
303        if(local->lastkey == 0) {
304                local->lastkey = trace_get_erf_timestamp(packet);
305        }
306
307        /* Increment the packet count */
308        local->packets += 1;
309
310        /* Regain access to excluded networks pointer */
311        struct exclude_networks *exclude = (struct exclude_networks *)global;
312
313        struct sockaddr_storage addr;
314        struct sockaddr *ip;
315
316        /* Get the source IP address */
317        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
318        /* If a source ip address was found */
319        if(ip != NULL) {
320                process_ip(ip, local, exclude, 1);
321        }
322
323        /* Get the destination IP address */
324        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
325        /* If a destination ip address was found */
326        if(ip != NULL) {
327                process_ip(ip, local, exclude, 0);
328        }
329
330        /* If this trace is not live we will manually call "per tick" */
331        if(!trace_get_information(trace)->live) {
332                /* get the current packets timestamp */
333                uint64_t timestamp = trace_get_erf_timestamp(packet);
334
335                /* We only want to call per_tick if we are due to output something
336                 * Right shifting these converts them to seconds, tickrate is in seconds */
337                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
338                        per_tick(trace, thread, global, local, timestamp);
339                        local->lastkey = timestamp;
340                }
341        }
342
343        /* Return the packet to libtrace */
344        return packet;
345}
346
347/* Stopping callback function - When a thread closes */
348static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
349
350        /* cast the local storage structure */
351        struct addr_local *local = (struct addr_local *)tls;
352        /* Create structure to store the result */
353        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
354
355        /* Populate the result */
356        int i, j;
357        for(i=0;i<4;i++) {
358                for(j=0;j<256;j++) {
359                        result->src[i][j] = local->src[i][j];
360                        result->dst[i][j] = local->dst[i][j];
361                }
362        }
363        result->packets = local->packets;
364
365        /* Send the final results to the combiner */
366        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
367
368        /* Cleanup the local storage */
369        free(local);
370}
371
372/* Starting callback for reporter thread */
373static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
374        /* Create tally structure */
375        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
376        tally->stats = malloc(sizeof(struct addr_stats));
377
378        /* Initialize the tally structure */
379        int i, j;
380        for(i=0;i<4;i++) {
381                for(j=0;j<256;j++) {
382                        tally->src[i][j] = 0;
383                        tally->dst[i][j] = 0;
384                        tally->src_lastoutput[i][j] = 0;
385                        tally->dst_lastoutput[i][j] = 0;
386                        tally->stats->src[i][j] = 0;
387                        tally->stats->dst[i][j] = 0;
388                }
389                /* Stats related varibles */
390                tally->stats->mode_src[i] = 0;
391                tally->stats->mode_dst[i] = 0;
392                tally->stats->mean_src[i] = 0;
393                tally->stats->mean_dst[i] = 0;
394                tally->stats->stddev_src[i] = 0;
395                tally->stats->stddev_dst[i] = 0;
396                tally->stats->variance_src[i] = 0;
397                tally->stats->variance_dst[i] = 0;
398                tally->stats->skewness_src[i] = 0;
399                tally->stats->skewness_dst[i] = 0;
400        }
401        tally->lastkey = 0;
402        tally->packets = 0;
403        tally->output_count = 0;
404
405        return tally;
406}
407
408static void plot_results(struct addr_local *tally, uint64_t tick) {
409
410        int i, j, k;
411
412        /* Calculations before reporting the results */
413        /* Need to initialise lastoutput values on first pass,
414         * this is so we have a base line for percentage changed */
415        if(tally->output_count == 0) {
416                for(i=0;i<4;i++) {
417                        for(j=0;j<256;j++) {
418                                tally->src_lastoutput[i][j] = tally->src[i][j];
419                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
420                        }
421                }
422         }
423        /* Compute the stats */
424        compute_stats(tally);
425
426        /* Finaly output the results */
427        printf("Generating output \"%sipdist-%u\"\n", stats_outputdir, tick);
428
429        /* Output the results */
430        char outputfile[255];
431        snprintf(outputfile, sizeof(outputfile), "%sipdist-%u.data", stats_outputdir, tick);
432        FILE *tmp = fopen(outputfile, "w");
433        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
434        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
435        for(i=0;i<256;i++) {
436                fprintf(tmp, "%d\t%d", tick, i+1);
437                for(j=0;j<4;j++) {
438                        /* Get the highest ranking to lowest ranking octets */
439                        fprintf(tmp, "\t%d", peak(&tally->stats->rank_src[j]));
440                        fprintf(tmp, "\t%d", peak_count(&tally->stats->rank_src[j]));
441                        fprintf(tmp, "\t%d", peak(&tally->stats->rank_dst[j]));
442                        fprintf(tmp, "\t%d", peak_count(&tally->stats->rank_dst[j]));
443                        pop(&tally->stats->rank_src[j]);
444                        pop(&tally->stats->rank_dst[j]);
445                }
446                fprintf(tmp, "\n");
447        }
448        fclose(tmp);
449
450        char outputfile_stats[255];
451        snprintf(outputfile_stats, sizeof(outputfile_stats), "%sipdist-%u.stats", stats_outputdir, tick);
452        tmp = fopen(outputfile_stats, "w");
453        /* append stats data to end of file */
454        fprintf(tmp, "#\tmean\tstddev\tvariance\tmode\tskewness\n");
455        for(i=0;i<4;i++) {
456                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
457                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
458                fprintf(tmp, "\n\n");
459        }
460        fclose(tmp);
461
462        /* Puts data into timeseries files that gnuplot likes */
463        char outputfile2[255];
464        for(k=0;k<2;k++) {
465                for(j=0;j<4;j++) {
466                        /* If k is 0 we are doing src else dst */
467                        if(k) {
468                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-dst-octet%d.data", stats_outputdir, j+1);
469                        } else {
470                                snprintf(outputfile2, sizeof(outputfile2), "%sipdist-timeseries-src-octet%d.data", stats_outputdir, j+1);
471                        }
472                        if(tally->output_count == 0) {
473                                tmp = fopen(outputfile2, "w");
474                                fprintf(tmp, "prefix\t");
475                                for(i=0;i<256;i++) {
476                                        fprintf(tmp, "%d\t", i);
477                                }
478                                fprintf(tmp, "\n");
479                        } else {
480                                tmp = fopen(outputfile2, "a");
481                        }
482                        fprintf(tmp, "%d\t", tick);
483                        for(i=0;i<256;i++) {
484                                if(k) {
485                                        fprintf(tmp, "%d\t", tally->dst[j][i]);
486                                } else {
487                                        fprintf(tmp, "%d\t", tally->src[j][i]);
488                                }
489                        }
490                        fprintf(tmp, "\n");
491                        fclose(tmp);
492                }
493        }
494
495        /* Plot the results */
496        for(i=0;i<4;i++) {
497                char outputplot[255];
498                snprintf(outputplot, sizeof(outputplot), "%sipdist-%u-octet%d.png", stats_outputdir, tick, i+1);
499                /* Open pipe to gnuplot */
500                FILE *gnuplot = popen("gnuplot -persistent", "w");
501                /* send all commands to gnuplot */
502                fprintf(gnuplot, "set term pngcairo enhanced size 1280,960\n");
503                fprintf(gnuplot, "set output '%s'\n", outputplot);
504                fprintf(gnuplot, "set multiplot layout 2,1\n");
505                fprintf(gnuplot, "set title 'IP Distribution'\n");
506                fprintf(gnuplot, "set xrange[0:255]\n");
507                fprintf(gnuplot, "set xlabel 'Prefix'\n");
508                fprintf(gnuplot, "set ylabel 'Hits'\n");
509                fprintf(gnuplot, "set xtics 0,10,255\n");
510                /* Setup labels that hold mean, standard deviation and variance */
511                fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 2 name 'SOURCEMEAN' nooutput\n", outputfile_stats, i);
512                //fprintf(gnuplot, "set label 1 gprintf(\"Source Mean %u\", SOURCEMEAN_min) at graph 0.02, 0.95\n");
513                fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 2 name 'DESTMEAN' nooutput\n", outputfile_stats, i);
514                //fprintf(gnuplot, "set label 2 gprintf(\"Destination Mean: %f\", DESTMEAN_min) at graph 0.02, 0.90\n");
515                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 3 name 'SOURCESTDDEV' nooutput\n", outputfile_stats, i);
516                //fprintf(gnuplot, "set label 3 sprintf('Source Standard Deviation: %f', SOURCESTDDEV_min) at graph 0.24, 0.95\n");
517                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 3 name 'DESTSTDDEV' nooutput\n", outputfile_stats, i);
518                //fprintf(gnuplot, "set label 4 sprintf('Destination Standard Deviation: %f', DESTSTDDEV_min) at graph 0.24, 0.90\n");
519                //fprintf(gnuplot, "stats '%s' index %d every ::0::0 using 4 name 'SOURCEVAR' nooutput\n", outputfile_stats, i);
520                //fprintf(gnuplot, "set label 5 sprintf('Source Variance: %f', SOURCEVAR_min) at graph 0.55, 0.95\n");
521                //fprintf(gnuplot, "stats '%s' index %d every ::1::1 using 4 name 'DESTVAR' nooutput\n", outputfile_stats, i);
522                //fprintf(gnuplot, "set label 6 sprintf('Destination Variance: %f', DESTVAR_min) at graph 0.55, 0.90\n");
523                /* Plot the first graph of the multiplot */
524                fprintf(gnuplot, "set arrow from SOURCEMEAN_min, graph 0 to SOURCEMEAN_min, graph 1 nohead lt 1\n");
525                fprintf(gnuplot, "set arrow from DESTMEAN_min, graph 0 to DESTMEAN_min, graph 1 nohead lt 2\n");
526                fprintf(gnuplot, "plot '%s' using %d:%d index 0 title 'Source octet %d' smooth unique with boxes,", outputfile, (i*4)+3,(i*4)+4, i+1);
527                fprintf(gnuplot, "'%s' using %d:%d index 0 title 'Destination octet %d' smooth unique with boxes,", outputfile, (i*4)+5, (i*4)+6, i+1);
528                fprintf(gnuplot, "1/0 t 'Source mean' lt 1,");
529                fprintf(gnuplot, "1/0 t 'Destination mean' lt 2\n");
530                /* Unset labels for next plot */
531                fprintf(gnuplot, "unset arrow\n");
532                fprintf(gnuplot, "unset label 1\nunset label 2\nunset label 3\nunset label 4\nunset label 5\nunset label 6\n");
533                fprintf(gnuplot, "set title 'Zipf Distribution'\n");
534                fprintf(gnuplot, "set xlabel 'Rank'\n");
535                fprintf(gnuplot, "set ylabel 'Frequency'\n");
536                fprintf(gnuplot, "set xrange[1:255]\n");
537                fprintf(gnuplot, "set logscale xy 10\n");
538                /* Plot the second graph of the multiplot */
539                fprintf(gnuplot, "plot '%s' using 2:%d index 0 title 'Source octet %d',", outputfile, (i*4)+4, i+1);
540                fprintf(gnuplot, "'%s' using 2:%d index 0 title 'Destination octet %d'\n", outputfile, (i*4)+6, i+1);
541                fprintf(gnuplot, "replot");
542                pclose(gnuplot);
543        }
544
545        /* Plot time series */
546        for(k=0;k<2;k++) {
547                for(i=0;i<4;i++) {
548                        char outputplot2[255];
549                        if(k) {
550                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-dst-octet%i.png", stats_outputdir, i+1);
551                        } else {
552                                snprintf(outputplot2, sizeof(outputplot2), "%sipdist-timeseries-src-octet%i.png", stats_outputdir, i+1);
553                        }
554                        FILE *gnuplot = popen("gnuplot -persistent", "w");
555                        fprintf(gnuplot, "set term pngcairo size 1280,960 \n");
556                        fprintf(gnuplot, "set output '%s'\n", outputplot2);
557                        if(k) {
558                                fprintf(gnuplot, "set title 'Timeseries Dst Octet %i'\n", i+1);
559                        } else {
560                                fprintf(gnuplot, "set title 'Timeseries Src Octet %i'\n", i+1);
561                        }
562                        fprintf(gnuplot, "set xtics rotate\n");
563                        fprintf(gnuplot, "set key out vert\n");
564                        fprintf(gnuplot, "set key right\n");
565                        //fprintf(gnuplot, "set xdata time\n");
566                        //fprintf(gnuplot, "set timefmt '%%s'\n");
567                        //fprintf(gnuplot, "set format x '%%m/%%d/%%Y %%H:%%M:%%S'\n");
568                        fprintf(gnuplot, "set autoscale xy\n");
569                        if(k) {
570                                fprintf(gnuplot, "plot '%sipdist-timeseries-dst-octet%d.data' using 2:xtic(1) with lines title columnheader(2), for[i=3:257] '' using i with lines title columnheader(i)\n", stats_outputdir, i+1);
571                        } else {
572                                fprintf(gnuplot, "plot '%sipdist-timeseries-src-octet%d.data' using 2:xtic(1) with lines title columnheader(2), for[i=3:257] '' using i with lines title columnheader(i)\n", stats_outputdir, i+1);
573                        }
574                        fprintf(gnuplot, "replot");
575                        pclose(gnuplot);
576                }
577        }
578}
579
580
581/* Callback when a result is given to the reporter thread */
582static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
583        void *tls, libtrace_result_t *result) {
584
585        struct addr_local *results;
586        struct addr_local *tally;
587        uint64_t key;
588
589        /* We only want to handle results containing our user-defined structure  */
590        if(result->type != RESULT_USER) {
591                return;
592        }
593
594        /* This key is the key that was passed into trace_publish_results
595         * this will contain the erf timestamp for the packet */
596        key = result->key;
597
598        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
599        results = (struct addr_local *)result->value.ptr;
600
601        /* Grab our tally out of thread local storage */
602        tally = (struct addr_local *)tls;
603
604        /* Add all the results to the tally */
605        int i, j;
606        for(i=0;i<4;i++) {
607                for(j=0;j<256;j++) {
608                        tally->src[i][j] += results->src[i][j];
609                        tally->dst[i][j] += results->dst[i][j];
610                }
611        }
612        tally->packets += results->packets;
613
614        /* If the current timestamp is greater than the last printed plus the interval, output a result */
615        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
616
617                /* update last key */
618                tally->lastkey = key;
619
620                /* Plot the result with the key in epoch seconds*/
621                plot_results(tally, key >> 32);
622
623                /* increment the output counter */
624                tally->output_count++;
625
626                /* clear the tally but copy old values over first*/
627                for(i=0;i<4;i++) {
628                        for(j=0;j<256;j++) {
629                                tally->src_lastoutput[i][j] = tally->src[i][j];
630                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
631                                tally->src[i][j] = 0;
632                                tally->dst[i][j] = 0;
633                        }
634                }
635                /* free the priority queue */
636                for(i=0;i<4;i++) {
637                        free(tally->stats->rank_src[i]);
638                        free(tally->stats->rank_dst[i]);
639                }
640
641                tally->packets = 0;
642
643        }
644
645        /* Cleanup the thread results */
646        free(results);
647}
648
649/* Callback when the reporter thread stops (essentially when the program ends) */
650static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
651
652        /* Get the tally from the thread local storage */
653        struct addr_local *tally = (struct addr_local *)tls;
654
655        /* If there is any remaining data in the tally plot it */
656        if(tally->packets > 0) {
657                /* Then plot the results */
658                plot_results(tally, (tally->lastkey >> 32) + 1);
659        }
660        /* Cleanup tally results*/
661        free(tally);
662}
663
664static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
665        libtrace_callback_set_t *reporting) {
666        /* Only destroy if the structure exists */
667        if(trace) {
668                trace_destroy(trace);
669        }
670        if(processing) {
671                trace_destroy_callback_set(processing);
672        }
673        if(reporting) {
674                trace_destroy_callback_set(reporting);
675        }
676}
677
678/* Converts a string representation eg 1.2.3.4/24 into a network structure */
679static int get_network(char *network_string, struct network *network) {
680
681        char delim[] = "/";
682        /* Split the address and mask portion of the string */
683        char *address = strtok(network_string, delim);
684        char *mask = strtok(NULL, delim);
685
686        /* Check the subnet mask is valid */
687        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
688                return 1;
689        }
690        /* right shift so netmask is in network byte order */
691        network->mask = 0xffffffff << (32 - atoi(mask));
692
693        struct in_addr addr;
694        /* Convert address string into uint32_t and check its valid */
695        if(inet_aton(address, &addr) == 0) {
696                return 2;
697        }
698        /* Ensure its saved in network byte order */
699        network->address = htonl(addr.s_addr);
700
701        /* Calculate the network address */
702        network->network = network->address & network->mask;
703
704        return 0;
705}
706
707int main(int argc, char *argv[]) {
708
709        libtrace_t *trace;
710        /* Callbacks for processing and reporting threads */
711        libtrace_callback_set_t *processing, *reporter;
712
713
714        /* Ensure the input URI was supplied */
715        if(argc < 3) {
716                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
717                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
718                return 1;
719        }
720        /* Convert tick into an int */
721        tickrate = atoi(argv[2]);
722
723
724        /* Create the trace */
725        trace = trace_create(argv[1]);
726        /* Ensure no error has occured creating the trace */
727        if(trace_is_err(trace)) {
728                trace_perror(trace, "Creating trace");
729                return 1;
730        }
731
732        /* Setup the processing threads */
733        processing = trace_create_callback_set();
734        trace_set_starting_cb(processing, start_callback);
735        trace_set_packet_cb(processing, per_packet);
736        trace_set_stopping_cb(processing, stop_processing);
737        trace_set_tick_interval_cb(processing, per_tick);
738        /* Setup the reporter threads */
739        reporter = trace_create_callback_set();
740        trace_set_starting_cb(reporter, start_reporter);
741        trace_set_result_cb(reporter, per_result);
742        trace_set_stopping_cb(reporter, stop_reporter);
743
744        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
745        trace_set_perpkt_threads(trace, 4);
746        /* Order the results by timestamp */
747        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
748        /* Try to balance the load across all processing threads */
749        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
750
751        /* Set the tick interval only if this is a live capture */
752        if(trace_get_information(trace)->live) {
753                /* tickrate is in seconds but tick_interval expects milliseconds */
754                trace_set_tick_interval(trace, tickrate*1000);
755        }
756        /* Do not buffer the reports */
757        trace_set_reporter_thold(trace, 1);
758
759
760        /* Setup excluded networks if any were supplied */
761        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
762        exclude->networks = malloc(sizeof(struct network)*(argc-3));
763        if(exclude == NULL || exclude->networks == NULL) {
764                fprintf(stderr, "Unable to allocate memory");
765                libtrace_cleanup(trace, processing, reporter);
766                return 1;
767        }
768        exclude->count = 0;
769        int i;
770        for(i=0;i<argc-3;i++) {
771                /* convert the network string into a network structure */
772                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
773                        fprintf(stderr, "Error creating excluded network");
774                        return 1;
775                }
776                /* increment the count of excluded networks */
777                exclude->count += 1;
778        }
779
780
781        /* Start the trace, if it errors return */
782        if(trace_pstart(trace, exclude, processing, reporter)) {
783                trace_perror(trace, "Starting parallel trace");
784                libtrace_cleanup(trace, processing, reporter);
785                return 1;
786        }
787
788        /* This will wait for all threads to complete */
789        trace_join(trace);
790
791        /* Clean up everything */
792        free(exclude->networks);
793        free(exclude);
794        libtrace_cleanup(trace, processing, reporter);
795
796        return 0;
797}
Note: See TracBrowser for help on using the repository browser.