source: examples/tutorial/ipdist-parallel.c @ 93f4c64

develop
Last change on this file since 93f4c64 was 93f4c64, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Added parallel version of ipdist

  • Property mode set to 100644
File size: 12.7 KB
Line 
/* Parallel version of ipdist: reads a trace file, counts the first octet of
 * each packet's source and destination address, and plots the counts on a
 * graph using gnuplot.
 */
#include "libtrace_parallel.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
12
/* Counters owned by a single processing thread. One instance is allocated
 * per thread by start_callback() and handed back to that thread's other
 * callbacks as its local storage. Both arrays are indexed by the first octet
 * of an address. */
struct addr_local {
        /* Hits per first octet, source addresses */
        uint64_t src[256];
        /* Hits per first octet, destination addresses */
        uint64_t dst[256];
};
/* Snapshot of one processing thread's counters. A processing thread fills
 * one of these and publishes it to the reporter thread, which frees it once
 * the values have been added to the tally. */
struct addr_result {
        /* Hits per first octet, source addresses */
        uint64_t src[256];
        /* Hits per first octet, destination addresses */
        uint64_t dst[256];
};
/* Running totals kept by the reporter thread. It holds the sum of the
 * counters published by every processing thread, plus the key of the most
 * recently received result. */
struct addr_tally {
        /* Accumulated hits per first octet, source addresses */
        uint64_t src[256];
        /* Accumulated hits per first octet, destination addresses */
        uint64_t dst[256];
        /* Key of the last result that was merged into the totals */
        uint64_t lastkey;
};
30
/* List of networks whose addresses must not be counted. It is built once in
 * main() and passed to the processing callbacks as the global pointer. */
struct exclude_networks {
        /* Number of valid entries in networks */
        int count;
        /* Array of excluded networks, count entries long */
        struct network *networks;
};
/* One excluded network. All three values are kept in the same integer
 * representation as the packet addresses they are compared against in
 * network_excluded(). */
