source: examples/tutorial/ipdist-parallel.c @ 9d5f464

develop
Last change on this file since 9d5f464 was 9d5f464, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Improvments to tickrate calculations

  • Property mode set to 100644
File size: 14.0 KB
Line 
/* Program reads a trace file, counts the first octet of every source and
 * destination address, and plots the counts on a graph using gnuplot.
 */
#include "libtrace_parallel.h"
#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
12
/* Per-thread counters. Each processing thread owns one of these; it is created
 * in start_callback() and handed to the thread's other callbacks as tls. */
struct addr_local {
        uint64_t src[256];      /* hits per first octet of the source address */
        uint64_t dst[256];      /* hits per first octet of the destination address */
        uint64_t lastkey;       /* ERF timestamp of the last packet seen */
        uint64_t packets;       /* packets counted by this thread */
};
/* Snapshot of one processing thread's counters, published to the reporter
 * thread. The reporter frees it once it has been added to the tally. */
struct addr_result {
        uint64_t src[256];      /* hits per first octet of the source address */
        uint64_t dst[256];      /* hits per first octet of the destination address */
        uint64_t packets;       /* packets covered by this result */
};
/* Combined counters held by the reporter thread. Results from every
 * processing thread are summed into this structure until it is plotted. */
struct addr_tally {
        uint64_t src[256];      /* summed hits per first octet of the source address */
        uint64_t dst[256];      /* summed hits per first octet of the destination address */
        uint64_t lastkey;       /* key of the result that triggered the last plot */
        uint64_t packets;       /* packets accumulated since the last plot */
};
34
/* List of networks whose addresses are left out of the counts */
struct exclude_networks {
        int count;                      /* number of usable entries in networks */
        struct network *networks;       /* array of excluded networks */
};
/* One excluded network, as compared against packet addresses by network_excluded() */
struct network {
        uint32_t address;       /* address supplied on the command line */
        uint32_t mask;          /* netmask built from the prefix length */
        uint32_t network;       /* address & mask */
};
45
/* Output interval taken from argv[2]. main() hands it to
 * trace_set_tick_interval() and per_result() divides it by 1000 before
 * comparing it with timestamps expressed in seconds. */
