source: examples/tutorial/ipdist-parallel.c @ 6984c96

develop
Last change on this file since 6984c96 was 6984c96, checked in by Jacob van Walraven <jacobvw@…>, 2 years ago

Tickrate improvements

  • Property mode set to 100644
File size: 14.1 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9
10/* Structure to hold the counters each thread has its own one of these */
11struct addr_local {
12        uint64_t src[256];
13        uint64_t dst[256];
14        uint64_t lastkey;
15        uint64_t packets;
16};
17/* Structure to hold the result from a processing thread */
18struct addr_result {
19        uint64_t src[256];
20        uint64_t dst[256];
21        uint64_t packets;
22};
23/* Structure to hold counters the report has one of these, it combines
24 * the the counters of each threads local storage used by the reporter thread */
25struct addr_tally {
26        uint64_t src[256];
27        uint64_t dst[256];
28        uint64_t lastkey;
29        uint64_t packets;
30};
31
32/* Structure to hold excluded networks */
33struct exclude_networks {
34        int count;
35        struct network *networks;
36};
37struct network {
38        uint32_t address;
39        uint32_t mask;
40        uint32_t network;
41};
42
43/* interval between outputs */
44uint64_t tickrate;
45
46static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
47
48        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
49        /* Proccessing thread local storage */
50        struct addr_local *local = (struct addr_local *)tls;
51
52        /* Populate the result structure from the threads local storage and clear threads local storage*/
53        int i;
54        for(i=0;i<256;i++) {
55                /* Populate results */
56                result->src[i] = local->src[i];
57                result->dst[i] = local->dst[i];
58                /* Clear local storage */
59                local->src[i] = 0;
60                local->dst[i] = 0;
61        }
62        result->packets = local->packets;
63        local->packets = 0;
64
65        /* Push result to the combiner */
66        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
67}
68
69/* Start callback function - This is run for each thread when it starts */
70static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
71
72        /* Create and initialize the local counter struct */
73        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
74        int i;
75        for(i=0;i<256;i++) {
76                local->src[i] = 0;
77                local->dst[i] = 0;
78        }
79        local->lastkey = 0;
80        local->packets = 0;
81
82        /* return the local storage so it is available for all other callbacks for the thread*/
83        return local;
84}
85
86/* Checks if address is part of a excluded subnet. */
87static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
88
89        int i;
90        for(i=0;i<exclude->count;i++) {
91                /* Convert address into a network address */
92                uint32_t net_addr = address & exclude->networks[i].mask;
93
94                /* If this matches the network address from the excluded list we need to exclude this
95                   address. */
96                if(net_addr == exclude->networks[i].network) {
97                        return 1;
98                }
99        }
100
101        /* If we got this far the address should not be excluded */
102        return 0;
103}
104
105static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
106
107        /* Checks if the ip is of type IPv4 */
108        if (ip->sa_family == AF_INET) {
109
110                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
111                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
112                /* Get in_addr from sockaddr */
113                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
114                /* Ensure the address is in network byte order */
115                uint32_t address = htonl(ip4.s_addr);
116
117                /* Check if the address is part of an excluded network. */
118                if(network_excluded(address, exclude) == 0) {
119
120                        /* Split the IPv4 address into each octet */
121                        uint8_t octet[4];
122                        octet[0] = (address & 0xff000000) >> 24;
123                        octet[1] = (address & 0x00ff0000) >> 16;
124                        octet[2] = (address & 0x0000ff00) >> 8;
125                        octet[3] = (address & 0x000000ff);
126
127                        /* check if the supplied address was a source or destination,
128                           increment the correct one */
129                        if(srcaddr) {
130                                local->src[octet[0]]++;
131                        } else {
132                                local->dst[octet[0]]++;
133                        }
134                }
135        }
136}
137
138/* Per packet callback function run by each thread */
139static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
140        libtrace_packet_t *packet) {
141
142        /* Regain access to the address counter structure */
143        struct addr_local *local = (struct addr_local *)tls;
144
145        /* Increment the packet count */
146        local->packets += 1;
147
148        /* Regain access to excluded networks pointer */
149        struct exclude_networks *exclude = (struct exclude_networks *)global;
150
151        struct sockaddr_storage addr;
152        struct sockaddr *ip;
153
154        /* Get the source IP address */
155        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
156        /* If a source ip address was found */
157        if(ip != NULL) {
158                process_ip(ip, local, exclude, 1);
159        }
160
161        /* Get the destination IP address */
162        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
163        /* If a destination ip address was found */
164        if(ip != NULL) {
165                process_ip(ip, local, exclude, 0);
166        }
167
168        /* If this trace is not live we will manually call "per tick" */
169        if(!trace_get_information(trace)->live) {
170                /* get the current packets timestamp */
171                uint64_t timestamp = trace_get_erf_timestamp(packet);
172
173                /* We only want to call per_tick if we are due to output something */
174                if((timestamp >> 32) >= (local->lastkey >> 32) + (tickrate/1000)) {
175                        per_tick(trace, thread,global, local, timestamp);
176                        local->lastkey = timestamp;
177                }
178        }
179
180        /* Return the packet to libtrace */
181        return packet;
182}
183
184/* Stopping callback function - When a thread closes */
185static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
186
187        /* cast the local storage structure */
188        struct addr_local *local = (struct addr_local *)tls;
189        /* Create structure to store the result */
190        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
191
192        /* Populate the result */
193        int i;
194        for(i=0;i<256;i++) {
195                result->src[i] = local->src[i];
196                result->src[i] = local->src[i];
197        }
198        result->packets = local->packets;
199
200        /* This will not cause the result to be printed but will atleast end up going into our tally
201         * The reporter thread can then deal with it when it closes */
202        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
203
204        /* Cleanup the local storage */
205        free(local);
206}
207
208/* Starting callback for reporter thread */
209static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
210        /* Create tally structure */
211        struct addr_tally *tally = (struct addr_tally *)malloc(sizeof(struct addr_tally));
212
213        /* Initialize the tally structure */
214        int i;
215        for(i=0;i<256;i++) {
216                tally->src[i] = 0;
217                tally->dst[i] = 0;
218        }
219        tally->lastkey = 0;
220        tally->packets = 0;
221
222        return tally;
223}
224
225static void plot_results(struct addr_tally *tally, uint64_t tick) {
226
227        /* Get the current time */
228        time_t current_time = time(NULL);
229
230        char outputfile[255];
231        char outputplot[255];
232        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
233        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
234
235        /* Push all data into data file */
236        FILE *tmp = fopen(outputfile, "w");
237        int i;
238        for(i=0;i<255;i++) {
239                fprintf(tmp, "%d %d %d\n", i, tally->src[i], tally->dst[i]);
240        }
241        fclose(tmp);
242        printf("wrote out to file %s\n", outputfile);
243
244        /* Open pipe to gnuplot */
245        FILE *gnuplot = popen("gnuplot -persistent", "w");
246        /* send all commands to gnuplot */
247        fprintf(gnuplot, "set term png size 1280,960 \n");
248        fprintf(gnuplot, "set title 'IP Distribution'\n");
249        fprintf(gnuplot, "set xrange[0:255]\n");
250        fprintf(gnuplot, "set xlabel 'Prefix'\n");
251        fprintf(gnuplot, "set ylabel 'Hits'\n");
252        fprintf(gnuplot, "set xtics 0,10,255\n");
253        fprintf(gnuplot, "set output '%s'\n", outputplot);
254        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
255        fprintf(gnuplot, "replot");
256        pclose(gnuplot);
257}
258
259
260/* Callback when a result is given to the reporter thread */
261static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
262        void *tls, libtrace_result_t *result) {
263
264        struct addr_result *results;
265        struct addr_tally *tally;
266        uint64_t key;
267
268        /* We only want to handle results containing our user-defined structure  */
269        if(result->type != RESULT_USER) {
270                return;
271        }
272
273        /* This key is the key that was passed into trace_publish_results
274         * this will contain the erf timestamp for the packet */
275        key = result->key;
276
277        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
278        results = (struct addr_result *)result->value.ptr;
279
280        /* Grab our tally out of thread local storage */
281        tally = (struct addr_tally *)tls;
282
283        /* Add all the results to the tally */
284        int i;
285        for(i=0;i<256;i++) {
286                tally->src[i] += results->src[i];
287                tally->dst[i] += results->dst[i];
288        }
289        tally->packets += results->packets;
290
291        /* If the current timestamp is greater than the last printed plus the interval, output a result */
292        if((key >> 32) >= (tally->lastkey >> 32) + (tickrate/1000)) {
293
294                /* update last key */
295                tally->lastkey = key;
296
297                /* Plot the result with the key in epoch seconds*/
298                plot_results(tally, key >> 32);
299
300                /* clear the tally */
301                for(i=0;i<256;i++) {
302                        tally->src[i] = 0;
303                        tally->dst[i] = 0;
304                }
305                tally->packets = 0;
306        }
307
308        /* Cleanup the thread results */
309        free(results);
310}
311
312/* Callback when the reporter thread stops (essentially when the program ends) */
313static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
314
315        /* Get the tally from the thread local storage */
316        struct addr_tally *tally = (struct addr_tally *)tls;
317
318        /* If there is any remaining data in the tally plot it */
319        if(tally->packets > 0) {
320                plot_results(tally, (tally->lastkey >> 32) + tickrate);
321        }
322        /* Cleanup tally results*/
323        free(tally);
324}
325
326int main(int argc, char *argv[]) {
327
328        libtrace_t *trace;
329        /* Callbacks for processing and reporting threads */
330        libtrace_callback_set_t *processing, *reporter;
331
332        /* Ensure the input URI was supplied */
333        if(argc < 3) {
334                fprintf(stderr, "Usage: %s inputURI outputInterval [excluded networks]\n", argv[0]);
335                fprintf(stderr, "       eg. ./ipdist input.erf 10000 210.10.3.0/24 70.5.0.0/16\n");
336                return 1;
337        }
338        /* Convert tick into correct format */
339        tickrate = atoi(argv[2]);
340
341        /* Create the trace */
342        trace = trace_create(argv[1]);
343        /* Ensure no error has occured creating the trace */
344        if(trace_is_err(trace)) {
345                trace_perror(trace, "Creating trace");
346                return 1;
347        }
348
349        /* Setup the processing threads */
350        processing = trace_create_callback_set();
351        trace_set_starting_cb(processing, start_callback);
352        trace_set_packet_cb(processing, per_packet);
353        trace_set_stopping_cb(processing, stop_processing);
354        trace_set_tick_interval_cb(processing, per_tick);
355        /* Setup the reporter threads */
356        reporter = trace_create_callback_set();
357        trace_set_starting_cb(reporter, start_reporter);
358        trace_set_result_cb(reporter, per_result);
359        trace_set_stopping_cb(reporter, stop_reporter);
360
361        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
362        trace_set_perpkt_threads(trace, 4);
363        /* Order the results by timestamp */
364        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
365        /* Try to balance the load across all processing threads */
366        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
367
368        /* Set the tick interval only if this is a live capture */
369        if(trace_get_information(trace)->live) {
370                trace_set_tick_interval(trace, tickrate);
371        }
372        /* Do not buffer the reports */
373        trace_set_reporter_thold(trace, 1);
374
375
376        /* Setup excluded networks if any were supplied */
377        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
378        exclude->networks = malloc(sizeof(struct network)*(argc-3));
379        if(exclude == NULL || exclude->networks == NULL) {
380                fprintf(stderr, "Unable to allocate memory");
381                return 1;
382        }
383        exclude->count = 0;
384
385        char delim[] = "/";
386        int i;
387        for(i=0;i<argc-3;i++) {
388                char *address = strtok(argv[i+3], delim);
389                char *mask = strtok(NULL, delim);
390
391                /* Check the subnet mask is valid */
392                if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
393                        fprintf(stderr, "Invalid subnet mask: %s\n", mask);
394                        return 1;
395                }
396                /* right shift so netmask is in network byte order */
397                exclude->networks[i].mask = 0xffffffff << (32 - atoi(mask));
398
399                struct in_addr addr;
400                /* Convert address string into uint32_t and check its valid */
401                if(inet_aton(address, &addr) == 0) {
402                        fprintf(stderr, "Invalid exclude address: %s\n", address);
403                        return 1;
404                }
405                /* Ensure its saved in network byte order */
406                exclude->networks[i].address = htonl(addr.s_addr);
407
408                /* Calculate the network address */
409                exclude->networks[i].network = exclude->networks[i].address & exclude->networks[i].mask;
410
411                /* Increment counter of excluded networks */
412                exclude->count += 1;
413        }
414
415        /* Start the trace, if it errors return */
416        if(trace_pstart(trace, exclude, processing, reporter)) {
417                trace_perror(trace, "Starting parallel trace");
418                return 1;
419        }
420
421        /* This will wait for all threads to complete */
422        trace_join(trace);
423
424        /* Clean up everything */
425        free(exclude->networks);
426        free(exclude);
427        trace_destroy(trace);
428        trace_destroy_callback_set(processing);
429        trace_destroy_callback_set(reporter);
430
431        return 0;
432}
Note: See TracBrowser for help on using the repository browser.