source: examples/tutorial/ipdist-parallel.c @ 2f07199

develop
Last change on this file since 2f07199 was 2f07199, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Cleanup

  • Property mode set to 100644
File size: 14.9 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9
/*
 * Per-thread counters. Every processing thread allocates one of these in its
 * starting callback and receives it back as thread local storage.
 */
struct addr_local {
        /* Hits per octet value (0-255) seen in source addresses */
        uint64_t src[256];
        /* Hits per octet value (0-255) seen in destination addresses */
        uint64_t dst[256];
        /* ERF timestamp of the first packet, or of the last manual tick */
        uint64_t lastkey;
        /* Packets seen since the counters were last cleared */
        uint64_t packets;
};
/*
 * Snapshot of one processing thread's counters, published to the reporter
 * thread. Allocated by the publishing thread and freed in per_result().
 */
struct addr_result {
        /* Hits per octet value for source addresses */
        uint64_t src[256];
        /* Hits per octet value for destination addresses */
        uint64_t dst[256];
        /* Number of packets the snapshot covers */
        uint64_t packets;
};
/*
 * Running totals kept by the reporter thread. It owns a single instance and
 * adds the counters of every result published by the processing threads.
 */
struct addr_tally {
        /* Combined hits per octet value for source addresses */
        uint64_t src[256];
        /* Combined hits per octet value for destination addresses */
        uint64_t dst[256];
        /* Key (ERF timestamp) of the result that triggered the last plot */
        uint64_t lastkey;
        /* Packets tallied since the last plot */
        uint64_t packets;
};
31
/* List of networks whose addresses are left out of the counts */
struct exclude_networks {
        /* Number of valid entries in networks */
        int count;
        /* Array holding count entries */
        struct network *networks;
};
/*
 * A single IPv4 network. All three values are plain integers with the first
 * dotted-quad octet in the most significant byte.
 */