uint64_t tickrate;
47
48/* Start callback function - This is run for each thread when it starts */
49static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
50
51        /* Create and initialize the local counter struct */
52        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
53        int i;
54        for(i=0;i<256;i++) {
55                local->src[i] = 0;
56                local->dst[i] = 0;
57        }
58        local->lastkey = 0;
59        local->packets = 0;
60
61        /* return the local storage so it is available for all other callbacks for the thread*/
62        return local;
63}
64
65/* Checks if address is part of a excluded subnet. */
66static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
67
68        int i;
69        for(i=0;i<exclude->count;i++) {
70                /* Convert address into a network address */
71                uint32_t net_addr = address & exclude->networks[i].mask;
72
73                /* If this matches the network address from the excluded list we need to exclude this
74                   address. */
75                if(net_addr == exclude->networks[i].network) {
76                        return 1;
77                }
78        }
79
80        /* If we got this far the address should not be excluded */
81        return 0;
82}
83
84static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
85
86        /* Checks if the ip is of type IPv4 */
87        if (ip->sa_family == AF_INET) {
88
89                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
90                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
91                /* Get in_addr from sockaddr */
92                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
93                /* Ensure the address is in network byte order */
94                uint32_t address = htonl(ip4.s_addr);
95
96                /* Check if the address is part of an excluded network. */
97                if(network_excluded(address, exclude) == 0) {
98
99                        /* Split the IPv4 address into each octet */
100                        uint8_t octet[4];
101                        octet[0] = (address & 0xff000000) >> 24;
102                        octet[1] = (address & 0x00ff0000) >> 16;
103                        octet[2] = (address & 0x0000ff00) >> 8;
104                        octet[3] = (address & 0x000000ff);
105
106                        /* check if the supplied address was a source or destination,
107                           increment the correct one */
108                        if(srcaddr) {
109                                local->src[octet[0]]++;
110                        } else {
111                                local->dst[octet[0]]++;
112                        }
113                }
114        }
115}
116
117/* Per packet callback function run by each thread */
118static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
119        libtrace_packet_t *packet) {
120
121        /* Regain access to the address counter structure */
122        struct addr_local *local = (struct addr_local *)tls;
123
124        /* Store the timestamp of the last packet in erf format
125         * We use the timestamp in the packet for processing non live traces */
126        local->lastkey = trace_get_erf_timestamp(packet);
127        /* Increment the packet count */
128        local->packets += 1;
129
130        /* Regain access to excluded networks pointer */
131        struct exclude_networks *exclude = (struct exclude_networks *)global;
132
133        struct sockaddr_storage addr;
134        struct sockaddr *ip;
135
136        /* Get the source IP address */
137        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
138        /* If a source ip address was found */
139        if(ip != NULL) {
140                process_ip(ip, local, exclude, 1);
141        }
142
143        /* Get the destination IP address */
144        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
145        /* If a destination ip address was found */
146        if(ip != NULL) {
147                process_ip(ip, local, exclude, 0);
148        }
149
150        /* Return the packet to libtrace */
151        return packet;
152}
153
154/* Stopping callback function - When a thread closes */
155static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
156
157        /* cast the local storage structure */
158        struct addr_local *local = (struct addr_local *)tls;
159        /* Create structure to store the result */
160        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
161
162        /* Populate the result */
163        int i;
164        for(i=0;i<256;i++) {
165                result->src[i] = local->src[i];
166                result->src[i] = local->src[i];
167        }
168        result->packets = local->packets;
169
170        /* This will not cause the result to be printed but will atleast end up going into our tally
171         * The reporter thread can then deal with it when it closes */
172        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
173
174        /* Cleanup the local storage */
175        free(local);
176}
177
178/* Starting callback for reporter thread */
179static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
180        /* Create tally structure */
181        struct addr_tally *tally = (struct addr_tally *)malloc(sizeof(struct addr_tally));
182
183        /* Initialize the tally structure */
184        int i;
185        for(i=0;i<256;i++) {
186                tally->src[i] = 0;
187                tally->dst[i] = 0;
188        }
189        tally->lastkey = 0;
190        tally->packets = 0;
191
192        return tally;
193}
194
195static void plot_results(struct addr_tally *tally, uint64_t tick) {
196
197        /* Get the current time */
198        time_t current_time = time(NULL);
199
200        char outputfile[255];
201        char outputplot[255];
202        snprintf(outputfile, sizeof(outputfile), "ipdist-%u.data", tick);
203        snprintf(outputplot, sizeof(outputplot), "ipdist-%u.png", tick);
204
205        /* Push all data into data file */
206        FILE *tmp = fopen(outputfile, "w");
207        int i;
208        for(i=0;i<255;i++) {
209                fprintf(tmp, "%d %d %d\n", i, tally->src[i], tally->dst[i]);
210        }
211        fclose(tmp);
212        printf("wrote out to file %s\n", outputfile);
213
214        /* Open pipe to gnuplot */
215        FILE *gnuplot = popen("gnuplot -persistent", "w");
216        /* send all commands to gnuplot */
217        fprintf(gnuplot, "set term png size 1280,960 \n");
218        fprintf(gnuplot, "set title 'IP Distribution'\n");
219        fprintf(gnuplot, "set xrange[0:255]\n");
220        fprintf(gnuplot, "set xlabel 'Prefix'\n");
221        fprintf(gnuplot, "set ylabel 'Hits'\n");
222        fprintf(gnuplot, "set xtics 0,10,255\n");
223        fprintf(gnuplot, "set output '%s'\n", outputplot);
224        fprintf(gnuplot, "plot '%s' using 1:2 title 'Source address' with boxes, '%s' using 1:3 title 'Destination address' with boxes\n", outputfile, outputfile);
225        fprintf(gnuplot, "replot");
226        pclose(gnuplot);
227}
228
229
230/* Callback when a result is given to the reporter thread */
231static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
232        void *tls, libtrace_result_t *result) {
233
234        struct addr_result *results;
235        struct addr_tally *tally;
236        uint64_t key;
237
238        /* We only want to handle results containing our user-defined structure  */
239        if(result->type != RESULT_USER) {
240                return;
241        }
242
243        /* This key is the key that was passed into trace_publish_results */
244        key = result->key;
245
246        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
247        results = (struct addr_result *)result->value.ptr;
248
249        /* Grab our tally out of thread local storage */
250        tally = (struct addr_tally *)tls;
251
252        /* Add all the results to the tally */
253        int i;
254        for(i=0;i<256;i++) {
255                tally->src[i] += results->src[i];
256                tally->dst[i] += results->dst[i];
257        }
258        tally->packets += results->packets;
259
260        /* If the current timestamp is greater than the last printed plus the interval, output a result */
261        if((key >> 32) >= (tally->lastkey >> 32) + (tickrate/1000)) {
262
263                /* update last key */
264                tally->lastkey = key;
265
266                /* Plot the result with the key in epoch seconds*/
267                plot_results(tally, key >> 32);
268
269                /* clear the tally */
270                for(i=0;i<256;i++) {
271                        tally->src[i] = 0;
272                        tally->dst[i] = 0;
273                }
274                tally->packets = 0;
275        }
276
277        /* Cleanup the thread results */
278        free(results);
279}
280
281/* Callback when the reporter thread stops (essentially when the program ends) */
282static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
283
284        /* Get the tally from the thread local storage */
285        struct addr_tally *tally = (struct addr_tally *)tls;
286
287        /* If there is any remaining data in the tally plot it */
288        if(tally->packets > 0) {
289                plot_results(tally, (tally->lastkey >> 32) + tickrate);
290        }
291        /* Cleanup tally results*/
292        free(tally);
293}
294
295static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
296
297        struct addr_result *result = (struct addr_result *)malloc(sizeof(struct addr_result));
298        /* Proccessing thread local storage */
299        struct addr_local *local = (struct addr_local *)tls;
300
301        /* Populate the result structure from the threads local storage and clear threads local storage*/
302        int i;
303        for(i=0;i<256;i++) {
304                /* Populate results */
305                result->src[i] = local->src[i];
306                result->dst[i] = local->dst[i];
307                /* Clear local storage */
308                local->src[i] = 0;
309                local->dst[i] = 0;
310        }
311        result->packets = local->packets;
312
313        /* only use the tick timestamp if running against a live capture */
314        uint64_t key;
315        if(trace_get_information(trace)->live) {
316                key = tick;
317        } else {
318                key = local->lastkey;
319        }
320
321        /* Push result to the combiner */
322        trace_publish_result(trace, thread, key, (libtrace_generic_t){.ptr=result}, RESULT_USER);
323}
324
325int main(int argc, char *argv[]) {
326
327        libtrace_t *trace;
328        /* Callbacks for processing and reporting threads */
329        libtrace_callback_set_t *processing, *reporter;
330
331        /* Ensure the input URI was supplied */
332        if(argc < 3) {
333                fprintf(stderr, "Usage: %s inputURI outputInterval [excluded networks]\n", argv[0]);
334                fprintf(stderr, "       eg. ./ipdist input.erf 10000 210.10.3.0/24 70.5.0.0/16\n");
335                return 1;
336        }
337        /* Convert tick into correct format */
338        tickrate = atoi(argv[2]);
339
340        /* Create the trace */
341        trace = trace_create(argv[1]);
342        /* Ensure no error has occured creating the trace */
343        if(trace_is_err(trace)) {
344                trace_perror(trace, "Creating trace");
345                return 1;
346        }
347
348        /* Setup the processing threads */
349        processing = trace_create_callback_set();
350        trace_set_starting_cb(processing, start_callback);
351        trace_set_packet_cb(processing, per_packet);
352        trace_set_stopping_cb(processing, stop_processing);
353        trace_set_tick_interval_cb(processing, per_tick);
354        /* Setup the reporter threads */
355        reporter = trace_create_callback_set();
356        trace_set_starting_cb(reporter, start_reporter);
357        trace_set_result_cb(reporter, per_result);
358        trace_set_stopping_cb(reporter, stop_reporter);
359
360        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
361        trace_set_perpkt_threads(trace, 4);
362        /* Order the results by timestamp */
363        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
364        /* Try to balance the load across all processing threads */
365        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
366
367        /* Set the tick interval */
368        trace_set_tick_interval(trace, tickrate);
369        /* Do not buffer the reports */
370        trace_set_reporter_thold(trace, 1);
371
372
373        /* Setup excluded networks if any were supplied */
374        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
375        exclude->networks = malloc(sizeof(struct network)*(argc-3));
376        if(exclude == NULL || exclude->networks == NULL) {
377                fprintf(stderr, "Unable to allocate memory");
378                return 1;
379        }
380        exclude->count = 0;
381
382        char delim[] = "/";
383        int i;
384        for(i=0;i<argc-3;i++) {
385                char *address = strtok(argv[i+3], delim);
386                char *mask = strtok(NULL, delim);
387
388                /* Check the subnet mask is valid */
389                if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
390                        fprintf(stderr, "Invalid subnet mask: %s\n", mask);
391                        return 1;
392                }
393                /* right shift so netmask is in network byte order */
394                exclude->networks[i].mask = 0xffffffff << (32 - atoi(mask));
395
396                struct in_addr addr;
397                /* Convert address string into uint32_t and check its valid */
398                if(inet_aton(address, &addr) == 0) {
399                        fprintf(stderr, "Invalid exclude address: %s\n", address);
400                        return 1;
401                }
402                /* Ensure its saved in network byte order */
403                exclude->networks[i].address = htonl(addr.s_addr);
404
405                /* Calculate the network address */
406                exclude->networks[i].network = exclude->networks[i].address & exclude->networks[i].mask;
407
408                /* Increment counter of excluded networks */
409                exclude->count += 1;
410        }
411
412        /* Start the trace, if it errors return */
413        if(trace_pstart(trace, exclude, processing, reporter)) {
414                trace_perror(trace, "Starting parallel trace");
415                return 1;
416        }
417
418        /* This will wait for all threads to complete */
419        trace_join(trace);
420
421        /* Clean up everything */
422        free(exclude->networks);
423        free(exclude);
424        trace_destroy(trace);
425        trace_destroy_callback_set(processing);
426        trace_destroy_callback_set(reporter);
427
428        return 0;
429}
Note: See TracBrowser for help on using the repository browser.