struct network {
        /* Address exactly as supplied by the user */
        uint32_t address;
        /* Netmask derived from the user-supplied prefix length */
        uint32_t mask;
        /* address & mask, the value packet addresses are matched against */
        uint32_t network;
};
41
42/* Start callback function - This is run for each thread when it starts */
43static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
44
45        /* Create and initialize the local counter struct */
46        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
47        int i;
48        for(i=0;i<256;i++) {
49                local->src[i] = 0;
50                local->dst[i] = 0;
51        }
52
53        /* return the local storage so it is available for all other callbacks for the thread*/
54        return local;
55}
56
57/* Checks if address is part of a excluded subnet. */
58static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
59
60        int i;
61        for(i=0;i<exclude->count;i++) {
62                /* Convert address into a network address */
63                uint32_t net_addr = address & exclude->networks[i].mask;
64
65                /* If this matches the network address from the excluded list we need to exclude this
66                   address. */
67                if(net_addr == exclude->networks[i].network) {
68                        return 1;
69                }
70        }
71
72        /* If we got this far the address should not be excluded */
73        return 0;
74}
75
76static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
77
78        /* Checks if the ip is of type IPv4 */
79        if (ip->sa_family == AF_INET) {
80
81                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
82                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
83                /* Get in_addr from sockaddr */
84                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
85                /* Ensure the address is in network byte order */
86                uint32_t address = htonl(ip4.s_addr);
87
88                /* Check if the address is part of an excluded network. */
89                if(network_excluded(address, exclude) == 0) {
90
91                        /* Split the IPv4 address into each octet */
92                        uint8_t octet[4];
93                        octet[0] = (address & 0xff000000) >> 24;
94                        octet[1] = (address & 0x00ff0000) >> 16;
95                        octet[2] = (address & 0x0000ff00) >> 8;
96                        octet[3] = (address & 0x000000ff);
97
98                        /* check if the supplied address was a source or destination,
99                           increment the correct one */
100                        if(srcaddr) {
101                                local->src[octet[0]]++;
102                        } else {
103                                local->dst[octet[0]]++;
104                        }
105                }
106        }
107}
108
109/* Per packet callback function run by each thread */
110static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
111        libtrace_packet_t *packet) {
112
113        /* Regain access to the address counter structure */
114        struct addr_local *local = (struct addr_local *)tls;
115
116        /* Regain access to excluded networks pointer */
117        struct exclude_networks *exclude = (struct exclude_networks *)global;
118
119        struct sockaddr_storage addr;
120        struct sockaddr *ip;
121
122        /* Get the source IP address */
123        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
124        /* If a source ip address was found */
125        if(ip != NULL) {
126                process_ip(ip, local, exclude, 1);
127        }
128
129        /* Get the destination IP address */
130        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
131        /* If a destination ip address was found */
132        if(ip != NULL) {
133                process_ip(ip, local, exclude, 0);
134        }
135
136        /* Return the packet to libtrace */
137        return packet;
138}
139
140/* Stopping callback function */
141static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
142
143        /* create a structure to hold the result from the processing thread */
144        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
145        /* cast the local storage structure */
146        struct addr_local *local = (struct addr_local *)tls;
147
148        /* Populate the result structure */
149        int i;
150        for(i=0;i<256;i++) {
151                result->src[i] = local->src[i];
152                result->dst[i] = local->dst[i];
153        }
154
155        /* Publish the result to the reporter thread */
156        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
157
158        /* Cleanup the local storage */
159        free(local);
160}
161
162
163
164
165/* Starting callback for reporter thread */
166static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
167        /* Create tally structure */
168        struct addr_tally *tally = (struct addr_tally *)malloc(sizeof(struct addr_tally));
169
170        /* Initialize the tally structure */
171        int i;
172        for(i=0;i<256;i++) {
173                tally->src[i] = 0;
174                tally->dst[i] = 0;
175        }
176        tally->lastkey = 0;
177
178        return tally;
179}
180
181static void plot_results(struct addr_tally *tally) {
182
183        /* Get the current time */
184        time_t current_time = time(NULL);
185        char* time_string = ctime(&current_time);
186
187        /* Push all the data into a tmp file for gnuplot */
188        char outputfile[255];
189        char outputplot[255];
190        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", current_time);
191        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", current_time);
192        FILE *tmp = fopen(outputfile, "w");
193        int i;
194        for(i=0;i<255;i++) {
195                fprintf(tmp, "%d %d %d\n", i, tally->src[i], tally->dst[i]);
196        }
197        fclose(tmp);
198
199        /* Open pipe to gnuplot */
200        FILE *gnuplot = popen("gnuplot -persistent", "w");
201        /* send all commands to gnuplot */
202        fprintf(gnuplot, "set term png size 1280,960 \n");
203        fprintf(gnuplot, "set title 'IP Distribution'\n");
204        fprintf(gnuplot, "set xrange[0:255]\n");
205        fprintf(gnuplot, "set xlabel 'Prefix'\n");
206        fprintf(gnuplot, "set xlabel 'Hits'\n");
207        fprintf(gnuplot, "set xtics 0,10,255\n");
208        fprintf(gnuplot, "set output '%s'\n", outputplot);
209        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
210        pclose(gnuplot);
211}
212
213
214/* Callback when a result is given to the reporter thread */
215static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
216        void *tls, libtrace_result_t *result) {
217
218        struct addr_result *results;
219        uint64_t key;
220        struct addr_tally *tally;
221
222        /* We only want to handle results containing our user-defined structure  */
223        if(result->type != RESULT_USER) {
224                return;
225        }
226
227        /* This key is the key that was passed into trace_publish_results */
228        key = result->key;
229
230        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
231        results = (struct addr_result *)result->value.ptr;
232
233        /* Grab our tally out of thread local storage */
234        tally = (struct addr_tally *)tls;
235        /* increment tally with the new results */
236        int i;
237        for(i=0;i<256;i++) {
238                tally->src[i] += results->src[i];
239                tally->dst[i] += results->dst[i];
240                tally->lastkey = key;
241        }
242
243        /* Plot the result */
244        plot_results(tally);
245
246        /* Cleanup the thread results */
247        free(results);
248}
249
250/* Callback when the reporter thread stops (essentially when the program ends) */
251static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
252
253        /* Get the tally from the thread local storage */
254        struct addr_tally *tally = (struct addr_tally *)tls;
255
256        //OUTPUT DATA
257        //plot_results(tally);
258
259        /* Cleanup tally results*/
260        free(tally);
261}
262
263static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
264
265        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
266        struct addr_local *local = (struct addr_local *)tls;
267
268        /* Populate the result structure from the threads local storage and clear threads local storage*/
269        int i;
270        for(i=0;i<256;i++) {
271                /* Populate results */
272                result->src[i] = local->src[i];
273                result->dst[i] = local->dst[i];
274                /* Clear local storage */
275                local->src[i] = 0;
276                local->dst[i] = 0;
277        }
278
279        printf("tick: %u\n", tick);
280
281        /* Push the result to the combiner */
282        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
283}
284
285int main(int argc, char *argv[]) {
286
287        libtrace_t *trace;
288        /* Callbacks for processing and reporting threads */
289        libtrace_callback_set_t *processing, *reporter;
290
291        /* Ensure the input URI was supplied */
292        if(argc < 3) {
293                fprintf(stderr, "Usage: %s inputURI outputInterval [excluded networks]\n", argv[0]);
294                fprintf(stderr, "       eg. ./ipdist input.erf 10000 210.10.3.0/24 70.5.0.0/16\n");
295                return 1;
296        }
297        /* Convert tick into correct format */
298        uint64_t tickrate = atoi(argv[2]);
299
300        /* Create the trace */
301        trace = trace_create(argv[1]);
302        /* Ensure no error has occured creating the trace */
303        if(trace_is_err(trace)) {
304                trace_perror(trace, "Creating trace");
305                return 1;
306        }
307
308        /* Setup the processing threads */
309        processing = trace_create_callback_set();
310        trace_set_starting_cb(processing, start_callback);
311        trace_set_packet_cb(processing, per_packet);
312        trace_set_stopping_cb(processing, stop_processing);
313        trace_set_tick_interval_cb(processing, per_tick);
314        /* Setup the reporter threads */
315        reporter = trace_create_callback_set();
316        trace_set_starting_cb(reporter, start_reporter);
317        trace_set_result_cb(reporter, per_result);
318        trace_set_stopping_cb(reporter, stop_reporter);
319
320        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
321        trace_set_perpkt_threads(trace, 4);
322        /* Order the results by timestamp */
323        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
324        /* Try to balance the load across all processing threads */
325        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
326        /* Set the tick interval */
327        trace_set_tick_interval(trace, tickrate);
328
329
330
331        /* Setup excluded networks if any were supplied */
332        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
333        exclude->networks = malloc(sizeof(struct network)*(argc-3));
334        if(exclude == NULL || exclude->networks == NULL) {
335                fprintf(stderr, "Unable to allocate memory");
336                return 1;
337        }
338        exclude->count = 0;
339
340        char delim[] = "/";
341        int i;
342        for(i=0;i<argc-3;i++) {
343                char *address = strtok(argv[i+3], delim);
344                char *mask = strtok(NULL, delim);
345
346                /* Check the subnet mask is valid */
347                if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
348                        fprintf(stderr, "Invalid subnet mask: %s\n", mask);
349                        return 1;
350                }
351                /* right shift so netmask is in network byte order */
352                exclude->networks[i].mask = 0xffffffff << (32 - atoi(mask));
353
354                struct in_addr addr;
355                /* Convert address string into uint32_t and check its valid */
356                if(inet_aton(address, &addr) == 0) {
357                        fprintf(stderr, "Invalid exclude address: %s\n", address);
358                        return 1;
359                }
360                /* Ensure its saved in network byte order */
361                exclude->networks[i].address = htonl(addr.s_addr);
362
363                /* Calculate the network address */
364                exclude->networks[i].network = exclude->networks[i].address & exclude->networks[i].mask;
365
366                /* Increment counter of excluded networks */
367                exclude->count += 1;
368        }
369
370        /* Start the trace, if it errors return */
371        if(trace_pstart(trace, exclude, processing, reporter)) {
372                trace_perror(trace, "Starting parallel trace");
373                return 1;
374        }
375
376        /* This will wait for all threads to complete */
377        trace_join(trace);
378
379        /* Clean up everything */
380        free(exclude->networks);
381        free(exclude);
382        trace_destroy(trace);
383        trace_destroy_callback_set(processing);
384        trace_destroy_callback_set(reporter);
385
386        return 0;
387}
Note: See TracBrowser for help on using the repository browser.