struct network {
        /* Address exactly as supplied by the user */
        uint32_t address;
        /* Netmask derived from the prefix length */
        uint32_t mask;
        /* address & mask; compared against masked packet addresses */
        uint32_t network;
};
42
/* Number of seconds between successive outputs */
uint64_t tickrate;
/* Index (0-3, leftmost first) of the IPv4 octet whose distribution is plotted */
int octet = 0;
47
48static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
49
50        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
51        /* Proccessing thread local storage */
52        struct addr_local *local = (struct addr_local *)tls;
53
54        /* Populate the result structure from the threads local storage and clear threads local storage*/
55        int i;
56        for(i=0;i<256;i++) {
57                /* Populate results */
58                result->src[i] = local->src[i];
59                result->dst[i] = local->dst[i];
60                /* Clear local storage */
61                local->src[i] = 0;
62                local->dst[i] = 0;
63        }
64        result->packets = local->packets;
65        local->packets = 0;
66
67        /* Push result to the combiner */
68        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
69}
70
71/* Start callback function - This is run for each thread when it starts */
72static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
73
74        /* Create and initialize the local counter struct */
75        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
76        int i;
77        for(i=0;i<256;i++) {
78                local->src[i] = 0;
79                local->dst[i] = 0;
80        }
81        local->lastkey = 0;
82        local->packets = 0;
83
84        /* return the local storage so it is available for all other callbacks for the thread*/
85        return local;
86}
87
88/* Checks if address is part of a excluded subnet. */
89static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
90
91        int i;
92        for(i=0;i<exclude->count;i++) {
93                /* Convert address into a network address */
94                uint32_t net_addr = address & exclude->networks[i].mask;
95
96                /* If this matches the network address from the excluded list we need to exclude this
97                   address. */
98                if(net_addr == exclude->networks[i].network) {
99                        return 1;
100                }
101        }
102
103        /* If we got this far the address should not be excluded */
104        return 0;
105}
106
107static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
108
109        /* Checks if the ip is of type IPv4 */
110        if (ip->sa_family == AF_INET) {
111
112                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
113                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
114                /* Get in_addr from sockaddr */
115                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
116                /* Ensure the address is in network byte order */
117                uint32_t address = htonl(ip4.s_addr);
118
119                /* Check if the address is part of an excluded network. */
120                if(network_excluded(address, exclude) == 0) {
121
122                        /* Split the IPv4 address into each octet */
123                        uint8_t octets[4];
124                        octets[0] = (address & 0xff000000) >> 24;
125                        octets[1] = (address & 0x00ff0000) >> 16;
126                        octets[2] = (address & 0x0000ff00) >> 8;
127                        octets[3] = (address & 0x000000ff);
128
129                        /* check if the supplied address was a source or destination,
130                           increment the correct one */
131                        if(srcaddr) {
132                                local->src[octets[octet]]++;
133                        } else {
134                                local->dst[octets[octet]]++;
135                        }
136                }
137        }
138}
139
140/* Per packet callback function run by each thread */
141static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
142        libtrace_packet_t *packet) {
143
144        /* Regain access to the address counter structure */
145        struct addr_local *local = (struct addr_local *)tls;
146
147        /* If this is the first packet set the lastkey to the packets timestamp */
148        if(local->lastkey == 0) {
149                local->lastkey = trace_get_erf_timestamp(packet);
150        }
151
152        /* Increment the packet count */
153        local->packets += 1;
154
155        /* Regain access to excluded networks pointer */
156        struct exclude_networks *exclude = (struct exclude_networks *)global;
157
158        struct sockaddr_storage addr;
159        struct sockaddr *ip;
160
161        /* Get the source IP address */
162        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
163        /* If a source ip address was found */
164        if(ip != NULL) {
165                process_ip(ip, local, exclude, 1);
166        }
167
168        /* Get the destination IP address */
169        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
170        /* If a destination ip address was found */
171        if(ip != NULL) {
172                process_ip(ip, local, exclude, 0);
173        }
174
175        /* If this trace is not live we will manually call "per tick" */
176        if(!trace_get_information(trace)->live) {
177                /* get the current packets timestamp */
178                uint64_t timestamp = trace_get_erf_timestamp(packet);
179
180                /* We only want to call per_tick if we are due to output something
181                 * Right shifting these converts them to seconds, tickrate is in seconds */
182                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
183                        per_tick(trace, thread,global, local, timestamp);
184                        local->lastkey = timestamp;
185                }
186        }
187
188        /* Return the packet to libtrace */
189        return packet;
190}
191
192/* Stopping callback function - When a thread closes */
193static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
194
195        /* cast the local storage structure */
196        struct addr_local *local = (struct addr_local *)tls;
197        /* Create structure to store the result */
198        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
199
200        /* Populate the result */
201        int i;
202        for(i=0;i<256;i++) {
203                result->src[i] = local->src[i];
204                result->dst[i] = local->dst[i];
205        }
206        result->packets = local->packets;
207
208        /* Send the final results to the combiner */
209        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
210
211        /* Cleanup the local storage */
212        free(local);
213}
214
215/* Starting callback for reporter thread */
216static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
217        /* Create tally structure */
218        struct addr_tally *tally = (struct addr_tally *)malloc(sizeof(struct addr_tally));
219
220        /* Initialize the tally structure */
221        int i;
222        for(i=0;i<256;i++) {
223                tally->src[i] = 0;
224                tally->dst[i] = 0;
225        }
226        tally->lastkey = 0;
227        tally->packets = 0;
228
229        return tally;
230}
231
232static void plot_results(struct addr_tally *tally, uint64_t tick) {
233
234        /* Get the current time */
235        time_t current_time = time(NULL);
236
237        char outputfile[255];
238        char outputplot[255];
239        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
240        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
241
242        /* Push all data into data file */
243        FILE *tmp = fopen(outputfile, "w");
244        int i;
245        for(i=0;i<255;i++) {
246                fprintf(tmp, "%d %d %d\n", i, tally->src[i], tally->dst[i]);
247        }
248        fclose(tmp);
249        printf("wrote out to file %s\n", outputfile);
250
251        /* Open pipe to gnuplot */
252        FILE *gnuplot = popen("gnuplot -persistent", "w");
253        /* send all commands to gnuplot */
254        fprintf(gnuplot, "set term png size 1280,960 \n");
255        fprintf(gnuplot, "set title 'IP Distribution'\n");
256        fprintf(gnuplot, "set xrange[0:255]\n");
257        fprintf(gnuplot, "set xlabel 'Prefix'\n");
258        fprintf(gnuplot, "set ylabel 'Hits'\n");
259        fprintf(gnuplot, "set xtics 0,10,255\n");
260        fprintf(gnuplot, "set output '%s'\n", outputplot);
261        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
262        fprintf(gnuplot, "replot");
263        pclose(gnuplot);
264}
265
266
267/* Callback when a result is given to the reporter thread */
268static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
269        void *tls, libtrace_result_t *result) {
270
271        struct addr_result *results;
272        struct addr_tally *tally;
273        uint64_t key;
274
275        /* We only want to handle results containing our user-defined structure  */
276        if(result->type != RESULT_USER) {
277                return;
278        }
279
280        /* This key is the key that was passed into trace_publish_results
281         * this will contain the erf timestamp for the packet */
282        key = result->key;
283
284        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
285        results = (struct addr_result *)result->value.ptr;
286
287        /* Grab our tally out of thread local storage */
288        tally = (struct addr_tally *)tls;
289
290        /* Add all the results to the tally */
291        int i;
292        for(i=0;i<256;i++) {
293                tally->src[i] += results->src[i];
294                tally->dst[i] += results->dst[i];
295        }
296        tally->packets += results->packets;
297
298        /* If the current timestamp is greater than the last printed plus the interval, output a result */
299        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
300
301                /* update last key */
302                tally->lastkey = key;
303
304                /* Plot the result with the key in epoch seconds*/
305                plot_results(tally, key >> 32);
306
307                /* clear the tally */
308                for(i=0;i<256;i++) {
309                        tally->src[i] = 0;
310                        tally->dst[i] = 0;
311                }
312                tally->packets = 0;
313        }
314
315        /* Cleanup the thread results */
316        free(results);
317}
318
319/* Callback when the reporter thread stops (essentially when the program ends) */
320static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
321
322        /* Get the tally from the thread local storage */
323        struct addr_tally *tally = (struct addr_tally *)tls;
324
325        /* If there is any remaining data in the tally plot it */
326        if(tally->packets > 0) {
327                plot_results(tally, (tally->lastkey >> 32) + 1);
328        }
329        /* Cleanup tally results*/
330        free(tally);
331}
332
333static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
334        libtrace_callback_set_t *reporting) {
335        /* Only destroy if the structure exists */
336        if(trace) {
337                trace_destroy(trace);
338        }
339        if(processing) {
340                trace_destroy_callback_set(processing);
341        }
342        if(reporting) {
343                trace_destroy_callback_set(reporting);
344        }
345}
346
347/* Converts a string representation eg 1.2.3.4/24 into a network structure */
348static int get_network(char *network_string, struct network *network) {
349
350        char delim[] = "/";
351        /* Split the address and mask portion of the string */
352        char *address = strtok(network_string, delim);
353        char *mask = strtok(NULL, delim);
354
355        /* Check the subnet mask is valid */
356        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
357                return 1;
358        }
359        /* right shift so netmask is in network byte order */
360        network->mask = 0xffffffff << (32 - atoi(mask));
361
362        struct in_addr addr;
363        /* Convert address string into uint32_t and check its valid */
364        if(inet_aton(address, &addr) == 0) {
365                return 2;
366        }
367        /* Ensure its saved in network byte order */
368        network->address = htonl(addr.s_addr);
369
370        /* Calculate the network address */
371        network->network = network->address & network->mask;
372
373        return 0;
374}
375
376int main(int argc, char *argv[]) {
377
378        libtrace_t *trace;
379        /* Callbacks for processing and reporting threads */
380        libtrace_callback_set_t *processing, *reporter;
381
382        /* Ensure the input URI was supplied */
383        if(argc < 3) {
384                fprintf(stderr, "Usage: %s inputURI [outputInterval (Seconds)] [excluded networks]\n", argv[0]);
385                fprintf(stderr, "       eg. ./ipdist input.erf 60 210.10.3.0/24 70.5.0.0/16\n");
386                return 1;
387        }
388        /* Convert tick into an int */
389        tickrate = atoi(argv[2]);
390
391        /* Create the trace */
392        trace = trace_create(argv[1]);
393        /* Ensure no error has occured creating the trace */
394        if(trace_is_err(trace)) {
395                trace_perror(trace, "Creating trace");
396                return 1;
397        }
398
399        /* Setup the processing threads */
400        processing = trace_create_callback_set();
401        trace_set_starting_cb(processing, start_callback);
402        trace_set_packet_cb(processing, per_packet);
403        trace_set_stopping_cb(processing, stop_processing);
404        trace_set_tick_interval_cb(processing, per_tick);
405        /* Setup the reporter threads */
406        reporter = trace_create_callback_set();
407        trace_set_starting_cb(reporter, start_reporter);
408        trace_set_result_cb(reporter, per_result);
409        trace_set_stopping_cb(reporter, stop_reporter);
410
411        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
412        trace_set_perpkt_threads(trace, 4);
413        /* Order the results by timestamp */
414        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
415        /* Try to balance the load across all processing threads */
416        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
417
418        /* Set the tick interval only if this is a live capture */
419        if(trace_get_information(trace)->live) {
420                /* tickrate is in seconds but tick_interval expects milliseconds */
421                trace_set_tick_interval(trace, tickrate*1000);
422        }
423        /* Do not buffer the reports */
424        trace_set_reporter_thold(trace, 1);
425
426
427        /* Setup excluded networks if any were supplied */
428        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
429        exclude->networks = malloc(sizeof(struct network)*(argc-3));
430        if(exclude == NULL || exclude->networks == NULL) {
431                fprintf(stderr, "Unable to allocate memory");
432                libtrace_cleanup(trace, processing, reporter);
433                return 1;
434        }
435        exclude->count = 0;
436
437        int i;
438        for(i=0;i<argc-3;i++) {
439                /* convert the network string into a network structure */
440                get_network(argv[i+3], &exclude->networks[i]);
441                /* increment the count of excluded networks */
442                exclude->count += 1;
443        }
444
445        /* Start the trace, if it errors return */
446        if(trace_pstart(trace, exclude, processing, reporter)) {
447                trace_perror(trace, "Starting parallel trace");
448                libtrace_cleanup(trace, processing, reporter);
449                return 1;
450        }
451
452        /* This will wait for all threads to complete */
453        trace_join(trace);
454
455        /* Clean up everything */
456        free(exclude->networks);
457        free(exclude);
458        libtrace_cleanup(trace, processing, reporter);
459
460        return 0;
461}
Note: See TracBrowser for help on using the repository browser.