source: examples/tutorial/ipdist-parallel.c @ 4d5e6b3

develop
Last change on this file since 4d5e6b3 was 4d5e6b3, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Adding more usefull metrics

  • Property mode set to 100644
File size: 18.2 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9
10/* Structure to hold the counters each thread has its own one of these */
11struct addr_local {
12        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
13        uint64_t src[4][256];
14        uint64_t dst[4][256];
15        /* Maintains a running average over. Only used within the tally */
16        uint64_t src_lastoutput[4][256];
17        uint64_t dst_lastoutput[4][256];
18        /* Holds the timestamp */
19        uint64_t lastkey;
20        /* Is the count of the number of packets processed, This is cleared after every output. */
21        uint64_t packets;
22        uint64_t output_count;
23};
24
25/* Structure to hold excluded networks */
26struct exclude_networks {
27        int count;
28        struct network *networks;
29};
30struct network {
31        uint32_t address;
32        uint32_t mask;
33        uint32_t network;
34};
35
36/* interval between outputs in seconds */
37uint64_t tickrate;
38
39static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
40
41        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
42        /* Proccessing thread local storage */
43        struct addr_local *local = (struct addr_local *)tls;
44
45        /* Populate the result structure from the threads local storage and clear threads local storage*/
46        int i, j;
47        for(i=0;i<4;i++) {
48                for(j=0;j<256;j++) {
49                        result->src[i][j] = local->src[i][j];
50                        result->dst[i][j] = local->dst[i][j];
51                        /* clear local storage */
52                        local->src[i][j] = 0;
53                        local->dst[i][j] = 0;
54                }
55        }
56        result->packets = local->packets;
57        local->packets = 0;
58
59        /* Push result to the combiner */
60        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
61}
62
63/* Start callback function - This is run for each thread when it starts */
64static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
65
66        /* Create and initialize the local counter struct */
67        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
68        int i, j;
69        for(i=0;i<4;i++) {
70                for(j=0;j<256;j++) {
71                        local->src[i][j] = 0;
72                        local->dst[i][j] = 0;
73                }
74        }
75        local->lastkey = 0;
76        local->packets = 0;
77
78        /* return the local storage so it is available for all other callbacks for the thread*/
79        return local;
80}
81
82/* Checks if address is part of a excluded subnet. */
83static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
84
85        int i;
86        for(i=0;i<exclude->count;i++) {
87                /* Convert address into a network address */
88                uint32_t net_addr = address & exclude->networks[i].mask;
89
90                /* If this matches the network address from the excluded list we need to exclude this
91                   address. */
92                if(net_addr == exclude->networks[i].network) {
93                        return 1;
94                }
95        }
96
97        /* If we got this far the address should not be excluded */
98        return 0;
99}
100
101static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
102
103        /* Checks if the ip is of type IPv4 */
104        if (ip->sa_family == AF_INET) {
105
106                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
107                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
108                /* Get in_addr from sockaddr */
109                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
110                /* Ensure the address is in network byte order */
111                uint32_t address = htonl(ip4.s_addr);
112
113                /* Check if the address is part of an excluded network. */
114                if(network_excluded(address, exclude) == 0) {
115
116                        /* Split the IPv4 address into each octet */
117                        uint8_t octet[4];
118                        octet[0] = (address & 0xff000000) >> 24;
119                        octet[1] = (address & 0x00ff0000) >> 16;
120                        octet[2] = (address & 0x0000ff00) >> 8;
121                        octet[3] = (address & 0x000000ff);
122
123                        /* check if the supplied address was a source or destination,
124                           increment the correct one */
125                        if(srcaddr) {
126                                int i;
127                                for(i=0;i<4;i++) {
128                                        local->src[i][octet[i]] += 1;
129                                }
130                        } else {
131                                int i;
132                                for(i=0;i<4;i++) {
133                                        local->dst[i][octet[i]] += 1;
134                                }
135                        }
136                }
137        }
138}
139
140/* Per packet callback function run by each thread */
141static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
142        libtrace_packet_t *packet) {
143
144        /* Regain access to the address counter structure */
145        struct addr_local *local = (struct addr_local *)tls;
146
147        /* If this is the first packet set the lastkey to the packets timestamp */
148        if(local->lastkey == 0) {
149                local->lastkey = trace_get_erf_timestamp(packet);
150        }
151
152        /* Increment the packet count */
153        local->packets += 1;
154
155        /* Regain access to excluded networks pointer */
156        struct exclude_networks *exclude = (struct exclude_networks *)global;
157
158        struct sockaddr_storage addr;
159        struct sockaddr *ip;
160
161        /* Get the source IP address */
162        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
163        /* If a source ip address was found */
164        if(ip != NULL) {
165                process_ip(ip, local, exclude, 1);
166        }
167
168        /* Get the destination IP address */
169        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
170        /* If a destination ip address was found */
171        if(ip != NULL) {
172                process_ip(ip, local, exclude, 0);
173        }
174
175        /* If this trace is not live we will manually call "per tick" */
176        if(!trace_get_information(trace)->live) {
177                /* get the current packets timestamp */
178                uint64_t timestamp = trace_get_erf_timestamp(packet);
179
180                /* We only want to call per_tick if we are due to output something
181                 * Right shifting these converts them to seconds, tickrate is in seconds */
182                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
183                        per_tick(trace, thread, global, local, timestamp);
184                        local->lastkey = timestamp;
185                }
186        }
187
188        /* Return the packet to libtrace */
189        return packet;
190}
191
192/* Stopping callback function - When a thread closes */
193static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
194
195        /* cast the local storage structure */
196        struct addr_local *local = (struct addr_local *)tls;
197        /* Create structure to store the result */
198        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
199
200        /* Populate the result */
201        int i, j;
202        for(i=0;i<4;i++) {
203                for(j=0;j<256;j++) {
204                        result->src[i][j] = local->src[i][j];
205                        result->dst[i][j] = local->dst[i][j];
206                }
207        }
208        result->packets = local->packets;
209
210        /* Send the final results to the combiner */
211        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
212
213        /* Cleanup the local storage */
214        free(local);
215}
216
217/* Starting callback for reporter thread */
218static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
219        /* Create tally structure */
220        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
221
222        /* Initialize the tally structure */
223        int i, j;
224        for(i=0;i<4;i++) {
225                for(j=0;j<256;j++) {
226                        tally->src[i][j] = 0;
227                        tally->dst[i][j] = 0;
228                        tally->src_lastoutput[i][j] = 0;
229                        tally->dst_lastoutput[i][j] = 0;
230                }
231        }
232        tally->lastkey = 0;
233        tally->packets = 0;
234        tally->output_count = 0;
235
236        return tally;
237}
238
239static void plot_results(struct addr_local *tally, uint64_t tick) {
240
241        char outputfile[255];
242        char outputplot[255];
243        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
244        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
245
246        /* Push all data into data file */
247        FILE *tmp = fopen(outputfile, "w");
248        int i, j;
249        fprintf(tmp, "#Hits\t\t\t\t\t\t\t\t\tPercentage Change\n");
250        fprintf(tmp, "#num\toctet1\t\toctet2\t\toctet3\t\toctet4\t\toctet1\t\toctet2\t\toctet3\t\toctet4\n");
251        fprintf(tmp, "#\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\n");
252        for(i=0;i<256;i++) {
253                fprintf(tmp, "%d", i);
254                for(j=0;j<4;j++) {
255                        fprintf(tmp, "\t%d\t%d", tally->src[j][i], tally->dst[j][i]);
256                }
257
258                /* Calculates the percentage change from the last output */
259                for(j=0;j<4;j++) {
260                        float src_inc = 0;
261                        float dst_inc = 0;
262                        if(tally->src[j][i] != 0) {
263                                src_inc = (((float)tally->src[j][i] - (float)tally->src_lastoutput[j][i]) / (float)tally->src[j][i]) * 100;
264                        }
265                        if(tally->dst[j][i] != 0) {
266                                dst_inc = (((float)tally->dst[j][i] - (float)tally->dst_lastoutput[j][i]) / (float)tally->dst[j][i]) * 100;
267                        }
268                        fprintf(tmp, "\t%.0f\t%.0f", src_inc, dst_inc);
269                }
270
271                fprintf(tmp, "\n");
272        }
273        fclose(tmp);
274        printf("Ouput data file %s and plot %s\n", outputfile, outputplot);
275
276        /* Open pipe to gnuplot */
277        FILE *gnuplot = popen("gnuplot -persistent", "w");
278        /* send all commands to gnuplot */
279        fprintf(gnuplot, "set term png size 1280,960 \n");
280        fprintf(gnuplot, "set title 'IP Distribution'\n");
281        fprintf(gnuplot, "set xrange[0:255]\n");
282        fprintf(gnuplot, "set xlabel 'Prefix'\n");
283        fprintf(gnuplot, "set ylabel 'Hits'\n");
284        fprintf(gnuplot, "set y2label 'Percentage Change'\n");
285        fprintf(gnuplot, "set xtics 0,10,255\n");
286        fprintf(gnuplot, "set y2range[-100:100]\n");
287        //fprintf(gnuplot, "set ytics nomirror\n");
288        //fprintf(gnuplot, "set y2tics\n");
289        //fprintf(gnuplot, "set tics out\n");
290        fprintf(gnuplot, "set output '%s'\n", outputplot);
291        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source octet 1' axes x1y1 with boxes,", outputfile);
292        fprintf(gnuplot, "'%s' using 1:3 title 'Destination octet 1' axes x1y1 with boxes,", outputfile);
293        //fprintf(gnuplot, "'%s' using 1:4 title 'Source octet 2' axes x1y1 with boxes,", outputfile);
294        //fprintf(gnuplot, "'%s' using 1:5 title 'Destination octet 2' axes x1y1 with boxes,", outputfile);
295        //fprintf(gnuplot, "'%s' using 1:6 title 'Source octet 3' axes x1y1 with boxes,", outputfile);
296        //fprintf(gnuplot, "'%s' using 1:7 title 'Destination octet 3' axes x1y1 with boxes,", outputfile);
297        //fprintf(gnuplot, "'%s' using 1:8 title 'Source octet 4' axes x1y1 with boxes,", outputfile);
298        //fprintf(gnuplot, "'%s' using 1:9 title 'Destination octet 4' axes x1y1 with boxes,", outputfile);
299        fprintf(gnuplot, "'%s' using 1:10 title 'Octet 1 source change' axes x1y2 with lines,", outputfile);
300        fprintf(gnuplot, "'%s' using 1:11 title 'Octet 1 destination change' axes x1y2 with lines\n", outputfile);
301        //fprintf(gnuplot, "'%s' using 1:12 title 'Octet 2 source change' axes x1y2 with lines,", outputfile);
302        //fprintf(gnuplot, "'%s' using 1:13 title 'Octet 2 destination change' axes x1y2 with lines,", outputfile);
303        //fprintf(gnuplot, "'%s' using 1:14 title 'Octet 3 source change' axes x1y2 with lines,", outputfile);
304        //fprintf(gnuplot, "'%s' using 1:15 title 'Octet 3 destination change' axes x1y2 with lines,", outputfile);
305        //fprintf(gnuplot, "'%s' using 1:16 title 'Octet 4 source change' axes x1y2 with lines,", outputfile);
306        //fprintf(gnuplot, "'%s' using 1:17 title 'Octet 4 destination change' axes x1y2 with lines\n", outputfile);
307        fprintf(gnuplot, "replot");
308        pclose(gnuplot);
309}
310
311
312/* Callback when a result is given to the reporter thread */
313static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
314        void *tls, libtrace_result_t *result) {
315
316        struct addr_local *results;
317        struct addr_local *tally;
318        uint64_t key;
319
320        /* We only want to handle results containing our user-defined structure  */
321        if(result->type != RESULT_USER) {
322                return;
323        }
324
325        /* This key is the key that was passed into trace_publish_results
326         * this will contain the erf timestamp for the packet */
327        key = result->key;
328
329        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
330        results = (struct addr_local *)result->value.ptr;
331
332        /* Grab our tally out of thread local storage */
333        tally = (struct addr_local *)tls;
334
335        /* Add all the results to the tally */
336        int i, j;
337        for(i=0;i<4;i++) {
338                for(j=0;j<256;j++) {
339                        tally->src[i][j] += results->src[i][j];
340                        tally->dst[i][j] += results->dst[i][j];
341                }
342        }
343        tally->packets += results->packets;
344
345        /* If the current timestamp is greater than the last printed plus the interval, output a result */
346        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
347
348                /* update last key */
349                tally->lastkey = key;
350
351                /* Need to initialise lastoutput values on first pass */
352                if(tally->output_count == 0) {
353                        for(i=0;i<4;i++) {
354                                for(j=0;j<256;j++) {
355                                        tally->src_lastoutput[i][j] = tally->src[i][j];
356                                        tally->dst_lastoutput[i][j] = tally->dst[i][j];
357                                }
358                        }
359                }
360
361                /* Plot the result with the key in epoch seconds*/
362                plot_results(tally, key >> 32);
363
364                /* increment the output counter */
365                tally->output_count++;
366
367                /* clear the tally but copy old values over first*/
368                for(i=0;i<4;i++) {
369                        for(j=0;j<256;j++) {
370                                tally->src_lastoutput[i][j] = tally->src[i][j];
371                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
372                                tally->src[i][j] = 0;
373                                tally->dst[i][j] = 0;
374                        }
375                }
376                tally->packets = 0;
377        }
378
379        /* Cleanup the thread results */
380        free(results);
381}
382
383/* Callback when the reporter thread stops (essentially when the program ends) */
384static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
385
386        /* Get the tally from the thread local storage */
387        struct addr_local *tally = (struct addr_local *)tls;
388
389        /* If there is any remaining data in the tally plot it */
390        if(tally->packets > 0) {
391                plot_results(tally, (tally->lastkey >> 32) + 1);
392        }
393        /* Cleanup tally results*/
394        free(tally);
395}
396
397static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
398        libtrace_callback_set_t *reporting) {
399        /* Only destroy if the structure exists */
400        if(trace) {
401                trace_destroy(trace);
402        }
403        if(processing) {
404                trace_destroy_callback_set(processing);
405        }
406        if(reporting) {
407                trace_destroy_callback_set(reporting);
408        }
409}
410
411/* Converts a string representation eg 1.2.3.4/24 into a network structure */
412static int get_network(char *network_string, struct network *network) {
413
414        char delim[] = "/";
415        /* Split the address and mask portion of the string */
416        char *address = strtok(network_string, delim);
417        char *mask = strtok(NULL, delim);
418
419        /* Check the subnet mask is valid */
420        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
421                return 1;
422        }
423        /* right shift so netmask is in network byte order */
424        network->mask = 0xffffffff << (32 - atoi(mask));
425
426        struct in_addr addr;
427        /* Convert address string into uint32_t and check its valid */
428        if(inet_aton(address, &addr) == 0) {
429                return 2;
430        }
431        /* Ensure its saved in network byte order */
432        network->address = htonl(addr.s_addr);
433
434        /* Calculate the network address */
435        network->network = network->address & network->mask;
436
437        return 0;
438}
439
440int main(int argc, char *argv[]) {
441
442        libtrace_t *trace;
443        /* Callbacks for processing and reporting threads */
444        libtrace_callback_set_t *processing, *reporter;
445
446
447        /* Ensure the input URI was supplied */
448        if(argc < 3) {
449                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
450                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
451                return 1;
452        }
453        /* Convert tick into an int */
454        tickrate = atoi(argv[2]);
455
456
457        /* Create the trace */
458        trace = trace_create(argv[1]);
459        /* Ensure no error has occured creating the trace */
460        if(trace_is_err(trace)) {
461                trace_perror(trace, "Creating trace");
462                return 1;
463        }
464
465        /* Setup the processing threads */
466        processing = trace_create_callback_set();
467        trace_set_starting_cb(processing, start_callback);
468        trace_set_packet_cb(processing, per_packet);
469        trace_set_stopping_cb(processing, stop_processing);
470        trace_set_tick_interval_cb(processing, per_tick);
471        /* Setup the reporter threads */
472        reporter = trace_create_callback_set();
473        trace_set_starting_cb(reporter, start_reporter);
474        trace_set_result_cb(reporter, per_result);
475        trace_set_stopping_cb(reporter, stop_reporter);
476
477        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
478        trace_set_perpkt_threads(trace, 4);
479        /* Order the results by timestamp */
480        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
481        /* Try to balance the load across all processing threads */
482        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
483
484        /* Set the tick interval only if this is a live capture */
485        if(trace_get_information(trace)->live) {
486                /* tickrate is in seconds but tick_interval expects milliseconds */
487                trace_set_tick_interval(trace, tickrate*1000);
488        }
489        /* Do not buffer the reports */
490        trace_set_reporter_thold(trace, 1);
491
492
493        /* Setup excluded networks if any were supplied */
494        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
495        exclude->networks = malloc(sizeof(struct network)*(argc-3));
496        if(exclude == NULL || exclude->networks == NULL) {
497                fprintf(stderr, "Unable to allocate memory");
498                libtrace_cleanup(trace, processing, reporter);
499                return 1;
500        }
501        exclude->count = 0;
502        int i;
503        for(i=0;i<argc-3;i++) {
504                /* convert the network string into a network structure */
505                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
506                        fprintf(stderr, "Error creating excluded network");
507                        return 1;
508                }
509                /* increment the count of excluded networks */
510                exclude->count += 1;
511        }
512
513
514        /* Start the trace, if it errors return */
515        if(trace_pstart(trace, exclude, processing, reporter)) {
516                trace_perror(trace, "Starting parallel trace");
517                libtrace_cleanup(trace, processing, reporter);
518                return 1;
519        }
520
521        /* This will wait for all threads to complete */
522        trace_join(trace);
523
524        /* Clean up everything */
525        free(exclude->networks);
526        free(exclude);
527        libtrace_cleanup(trace, processing, reporter);
528
529        return 0;
530}
Note: See TracBrowser for help on using the repository browser.