source: examples/tutorial/ipdist-parallel.c @ acffeb8

develop
Last change on this file since acffeb8 was acffeb8, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Output packet counts for all octets

  • Property mode set to 100644
File size: 15.1 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9
10/* Structure to hold the counters each thread has its own one of these */
11struct addr_local {
12        uint64_t src[4][256];
13        uint64_t dst[4][256];
14        uint64_t lastkey;
15        uint64_t packets;
16};
17
18/* Structure to hold excluded networks */
19struct exclude_networks {
20        int count;
21        struct network *networks;
22};
23struct network {
24        uint32_t address;
25        uint32_t mask;
26        uint32_t network;
27};
28
29/* interval between outputs in seconds */
30uint64_t tickrate;
31
32static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
33
34        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
35        /* Proccessing thread local storage */
36        struct addr_local *local = (struct addr_local *)tls;
37
38        /* Populate the result structure from the threads local storage and clear threads local storage*/
39        int i, j;
40        for(i=0;i<4;i++) {
41                for(j=0;j<256;j++) {
42                        result->src[i][j] = local->src[i][j];
43                        result->dst[i][j] = local->dst[i][j];
44                        /* clear local storage */
45                        local->src[i][j] = 0;
46                        local->dst[i][j] = 0;
47                }
48        }
49        result->packets = local->packets;
50        local->packets = 0;
51
52        /* Push result to the combiner */
53        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
54}
55
56/* Start callback function - This is run for each thread when it starts */
57static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
58
59        /* Create and initialize the local counter struct */
60        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
61        int i, j;
62        for(i=0;i<4;i++) {
63                for(j=0;j<256;j++) {
64                        local->src[i][j] = 0;
65                        local->dst[i][j] = 0;
66                }
67        }
68        local->lastkey = 0;
69        local->packets = 0;
70
71        /* return the local storage so it is available for all other callbacks for the thread*/
72        return local;
73}
74
75/* Checks if address is part of a excluded subnet. */
76static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
77
78        int i;
79        for(i=0;i<exclude->count;i++) {
80                /* Convert address into a network address */
81                uint32_t net_addr = address & exclude->networks[i].mask;
82
83                /* If this matches the network address from the excluded list we need to exclude this
84                   address. */
85                if(net_addr == exclude->networks[i].network) {
86                        return 1;
87                }
88        }
89
90        /* If we got this far the address should not be excluded */
91        return 0;
92}
93
94static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
95
96        /* Checks if the ip is of type IPv4 */
97        if (ip->sa_family == AF_INET) {
98
99                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
100                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
101                /* Get in_addr from sockaddr */
102                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
103                /* Ensure the address is in network byte order */
104                uint32_t address = htonl(ip4.s_addr);
105
106                /* Check if the address is part of an excluded network. */
107                if(network_excluded(address, exclude) == 0) {
108
109                        /* Split the IPv4 address into each octet */
110                        uint8_t octet[4];
111                        octet[0] = (address & 0xff000000) >> 24;
112                        octet[1] = (address & 0x00ff0000) >> 16;
113                        octet[2] = (address & 0x0000ff00) >> 8;
114                        octet[3] = (address & 0x000000ff);
115
116                        /* check if the supplied address was a source or destination,
117                           increment the correct one */
118                        if(srcaddr) {
119                                int i;
120                                for(i=0;i<4;i++) {
121                                        local->src[i][octet[i]] += 1;
122                                }
123                        } else {
124                                int i;
125                                for(i=0;i<4;i++) {
126                                        local->dst[i][octet[i]] += 1;
127                                }
128                        }
129                }
130        }
131}
132
133/* Per packet callback function run by each thread */
134static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
135        libtrace_packet_t *packet) {
136
137        /* Regain access to the address counter structure */
138        struct addr_local *local = (struct addr_local *)tls;
139
140        /* If this is the first packet set the lastkey to the packets timestamp */
141        if(local->lastkey == 0) {
142                local->lastkey = trace_get_erf_timestamp(packet);
143        }
144
145        /* Increment the packet count */
146        local->packets += 1;
147
148        /* Regain access to excluded networks pointer */
149        struct exclude_networks *exclude = (struct exclude_networks *)global;
150
151        struct sockaddr_storage addr;
152        struct sockaddr *ip;
153
154        /* Get the source IP address */
155        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
156        /* If a source ip address was found */
157        if(ip != NULL) {
158                process_ip(ip, local, exclude, 1);
159        }
160
161        /* Get the destination IP address */
162        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
163        /* If a destination ip address was found */
164        if(ip != NULL) {
165                process_ip(ip, local, exclude, 0);
166        }
167
168        /* If this trace is not live we will manually call "per tick" */
169        if(!trace_get_information(trace)->live) {
170                /* get the current packets timestamp */
171                uint64_t timestamp = trace_get_erf_timestamp(packet);
172
173                /* We only want to call per_tick if we are due to output something
174                 * Right shifting these converts them to seconds, tickrate is in seconds */
175                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
176                        per_tick(trace, thread,global, local, timestamp);
177                        local->lastkey = timestamp;
178                }
179        }
180
181        /* Return the packet to libtrace */
182        return packet;
183}
184
185/* Stopping callback function - When a thread closes */
186static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
187
188        /* cast the local storage structure */
189        struct addr_local *local = (struct addr_local *)tls;
190        /* Create structure to store the result */
191        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
192
193        /* Populate the result */
194        int i, j;
195        for(i=0;i<4;i++) {
196                for(j=0;j<256;j++) {
197                        result->src[i][j] = local->src[i][j];
198                        result->dst[i][j] = local->dst[i][j];
199                }
200        }
201        result->packets = local->packets;
202
203        /* Send the final results to the combiner */
204        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
205
206        /* Cleanup the local storage */
207        free(local);
208}
209
210/* Starting callback for reporter thread */
211static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
212        /* Create tally structure */
213        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
214
215        /* Initialize the tally structure */
216        int i, j;
217        for(i=0;i<4;i++) {
218                for(j=0;j<256;j++) {
219                        tally->src[i][j] = 0;
220                        tally->dst[i][j] = 0;
221                }
222        }
223        tally->lastkey = 0;
224        tally->packets = 0;
225
226        return tally;
227}
228
229static void plot_results(struct addr_local *tally, uint64_t tick) {
230
231        /* Get the current time */
232        time_t current_time = time(NULL);
233
234        char outputfile[255];
235        char outputplot[255];
236        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
237        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
238
239        /* Push all data into data file */
240        FILE *tmp = fopen(outputfile, "w");
241        int i, j;
242        fprintf(tmp, "#num\toctet1\t\toctet2\t\toctet3\t\toctet4\n");
243        fprintf(tmp, "#\tsrc\tdst\tsrc\tdst\tsrc\tdst\tsrc\tdst\n");
244        for(i=0;i<256;i++) {
245                fprintf(tmp, "%d", i);
246                for(j=0;j<4;j++) {
247                        fprintf(tmp, "\t%d\t%d", tally->src[j][i], tally->dst[j][i]);
248                }
249                fprintf(tmp, "\n");
250        }
251        fclose(tmp);
252        printf("wrote out to file %s\n", outputfile);
253
254        /* Open pipe to gnuplot */
255        FILE *gnuplot = popen("gnuplot -persistent", "w");
256        /* send all commands to gnuplot */
257        fprintf(gnuplot, "set term png size 1280,960 \n");
258        fprintf(gnuplot, "set title 'IP Distribution'\n");
259        fprintf(gnuplot, "set xrange[0:255]\n");
260        fprintf(gnuplot, "set xlabel 'Prefix'\n");
261        fprintf(gnuplot, "set ylabel 'Hits'\n");
262        fprintf(gnuplot, "set xtics 0,10,255\n");
263        //fprintf(gnuplot, "set datafile commentschars '#!%'");
264        fprintf(gnuplot, "set output '%s'\n", outputplot);
265        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
266        fprintf(gnuplot, "replot");
267        pclose(gnuplot);
268}
269
270
271/* Callback when a result is given to the reporter thread */
272static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
273        void *tls, libtrace_result_t *result) {
274
275        struct addr_local *results;
276        struct addr_local *tally;
277        uint64_t key;
278
279        /* We only want to handle results containing our user-defined structure  */
280        if(result->type != RESULT_USER) {
281                return;
282        }
283
284        /* This key is the key that was passed into trace_publish_results
285         * this will contain the erf timestamp for the packet */
286        key = result->key;
287
288        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
289        results = (struct addr_local *)result->value.ptr;
290
291        /* Grab our tally out of thread local storage */
292        tally = (struct addr_local *)tls;
293
294        /* Add all the results to the tally */
295        int i, j;
296        for(i=0;i<4;i++) {
297                for(j=0;j<256;j++) {
298                        tally->src[i][j] += results->src[i][j];
299                        tally->dst[i][j] += results->dst[i][j];
300                }
301        }
302        tally->packets += results->packets;
303
304        /* If the current timestamp is greater than the last printed plus the interval, output a result */
305        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
306
307                /* update last key */
308                tally->lastkey = key;
309
310                /* Plot the result with the key in epoch seconds*/
311                plot_results(tally, key >> 32);
312
313                /* clear the tally */
314                for(i=0;i<4;i++) {
315                        for(j=0;j<256;j++) {
316                                tally->src[i][j] = 0;
317                                tally->dst[i][j] = 0;
318                        }
319                }
320                tally->packets = 0;
321        }
322
323        /* Cleanup the thread results */
324        free(results);
325}
326
327/* Callback when the reporter thread stops (essentially when the program ends) */
328static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
329
330        /* Get the tally from the thread local storage */
331        struct addr_local *tally = (struct addr_local *)tls;
332
333        /* If there is any remaining data in the tally plot it */
334        if(tally->packets > 0) {
335                plot_results(tally, (tally->lastkey >> 32) + 1);
336        }
337        /* Cleanup tally results*/
338        free(tally);
339}
340
341static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
342        libtrace_callback_set_t *reporting) {
343        /* Only destroy if the structure exists */
344        if(trace) {
345                trace_destroy(trace);
346        }
347        if(processing) {
348                trace_destroy_callback_set(processing);
349        }
350        if(reporting) {
351                trace_destroy_callback_set(reporting);
352        }
353}
354
355/* Converts a string representation eg 1.2.3.4/24 into a network structure */
356static int get_network(char *network_string, struct network *network) {
357
358        char delim[] = "/";
359        /* Split the address and mask portion of the string */
360        char *address = strtok(network_string, delim);
361        char *mask = strtok(NULL, delim);
362
363        /* Check the subnet mask is valid */
364        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
365                return 1;
366        }
367        /* right shift so netmask is in network byte order */
368        network->mask = 0xffffffff << (32 - atoi(mask));
369
370        struct in_addr addr;
371        /* Convert address string into uint32_t and check its valid */
372        if(inet_aton(address, &addr) == 0) {
373                return 2;
374        }
375        /* Ensure its saved in network byte order */
376        network->address = htonl(addr.s_addr);
377
378        /* Calculate the network address */
379        network->network = network->address & network->mask;
380
381        return 0;
382}
383
384int main(int argc, char *argv[]) {
385
386        libtrace_t *trace;
387        /* Callbacks for processing and reporting threads */
388        libtrace_callback_set_t *processing, *reporter;
389
390
391        /* Ensure the input URI was supplied */
392        if(argc < 3) {
393                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
394                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
395                return 1;
396        }
397        /* Convert tick into an int */
398        tickrate = atoi(argv[2]);
399
400
401        /* Create the trace */
402        trace = trace_create(argv[1]);
403        /* Ensure no error has occured creating the trace */
404        if(trace_is_err(trace)) {
405                trace_perror(trace, "Creating trace");
406                return 1;
407        }
408
409        /* Setup the processing threads */
410        processing = trace_create_callback_set();
411        trace_set_starting_cb(processing, start_callback);
412        trace_set_packet_cb(processing, per_packet);
413        trace_set_stopping_cb(processing, stop_processing);
414        trace_set_tick_interval_cb(processing, per_tick);
415        /* Setup the reporter threads */
416        reporter = trace_create_callback_set();
417        trace_set_starting_cb(reporter, start_reporter);
418        trace_set_result_cb(reporter, per_result);
419        trace_set_stopping_cb(reporter, stop_reporter);
420
421        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
422        trace_set_perpkt_threads(trace, 4);
423        /* Order the results by timestamp */
424        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
425        /* Try to balance the load across all processing threads */
426        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
427
428        /* Set the tick interval only if this is a live capture */
429        if(trace_get_information(trace)->live) {
430                /* tickrate is in seconds but tick_interval expects milliseconds */
431                trace_set_tick_interval(trace, tickrate*1000);
432        }
433        /* Do not buffer the reports */
434        trace_set_reporter_thold(trace, 1);
435
436
437        /* Setup excluded networks if any were supplied */
438        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
439        exclude->networks = malloc(sizeof(struct network)*(argc-3));
440        if(exclude == NULL || exclude->networks == NULL) {
441                fprintf(stderr, "Unable to allocate memory");
442                libtrace_cleanup(trace, processing, reporter);
443                return 1;
444        }
445        exclude->count = 0;
446        int i;
447        for(i=0;i<argc-3;i++) {
448                /* convert the network string into a network structure */
449                if(get_network(argv[i+3], &exclude->networks[i]) != 0) {
450                        fprintf(stderr, "Error creating excluded network");
451                        return 1;
452                }
453                /* increment the count of excluded networks */
454                exclude->count += 1;
455        }
456
457
458        /* Start the trace, if it errors return */
459        if(trace_pstart(trace, exclude, processing, reporter)) {
460                trace_perror(trace, "Starting parallel trace");
461                libtrace_cleanup(trace, processing, reporter);
462                return 1;
463        }
464
465        /* This will wait for all threads to complete */
466        trace_join(trace);
467
468        /* Clean up everything */
469        free(exclude->networks);
470        free(exclude);
471        libtrace_cleanup(trace, processing, reporter);
472
473        return 0;
474}
Note: See TracBrowser for help on using the repository browser.