source: examples/stats/ipdist-parallel.c @ 91852de2

develop
Last change on this file since 91852de2 was 91852de2, checked in by Jacob Van Walraven <jcv9@…>, 4 years ago

Move more plotting tasks to python script

  • Property mode set to 100644
File size: 23.3 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10#include <getopt.h>
11
12/* Structure to hold the counters each thread has its own one of these */
13struct addr_local {
14        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
15        uint64_t src[4][256];
16        uint64_t dst[4][256];
17        /* Holds the results from the previous output */
18        uint64_t src_lastoutput[4][256];
19        uint64_t dst_lastoutput[4][256];
20        /* Holds the timestamp */
21        uint64_t lastkey;
22        /* Is the count of the number of packets processed, This is cleared after every output. */
23        uint64_t packets;
24        /* Total number an output has been generated */
25        uint64_t output_count;
26        /* Pointer to stats structure */
27        struct addr_stats *stats;
28};
29struct addr_stats {
30        /* Holds the percentage change compared to the previous output */
31        float src[4][256];
32        float dst[4][256];
33        /* Stats calculated independently per output */
34        double mode_src[4];
35        double mode_dst[4];
36        double mean_src[4];
37        double mean_dst[4];
38        double median_src[4];
39        double median_dst[4];
40        double stddev_src[4];
41        double stddev_dst[4];
42        double variance_src[4];
43        double variance_dst[4];
44        double skewness_src[4];
45        double skewness_dst[4];
46        struct addr_rank *rank_src[4];
47        struct addr_rank *rank_dst[4];
48};
49struct addr_rank {
50        uint8_t addr;
51        /* count is the priority */
52        uint64_t count;
53        /* pointer to next ranking item */
54        struct addr_rank* next;
55};
56
57/* Structure to hold excluded networks */
58struct exclude_networks {
59        int count;
60        struct network *networks;
61};
62struct network {
63        uint32_t address;
64        uint32_t mask;
65        uint32_t network;
66};
67
68uint64_t tickrate;
69char *stats_outputdir = "";
70
71/*************************************************************************
72Priority queue linked list */
73
74static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
75        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
76        tmp->addr = addr;
77        tmp->count = count;
78        tmp->next = NULL;
79
80        return tmp;
81}
82static uint8_t peak(struct addr_rank **head) {
83        return (*head)->addr;
84}
85static uint64_t peak_count(struct addr_rank **head) {
86        return (*head)->count;
87}
88static void pop(struct addr_rank **head) {
89        struct addr_rank* tmp = *head;
90        (*head) = (*head)->next;
91        free(tmp);
92}
93static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
94        struct addr_rank *curr = (*head);
95        struct addr_rank *tmp = rank_new(addr, count);
96
97        /* Check if the new node has a greater priority than the head */
98        if((*head)->count < count) {
99                tmp->next = *head;
100                (*head) = tmp;
101        } else {
102                /* Jump through the list until we find the correct position */
103                while (curr->next != NULL && curr->next->count > count) {
104                        curr = curr->next;
105                }
106
107                tmp->next = curr->next;
108                curr->next = tmp;
109        }
110}
111/*************************************************************************/
112
113
114static void compute_stats(struct addr_local *tally) {
115        int i, j, k;
116
117        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
118        for(i=0;i<4;i++) {
119                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
120                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
121                for(j=1;j<256;j++) {
122                        /* Push everything into the priority queue
123                         * each item will be popped off in the correct order */
124                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
125                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
126                }
127        }
128
129        /* Calculate mean, variance and standard deviation */
130        for(k=0;k<4;k++) {
131
132                double ex = 0;
133                double ex2 = 0;
134                double n = 0;
135                double m = 0;
136                for(i=0;i<256;i++) {
137                        for(j=0;j<tally->src[k][i];j++) {
138                                if(n == 0) {
139                                        m = i;
140                                }
141                                n += 1;
142                                ex += (i - m);
143                                ex2 += ((i - m) * (i - m));
144                        }
145                }
146
147                tally->stats->mean_src[k] = (k + (ex / n));
148                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
149                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
150
151                ex = 0;
152                ex2 = 0;
153                n = 0;
154                m = 0;
155                for(i=0;i<256;i++) {
156                        for(j=0;j<tally->dst[k][i];j++) {
157                                if(n == 0) {
158                                        m = i;
159                                }
160                                n += 1;
161                                ex += (i - m);
162                                ex2 += ((i - m) * (i - m));
163                        }
164                }
165                tally->stats->mean_dst[k] = (k + (ex / n));
166                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
167                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
168                /* Get the median */
169                int c = (n/2) - tally->src[k][0];
170                int c2 = 0;
171                while(c > 0) {
172                        c2 += 1;
173                        c -= tally->src[k][c2];
174                }
175                tally->stats->median_src[k] = c2;
176                c = (n/2) - tally->dst[k][0];
177                c2 = 0;
178                while(c > 0) {
179                        c2 += 1;
180                        c -= tally->dst[k][c2];
181                }
182                tally->stats->median_dst[k] = c2;
183                /* Get the mode which is the first item in the priority queue */
184                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
185                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
186                /* Calculate skewness */
187                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->median_src[k]) / tally->stats->stddev_src[k];
188                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->median_dst[k]) / tally->stats->stddev_dst[k];
189        }
190
191}
192
193static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
194
195        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
196        /* Proccessing thread local storage */
197        struct addr_local *local = (struct addr_local *)tls;
198
199        /* Populate the result structure from the threads local storage and clear threads local storage*/
200        int i, j;
201        for(i=0;i<4;i++) {
202                for(j=0;j<256;j++) {
203                        result->src[i][j] = local->src[i][j];
204                        result->dst[i][j] = local->dst[i][j];
205                        /* clear local storage */
206                        local->src[i][j] = 0;
207                        local->dst[i][j] = 0;
208                }
209        }
210        result->packets = local->packets;
211        local->packets = 0;
212
213        /* Push result to the combiner */
214        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
215}
216
217/* Start callback function - This is run for each thread when it starts */
218static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
219
220        /* Create and initialize the local counter struct */
221        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
222        int i, j;
223        for(i=0;i<4;i++) {
224                for(j=0;j<256;j++) {
225                        local->src[i][j] = 0;
226                        local->dst[i][j] = 0;
227                }
228        }
229        local->lastkey = 0;
230        local->packets = 0;
231
232        /* return the local storage so it is available for all other callbacks for the thread*/
233        return local;
234}
235
236/* Checks if address is part of a excluded subnet. */
237static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
238
239        int i;
240        for(i=0;i<exclude->count;i++) {
241                /* Convert address into a network address */
242                uint32_t net_addr = address & exclude->networks[i].mask;
243
244                /* If this matches the network address from the excluded list we need to exclude this
245                   address. */
246                if(net_addr == exclude->networks[i].network) {
247                        return 1;
248                }
249        }
250
251        /* If we got this far the address should not be excluded */
252        return 0;
253}
254
255static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
256
257        /* Checks if the ip is of type IPv4 */
258        if (ip->sa_family == AF_INET) {
259
260                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
261                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
262                /* Get in_addr from sockaddr */
263                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
264                /* Ensure the address is in network byte order */
265                uint32_t address = htonl(ip4.s_addr);
266
267                /* Check if the address is part of an excluded network. */
268                if(network_excluded(address, exclude) == 0) {
269
270                        /* Split the IPv4 address into each octet */
271                        uint8_t octet[4];
272                        octet[0] = (address & 0xff000000) >> 24;
273                        octet[1] = (address & 0x00ff0000) >> 16;
274                        octet[2] = (address & 0x0000ff00) >> 8;
275                        octet[3] = (address & 0x000000ff);
276
277                        /* check if the supplied address was a source or destination,
278                           increment the correct one */
279                        if(srcaddr) {
280                                int i;
281                                for(i=0;i<4;i++) {
282                                        local->src[i][octet[i]] += 1;
283                                }
284                        } else {
285                                int i;
286                                for(i=0;i<4;i++) {
287                                        local->dst[i][octet[i]] += 1;
288                                }
289                        }
290                }
291        }
292}
293
294/* Per packet callback function run by each thread */
295static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
296        libtrace_packet_t *packet) {
297
298        /* Regain access to the address counter structure */
299        struct addr_local *local = (struct addr_local *)tls;
300
301        /* If this is the first packet set the lastkey to the packets timestamp */
302        if(local->lastkey == 0) {
303                local->lastkey = trace_get_erf_timestamp(packet);
304        }
305
306        /* Increment the packet count */
307        local->packets += 1;
308
309        /* Regain access to excluded networks pointer */
310        struct exclude_networks *exclude = (struct exclude_networks *)global;
311
312        struct sockaddr_storage addr;
313        struct sockaddr *ip;
314
315        /* Get the source IP address */
316        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
317        /* If a source ip address was found */
318        if(ip != NULL) {
319                process_ip(ip, local, exclude, 1);
320        }
321
322        /* Get the destination IP address */
323        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
324        /* If a destination ip address was found */
325        if(ip != NULL) {
326                process_ip(ip, local, exclude, 0);
327        }
328
329        /* If this trace is not live we will manually call "per tick" */
330        if(!trace_get_information(trace)->live) {
331                /* get the current packets timestamp */
332                uint64_t timestamp = trace_get_erf_timestamp(packet);
333
334                /* We only want to call per_tick if we are due to output something
335                 * Right shifting these converts them to seconds, tickrate is in seconds */
336                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
337                        per_tick(trace, thread, global, local, timestamp);
338                        local->lastkey = timestamp;
339                }
340        }
341
342        /* Return the packet to libtrace */
343        return packet;
344}
345
346/* Stopping callback function - When a thread closes */
347static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
348
349        /* cast the local storage structure */
350        struct addr_local *local = (struct addr_local *)tls;
351        /* Create structure to store the result */
352        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
353
354        /* Populate the result */
355        int i, j;
356        for(i=0;i<4;i++) {
357                for(j=0;j<256;j++) {
358                        result->src[i][j] = local->src[i][j];
359                        result->dst[i][j] = local->dst[i][j];
360                }
361        }
362        result->packets = local->packets;
363
364        /* Send the final results to the combiner */
365        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
366
367        /* Cleanup the local storage */
368        free(local);
369}
370
371/* Starting callback for reporter thread */
372static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
373        /* Create tally structure */
374        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
375        tally->stats = malloc(sizeof(struct addr_stats));
376
377        /* Initialize the tally structure */
378        int i, j;
379        for(i=0;i<4;i++) {
380                for(j=0;j<256;j++) {
381                        tally->src[i][j] = 0;
382                        tally->dst[i][j] = 0;
383                        tally->src_lastoutput[i][j] = 0;
384                        tally->dst_lastoutput[i][j] = 0;
385                        tally->stats->src[i][j] = 0;
386                        tally->stats->dst[i][j] = 0;
387                }
388                /* Stats related varibles */
389                tally->stats->mode_src[i] = 0;
390                tally->stats->mode_dst[i] = 0;
391                tally->stats->mean_src[i] = 0;
392                tally->stats->mean_dst[i] = 0;
393                tally->stats->median_src[i] = 0;
394                tally->stats->median_dst[i] = 0;
395                tally->stats->stddev_src[i] = 0;
396                tally->stats->stddev_dst[i] = 0;
397                tally->stats->variance_src[i] = 0;
398                tally->stats->variance_dst[i] = 0;
399                tally->stats->skewness_src[i] = 0;
400                tally->stats->skewness_dst[i] = 0;
401        }
402        tally->lastkey = 0;
403        tally->packets = 0;
404        tally->output_count = 0;
405
406        return tally;
407}
408
409static void output_results(struct addr_local *tally, uint64_t tick) {
410
411        int i, j;
412
413        /* Calculations before reporting the results */
414        /* Need to initialise lastoutput values on first pass,
415         * this is so we have a base line for percentage changed */
416        if(tally->output_count == 0) {
417                for(i=0;i<4;i++) {
418                        for(j=0;j<256;j++) {
419                                tally->src_lastoutput[i][j] = tally->src[i][j];
420                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
421                        }
422                }
423         }
424        /* Compute the stats */
425        compute_stats(tally);
426
427        /* Finaly output the results */
428        printf("Generating output \"%sipdist-%lu\"\n", stats_outputdir, tick);
429
430        /* Output the results */
431        char outputfile[255];
432        snprintf(outputfile, sizeof(outputfile), "%sipdist-%lu.data", stats_outputdir, tick);
433        FILE *tmp = fopen(outputfile, "w");
434        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
435        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
436        for(i=0;i<256;i++) {
437                fprintf(tmp, "%lu\t%d", tick, i+1);
438                for(j=0;j<4;j++) {
439                        /* Get the highest ranking to lowest ranking octets */
440                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_src[j]));
441                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_src[j]));
442                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_dst[j]));
443                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_dst[j]));
444                        pop(&tally->stats->rank_src[j]);
445                        pop(&tally->stats->rank_dst[j]);
446                }
447                fprintf(tmp, "\n");
448        }
449        fclose(tmp);
450
451        char outputfile_stats[255];
452        snprintf(outputfile_stats, sizeof(outputfile_stats), "%sipdist-%lu.stats", stats_outputdir, tick);
453        tmp = fopen(outputfile_stats, "w");
454        /* append stats data to end of file */
455        fprintf(tmp, "#\tmean\tstddev\tvariance\tmedian\tmode\tskewness\n");
456        for(i=0;i<4;i++) {
457                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->median_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
458                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->median_src[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
459                fprintf(tmp, "\n\n");
460        }
461        fclose(tmp);
462}
463
464
465/* Callback when a result is given to the reporter thread */
466static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
467        void *tls, libtrace_result_t *result) {
468
469        struct addr_local *results;
470        struct addr_local *tally;
471        uint64_t key;
472
473        /* We only want to handle results containing our user-defined structure  */
474        if(result->type != RESULT_USER) {
475                return;
476        }
477
478        /* This key is the key that was passed into trace_publish_results
479         * this will contain the erf timestamp for the packet */
480        key = result->key;
481
482        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
483        results = (struct addr_local *)result->value.ptr;
484
485        /* Grab our tally out of thread local storage */
486        tally = (struct addr_local *)tls;
487
488        /* Add all the results to the tally */
489        int i, j;
490        for(i=0;i<4;i++) {
491                for(j=0;j<256;j++) {
492                        tally->src[i][j] += results->src[i][j];
493                        tally->dst[i][j] += results->dst[i][j];
494                }
495        }
496        tally->packets += results->packets;
497
498        /* If the current timestamp is greater than the last printed plus the interval, output a result */
499        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
500
501                /* update last key */
502                tally->lastkey = key;
503
504                /* Output the results with the key in epoch seconds*/
505                output_results(tally, key >> 32);
506
507                /* increment the output counter */
508                tally->output_count++;
509
510                /* clear the tally but copy old values over first*/
511                for(i=0;i<4;i++) {
512                        for(j=0;j<256;j++) {
513                                tally->src_lastoutput[i][j] = tally->src[i][j];
514                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
515                                tally->src[i][j] = 0;
516                                tally->dst[i][j] = 0;
517                        }
518                        /* Clear all the stats data */
519                        tally->stats->mode_src[i] = 0;
520                        tally->stats->mode_dst[i] = 0;
521                        tally->stats->mean_src[i] = 0;
522                        tally->stats->mean_dst[i] = 0;
523                        tally->stats->median_src[i] = 0;
524                        tally->stats->median_dst[i] = 0;
525                        tally->stats->stddev_src[i] = 0;
526                        tally->stats->stddev_dst[i] = 0;
527                        tally->stats->variance_src[i] = 0;
528                        tally->stats->variance_dst[i] = 0;
529                        tally->stats->skewness_src[i] = 0;
530                        tally->stats->skewness_dst[i] = 0;
531                }
532                /* free the priority queue */
533                for(i=0;i<4;i++) {
534                        free(tally->stats->rank_src[i]);
535                        free(tally->stats->rank_dst[i]);
536                }
537
538                tally->packets = 0;
539
540        }
541
542        /* Cleanup the thread results */
543        free(results);
544}
545
546/* Callback when the reporter thread stops (essentially when the program ends) */
547static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
548
549        /* Get the tally from the thread local storage */
550        struct addr_local *tally = (struct addr_local *)tls;
551
552        /* If there is any remaining data in the tally plot it */
553        if(tally->packets > 0) {
554                /* Then output the results */
555                output_results(tally, (tally->lastkey >> 32) + 1);
556        }
557        /* Cleanup tally results*/
558        free(tally);
559}
560
561static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
562        libtrace_callback_set_t *reporting, struct exclude_networks *exclude) {
563        /* Only destroy if the structure exists */
564        if(trace) {
565                trace_destroy(trace);
566        }
567        if(processing) {
568                trace_destroy_callback_set(processing);
569        }
570        if(reporting) {
571                trace_destroy_callback_set(reporting);
572        }
573        if(exclude->count > 0) {
574                free(exclude->networks);
575        }
576        if(exclude) {
577                free(exclude);
578        }
579}
580
581/* Converts a string representation eg 1.2.3.4/24 into a network structure */
582static int get_network(char *network_string, struct network *network) {
583
584        char delim[] = "/";
585        /* Split the address and mask portion of the string */
586        char *address = strtok(network_string, delim);
587        char *mask = strtok(NULL, delim);
588
589        /* Check the subnet mask is valid */
590        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
591                return 1;
592        }
593        /* right shift so netmask is in network byte order */
594        network->mask = 0xffffffff << (32 - atoi(mask));
595
596        struct in_addr addr;
597        /* Convert address string into uint32_t and check its valid */
598        if(inet_aton(address, &addr) == 0) {
599                return 2;
600        }
601        /* Ensure its saved in network byte order */
602        network->address = htonl(addr.s_addr);
603
604        /* Calculate the network address */
605        network->network = network->address & network->mask;
606
607        return 0;
608}
609
610static void usage(char *argv0) {
611        fprintf(stderr, "Usage:\n"
612        "%s inputURI output-interval\n"
613        "-i [inputURI] --set-uri [inputURI]\n"
614        "-o [output-interval] --output-interval [output-interval]\n"
615        "       Output statistical information every x seconds\n"
616        "-t [threads] --threads [threads]\n"
617        "-e [excluded-network] --exclude-network [excluded-network]\n"
618        "       Network to exclude from results\n"
619        "       e.g. -e 192.168.0.0/16 -e 10.0.0.0/8\n"
620        "-d [output-directory] --output-dir [output-directory]\n"
621        , argv0);
622        exit(1);
623}
624
625int main(int argc, char *argv[]) {
626
627        char *inputURI = NULL;
628        int threads = 4;
629        tickrate = 300;
630        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
631        exclude->count = 0;
632
633        while(1) {
634                int option_index;
635                struct option long_options[] = {
636                        { "set-uri",            1, 0, 'i' },
637                        { "output-interval",    1, 0, 'o' },
638                        { "threads",            1, 0, 't' },
639                        { "exclude-network",    1, 0, 'e' },
640                        { "output-dir",         1, 0, 'd' },
641                        { NULL,                 0, 0,  0  }
642                };
643
644                int c = getopt_long(argc, argv, "i:o:t:e:d:", long_options, &option_index);
645
646                if(c==-1) {
647                        break;
648                }
649
650                switch(c) {
651                        case 'i':
652                                inputURI = optarg;
653                                break;
654                        case 'o':
655                                tickrate = atoi(optarg);
656                                break;
657                        case 't':
658                                threads = atoi(optarg);
659                                break;
660                        case 'e':
661                                exclude->count += 1;
662                                if(exclude->count > 1) {
663                                        exclude->networks = realloc(exclude->networks, sizeof(struct network)*exclude->count);
664                                } else {
665                                        exclude->networks = malloc(sizeof(struct network));
666                                }
667                                if(get_network(optarg, &exclude->networks[exclude->count-1])) {
668                                        fprintf(stderr, "Error excluding network %s\n", optarg);
669                                        return 1;
670                                }
671                                break;
672                        case 'd':
673                                stats_outputdir = optarg;
674                                strcat(stats_outputdir, "/");
675                        case '?':
676                                break;
677                        default:
678                                fprintf(stderr, "Unknown option: %c\n", c);
679                                usage(argv[0]);
680                }
681        }
682
683        libtrace_t *trace;
684        /* Callbacks for processing and reporting threads */
685        libtrace_callback_set_t *processing, *reporter;
686
687        /* Ensure the input URI was supplied */
688        if(inputURI == NULL) {
689                usage(argv[0]);
690        }
691
692        /* Create the trace */
693        trace = trace_create(inputURI);
694        /* Ensure no error has occured creating the trace */
695        if(trace_is_err(trace)) {
696                trace_perror(trace, "Creating trace");
697                return 1;
698        }
699
700        /* Setup the processing threads */
701        processing = trace_create_callback_set();
702        trace_set_starting_cb(processing, start_callback);
703        trace_set_packet_cb(processing, per_packet);
704        trace_set_stopping_cb(processing, stop_processing);
705        trace_set_tick_interval_cb(processing, per_tick);
706        /* Setup the reporter threads */
707        reporter = trace_create_callback_set();
708        trace_set_starting_cb(reporter, start_reporter);
709        trace_set_result_cb(reporter, per_result);
710        trace_set_stopping_cb(reporter, stop_reporter);
711
712        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
713        trace_set_perpkt_threads(trace, threads);
714        /* Order the results by timestamp */
715        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
716        /* Try to balance the load across all processing threads */
717        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
718
719        /* Set the tick interval only if this is a live capture */
720        if(trace_get_information(trace)->live) {
721                /* tickrate is in seconds but tick_interval expects milliseconds */
722                trace_set_tick_interval(trace, tickrate*1000);
723        }
724        /* Do not buffer the reports */
725        trace_set_reporter_thold(trace, 1);
726
727        /* Start the trace, if it errors return */
728        if(trace_pstart(trace, exclude, processing, reporter)) {
729                trace_perror(trace, "Starting parallel trace");
730                libtrace_cleanup(trace, processing, reporter, exclude);
731                return 1;
732        }
733
734        /* This will wait for all threads to complete */
735        trace_join(trace);
736
737        /* Clean up everything */
738        libtrace_cleanup(trace, processing, reporter, exclude);
739
740        return 0;
741}
Note: See TracBrowser for help on using the repository browser.