source: examples/stats/ipdist-parallel.c @ a7132cf

develop
Last change on this file since a7132cf was a7132cf, checked in by Jacob Van Walraven <jcv9@…>, 4 years ago

ipdist-parallel now reports cumulative count of lost packets to standard out, cleanup plotting script

  • Property mode set to 100644
File size: 23.7 KB
Line 
1#include "libtrace_parallel.h"
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <arpa/inet.h>
8#include <time.h>
9#include <math.h>
10#include <getopt.h>
11
12/* Structure to hold the counters each thread has its own one of these */
13struct addr_local {
14        /* Holds the counts of each number occurance per octet, These are cleared after every output. */
15        uint64_t src[4][256];
16        uint64_t dst[4][256];
17        /* Holds the results from the previous output */
18        uint64_t src_lastoutput[4][256];
19        uint64_t dst_lastoutput[4][256];
20        /* Holds the timestamp */
21        uint64_t lastkey;
22        /* Is the count of the number of packets processed, This is cleared after every output. */
23        uint64_t packets;
24        /* Total number an output has been generated */
25        uint64_t output_count;
26        /* Pointer to stats structure */
27        struct addr_stats *stats;
28        uint64_t lost_packets;
29};
30struct addr_stats {
31        /* Holds the percentage change compared to the previous output */
32        float src[4][256];
33        float dst[4][256];
34        /* Stats calculated independently per output */
35        double mode_src[4];
36        double mode_dst[4];
37        double mean_src[4];
38        double mean_dst[4];
39        double median_src[4];
40        double median_dst[4];
41        double stddev_src[4];
42        double stddev_dst[4];
43        double variance_src[4];
44        double variance_dst[4];
45        double skewness_src[4];
46        double skewness_dst[4];
47        struct addr_rank *rank_src[4];
48        struct addr_rank *rank_dst[4];
49};
50struct addr_rank {
51        uint8_t addr;
52        /* count is the priority */
53        uint64_t count;
54        /* pointer to next ranking item */
55        struct addr_rank* next;
56};
57
58/* Structure to hold excluded networks */
59struct exclude_networks {
60        int count;
61        struct network *networks;
62};
63struct network {
64        uint32_t address;
65        uint32_t mask;
66        uint32_t network;
67};
68
69uint64_t tickrate;
70char *stats_outputdir = "";
71
72/*************************************************************************
73Priority queue linked list */
74
75static struct addr_rank *rank_new(uint8_t addr, uint64_t count) {
76        struct addr_rank *tmp = malloc(sizeof(struct addr_rank));
77        tmp->addr = addr;
78        tmp->count = count;
79        tmp->next = NULL;
80
81        return tmp;
82}
83static uint8_t peak(struct addr_rank **head) {
84        return (*head)->addr;
85}
86static uint64_t peak_count(struct addr_rank **head) {
87        return (*head)->count;
88}
89static void pop(struct addr_rank **head) {
90        struct addr_rank* tmp = *head;
91        (*head) = (*head)->next;
92        free(tmp);
93}
94static void push(struct addr_rank **head, uint8_t addr, uint64_t count) {
95        struct addr_rank *curr = (*head);
96        struct addr_rank *tmp = rank_new(addr, count);
97
98        /* Check if the new node has a greater priority than the head */
99        if((*head)->count < count) {
100                tmp->next = *head;
101                (*head) = tmp;
102        } else {
103                /* Jump through the list until we find the correct position */
104                while (curr->next != NULL && curr->next->count > count) {
105                        curr = curr->next;
106                }
107
108                tmp->next = curr->next;
109                curr->next = tmp;
110        }
111}
112/*************************************************************************/
113
114
115static void compute_stats(struct addr_local *tally) {
116        int i, j, k;
117
118        /* To get ranking we push everything into the priority queue at pop things off 1 by one which returns them in high to lowest priority */
119        for(i=0;i<4;i++) {
120                tally->stats->rank_src[i] = rank_new(0, tally->src[i][0]);
121                tally->stats->rank_dst[i] = rank_new(0, tally->dst[i][0]);
122                for(j=1;j<256;j++) {
123                        /* Push everything into the priority queue
124                         * each item will be popped off in the correct order */
125                        push(&tally->stats->rank_src[i], j, tally->src[i][j]);
126                        push(&tally->stats->rank_dst[i], j, tally->dst[i][j]);
127                }
128        }
129
130        /* Calculate mean, variance and standard deviation */
131        for(k=0;k<4;k++) {
132
133                double ex = 0;
134                double ex2 = 0;
135                double n = 0;
136                double m = 0;
137                for(i=0;i<256;i++) {
138                        for(j=0;j<tally->src[k][i];j++) {
139                                if(n == 0) {
140                                        m = i;
141                                }
142                                n += 1;
143                                ex += (i - m);
144                                ex2 += ((i - m) * (i - m));
145                        }
146                }
147
148                tally->stats->mean_src[k] = (k + (ex / n));
149                tally->stats->variance_src[k] = ((ex2 - (ex*ex)/n) / n);
150                tally->stats->stddev_src[k] = sqrt(tally->stats->variance_src[k]);
151
152                ex = 0;
153                ex2 = 0;
154                n = 0;
155                m = 0;
156                for(i=0;i<256;i++) {
157                        for(j=0;j<tally->dst[k][i];j++) {
158                                if(n == 0) {
159                                        m = i;
160                                }
161                                n += 1;
162                                ex += (i - m);
163                                ex2 += ((i - m) * (i - m));
164                        }
165                }
166                tally->stats->mean_dst[k] = (k + (ex / n));
167                tally->stats->variance_dst[k] = ((ex2 - (ex*ex)/n) / n);
168                tally->stats->stddev_dst[k] = sqrt(tally->stats->variance_dst[k]);
169                /* Get the median */
170                int c = (n/2) - tally->src[k][0];
171                int c2 = 0;
172                while(c > 0) {
173                        c2 += 1;
174                        c -= tally->src[k][c2];
175                }
176                tally->stats->median_src[k] = c2;
177                c = (n/2) - tally->dst[k][0];
178                c2 = 0;
179                while(c > 0) {
180                        c2 += 1;
181                        c -= tally->dst[k][c2];
182                }
183                tally->stats->median_dst[k] = c2;
184                /* Get the mode which is the first item in the priority queue */
185                tally->stats->mode_src[k] = peak(&tally->stats->rank_src[k]);
186                tally->stats->mode_dst[k] = peak(&tally->stats->rank_src[k]);
187                /* Calculate skewness */
188                tally->stats->skewness_src[k] = (tally->stats->mean_src[k] - tally->stats->median_src[k]) / tally->stats->stddev_src[k];
189                tally->stats->skewness_dst[k] = (tally->stats->mean_dst[k] - tally->stats->median_dst[k]) / tally->stats->stddev_dst[k];
190        }
191
192}
193
194static void per_tick(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls, uint64_t tick) {
195
196        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
197        /* Proccessing thread local storage */
198        struct addr_local *local = (struct addr_local *)tls;
199
200        /* Populate the result structure from the threads local storage and clear threads local storage*/
201        int i, j;
202        for(i=0;i<4;i++) {
203                for(j=0;j<256;j++) {
204                        result->src[i][j] = local->src[i][j];
205                        result->dst[i][j] = local->dst[i][j];
206                        /* clear local storage */
207                        local->src[i][j] = 0;
208                        local->dst[i][j] = 0;
209                }
210        }
211        result->packets = local->packets;
212        local->packets = 0;
213
214        /* Push result to the combiner */
215        trace_publish_result(trace, thread, tick, (libtrace_generic_t){.ptr=result}, RESULT_USER);
216}
217
218/* Start callback function - This is run for each thread when it starts */
219static void *start_callback(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
220
221        /* Create and initialize the local counter struct */
222        struct addr_local *local = (struct addr_local *)malloc(sizeof(struct addr_local));
223        int i, j;
224        for(i=0;i<4;i++) {
225                for(j=0;j<256;j++) {
226                        local->src[i][j] = 0;
227                        local->dst[i][j] = 0;
228                }
229        }
230        local->lastkey = 0;
231        local->packets = 0;
232
233        /* return the local storage so it is available for all other callbacks for the thread*/
234        return local;
235}
236
237/* Checks if address is part of a excluded subnet. */
238static int network_excluded(uint32_t address, struct exclude_networks *exclude) {
239
240        int i;
241        for(i=0;i<exclude->count;i++) {
242                /* Convert address into a network address */
243                uint32_t net_addr = address & exclude->networks[i].mask;
244
245                /* If this matches the network address from the excluded list we need to exclude this
246                   address. */
247                if(net_addr == exclude->networks[i].network) {
248                        return 1;
249                }
250        }
251
252        /* If we got this far the address should not be excluded */
253        return 0;
254}
255
256static void process_ip(struct sockaddr *ip, struct addr_local *local, struct exclude_networks *exclude, int srcaddr) {
257
258        /* Checks if the ip is of type IPv4 */
259        if (ip->sa_family == AF_INET) {
260
261                /* IPv4 - cast the generic sockaddr to a sockaddr_in */
262                struct sockaddr_in *v4 = (struct sockaddr_in *)ip;
263                /* Get in_addr from sockaddr */
264                struct in_addr ip4 = (struct in_addr)v4->sin_addr;
265                /* Ensure the address is in network byte order */
266                uint32_t address = htonl(ip4.s_addr);
267
268                /* Check if the address is part of an excluded network. */
269                if(network_excluded(address, exclude) == 0) {
270
271                        /* Split the IPv4 address into each octet */
272                        uint8_t octet[4];
273                        octet[0] = (address & 0xff000000) >> 24;
274                        octet[1] = (address & 0x00ff0000) >> 16;
275                        octet[2] = (address & 0x0000ff00) >> 8;
276                        octet[3] = (address & 0x000000ff);
277
278                        /* check if the supplied address was a source or destination,
279                           increment the correct one */
280                        if(srcaddr) {
281                                int i;
282                                for(i=0;i<4;i++) {
283                                        local->src[i][octet[i]] += 1;
284                                }
285                        } else {
286                                int i;
287                                for(i=0;i<4;i++) {
288                                        local->dst[i][octet[i]] += 1;
289                                }
290                        }
291                }
292        }
293}
294
295/* Per packet callback function run by each thread */
296static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls,
297        libtrace_packet_t *packet) {
298
299        /* Regain access to the address counter structure */
300        struct addr_local *local = (struct addr_local *)tls;
301
302        /* If this is the first packet set the lastkey to the packets timestamp */
303        if(local->lastkey == 0) {
304                local->lastkey = trace_get_erf_timestamp(packet);
305        }
306
307        /* Increment the packet count */
308        local->packets += 1;
309
310        /* Regain access to excluded networks pointer */
311        struct exclude_networks *exclude = (struct exclude_networks *)global;
312
313        struct sockaddr_storage addr;
314        struct sockaddr *ip;
315
316        /* Get the source IP address */
317        ip = trace_get_source_address(packet, (struct sockaddr *)&addr);
318        /* If a source ip address was found */
319        if(ip != NULL) {
320                process_ip(ip, local, exclude, 1);
321        }
322
323        /* Get the destination IP address */
324        ip = trace_get_destination_address(packet, (struct sockaddr *)&addr);
325        /* If a destination ip address was found */
326        if(ip != NULL) {
327                process_ip(ip, local, exclude, 0);
328        }
329
330        /* If this trace is not live we will manually call "per tick" */
331        if(!trace_get_information(trace)->live) {
332                /* get the current packets timestamp */
333                uint64_t timestamp = trace_get_erf_timestamp(packet);
334
335                /* We only want to call per_tick if we are due to output something
336                 * Right shifting these converts them to seconds, tickrate is in seconds */
337                if((timestamp >> 32) >= (local->lastkey >> 32) + tickrate) {
338                        per_tick(trace, thread, global, local, timestamp);
339                        local->lastkey = timestamp;
340                }
341        }
342
343        /* Return the packet to libtrace */
344        return packet;
345}
346
347/* Stopping callback function - When a thread closes */
348static void stop_processing(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
349
350        /* cast the local storage structure */
351        struct addr_local *local = (struct addr_local *)tls;
352        /* Create structure to store the result */
353        struct addr_local *result = (struct addr_local *)malloc(sizeof(struct addr_local));
354
355        /* Populate the result */
356        int i, j;
357        for(i=0;i<4;i++) {
358                for(j=0;j<256;j++) {
359                        result->src[i][j] = local->src[i][j];
360                        result->dst[i][j] = local->dst[i][j];
361                }
362        }
363        result->packets = local->packets;
364
365        /* Send the final results to the combiner */
366        trace_publish_result(trace, thread, 0, (libtrace_generic_t){.ptr=result}, RESULT_USER);
367
368        /* Cleanup the local storage */
369        free(local);
370}
371
372/* Starting callback for reporter thread */
373static void *start_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global) {
374        /* Create tally structure */
375        struct addr_local *tally = (struct addr_local *)malloc(sizeof(struct addr_local));
376        tally->stats = malloc(sizeof(struct addr_stats));
377
378        /* Initialize the tally structure */
379        int i, j;
380        for(i=0;i<4;i++) {
381                for(j=0;j<256;j++) {
382                        tally->src[i][j] = 0;
383                        tally->dst[i][j] = 0;
384                        tally->src_lastoutput[i][j] = 0;
385                        tally->dst_lastoutput[i][j] = 0;
386                        tally->stats->src[i][j] = 0;
387                        tally->stats->dst[i][j] = 0;
388                }
389                /* Stats related varibles */
390                tally->stats->mode_src[i] = 0;
391                tally->stats->mode_dst[i] = 0;
392                tally->stats->mean_src[i] = 0;
393                tally->stats->mean_dst[i] = 0;
394                tally->stats->median_src[i] = 0;
395                tally->stats->median_dst[i] = 0;
396                tally->stats->stddev_src[i] = 0;
397                tally->stats->stddev_dst[i] = 0;
398                tally->stats->variance_src[i] = 0;
399                tally->stats->variance_dst[i] = 0;
400                tally->stats->skewness_src[i] = 0;
401                tally->stats->skewness_dst[i] = 0;
402        }
403        tally->lost_packets = 0;
404        tally->lastkey = 0;
405        tally->packets = 0;
406        tally->output_count = 0;
407
408        return tally;
409}
410
411static void output_results(struct addr_local *tally, uint64_t tick) {
412
413        int i, j;
414
415        /* Calculations before reporting the results */
416        /* Need to initialise lastoutput values on first pass,
417         * this is so we have a base line for percentage changed */
418        if(tally->output_count == 0) {
419                for(i=0;i<4;i++) {
420                        for(j=0;j<256;j++) {
421                                tally->src_lastoutput[i][j] = tally->src[i][j];
422                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
423                        }
424                }
425         }
426        /* Compute the stats */
427        compute_stats(tally);
428
429        /* Finaly output the results */
430        printf("Generating output \"%s/ipdist-%lu\" Packets lost: %lu\n", stats_outputdir, tick, tally->lost_packets);
431
432        /* Output the results */
433        char outputfile[255];
434        snprintf(outputfile, sizeof(outputfile), "%s/ipdist-%lu.data", stats_outputdir, tick);
435        FILE *tmp = fopen(outputfile, "w");
436        fprintf(tmp, "#time\t\trank\toctet1\t\t\t\toctet2\t\t\t\toctet3\t\t\t\toctet4\n");
437        fprintf(tmp, "#\t\t\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\tsrc\thits\tdst\thits\n");
438        for(i=0;i<256;i++) {
439                fprintf(tmp, "%lu\t%d", tick, i+1);
440                for(j=0;j<4;j++) {
441                        /* Get the highest ranking to lowest ranking octets */
442                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_src[j]));
443                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_src[j]));
444                        fprintf(tmp, "\t%u", peak(&tally->stats->rank_dst[j]));
445                        fprintf(tmp, "\t%lu", peak_count(&tally->stats->rank_dst[j]));
446                        pop(&tally->stats->rank_src[j]);
447                        pop(&tally->stats->rank_dst[j]);
448                }
449                fprintf(tmp, "\n");
450        }
451        fclose(tmp);
452
453        char outputfile_stats[255];
454        snprintf(outputfile_stats, sizeof(outputfile_stats), "%s/ipdist-%lu.stats", stats_outputdir, tick);
455        tmp = fopen(outputfile_stats, "w");
456        /* append stats data to end of file */
457        fprintf(tmp, "#\tmean\tstddev\tvariance\tmedian\tmode\tskewness\n");
458        for(i=0;i<4;i++) {
459                fprintf(tmp, "src%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_src[i], tally->stats->stddev_src[i], tally->stats->variance_src[i], tally->stats->median_src[i], tally->stats->mode_src[i], tally->stats->skewness_src[i]);
460                fprintf(tmp, "dst%d\t%0.f\t%0.f\t%0.f\t\t%0.f\t%0.f\t%f\n", i+1, tally->stats->mean_dst[i], tally->stats->stddev_dst[i], tally->stats->variance_dst[i], tally->stats->median_src[i], tally->stats->mode_dst[i], tally->stats->skewness_dst[i]);
461                fprintf(tmp, "\n\n");
462        }
463        fclose(tmp);
464}
465
466
467/* Callback when a result is given to the reporter thread */
468static void per_result(libtrace_t *trace, libtrace_thread_t *sender, void *global,
469        void *tls, libtrace_result_t *result) {
470
471        struct addr_local *results;
472        struct addr_local *tally;
473        uint64_t key;
474
475        /* We only want to handle results containing our user-defined structure  */
476        if(result->type != RESULT_USER) {
477                return;
478        }
479
480        /* This key is the key that was passed into trace_publish_results
481         * this will contain the erf timestamp for the packet */
482        key = result->key;
483
484        /* result->value is a libtrace_generic_t that was passed into trace_publish_results() */
485        results = (struct addr_local *)result->value.ptr;
486
487        /* Grab our tally out of thread local storage */
488        tally = (struct addr_local *)tls;
489
490        /* Add all the results to the tally */
491        int i, j;
492        for(i=0;i<4;i++) {
493                for(j=0;j<256;j++) {
494                        tally->src[i][j] += results->src[i][j];
495                        tally->dst[i][j] += results->dst[i][j];
496                }
497        }
498        tally->packets += results->packets;
499
500        /* Increment lost packets counter */
501        struct libtrace_stat_t *statistics = trace_get_statistics(trace, NULL);
502        if(statistics->dropped > tally->lost_packets) {
503                /* update lost packets to the new number of dropped packets */
504                tally->lost_packets = statistics->dropped;
505        }
506
507        /* If the current timestamp is greater than the last printed plus the interval, output a result */
508        if((key >> 32) >= (tally->lastkey >> 32) + tickrate) {
509
510                /* update last key */
511                tally->lastkey = key;
512
513                /* Output the results with the key in epoch seconds*/
514                output_results(tally, key >> 32);
515
516                /* increment the output counter */
517                tally->output_count++;
518
519                /* clear the tally but copy old values over first*/
520                for(i=0;i<4;i++) {
521                        for(j=0;j<256;j++) {
522                                tally->src_lastoutput[i][j] = tally->src[i][j];
523                                tally->dst_lastoutput[i][j] = tally->dst[i][j];
524                                tally->src[i][j] = 0;
525                                tally->dst[i][j] = 0;
526                        }
527                        /* Clear all the stats data */
528                        tally->stats->mode_src[i] = 0;
529                        tally->stats->mode_dst[i] = 0;
530                        tally->stats->mean_src[i] = 0;
531                        tally->stats->mean_dst[i] = 0;
532                        tally->stats->median_src[i] = 0;
533                        tally->stats->median_dst[i] = 0;
534                        tally->stats->stddev_src[i] = 0;
535                        tally->stats->stddev_dst[i] = 0;
536                        tally->stats->variance_src[i] = 0;
537                        tally->stats->variance_dst[i] = 0;
538                        tally->stats->skewness_src[i] = 0;
539                        tally->stats->skewness_dst[i] = 0;
540                }
541                /* free the priority queue */
542                for(i=0;i<4;i++) {
543                        free(tally->stats->rank_src[i]);
544                        free(tally->stats->rank_dst[i]);
545                }
546
547                tally->packets = 0;
548
549        }
550
551        /* Cleanup the thread results */
552        free(results);
553}
554
555/* Callback when the reporter thread stops (essentially when the program ends) */
556static void stop_reporter(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls) {
557
558        /* Get the tally from the thread local storage */
559        struct addr_local *tally = (struct addr_local *)tls;
560
561        /* If there is any remaining data in the tally plot it */
562        if(tally->packets > 0) {
563                /* Then output the results */
564                output_results(tally, (tally->lastkey >> 32) + 1);
565        }
566        /* Cleanup tally results*/
567        free(tally);
568}
569
570static void libtrace_cleanup(libtrace_t *trace, libtrace_callback_set_t *processing,
571        libtrace_callback_set_t *reporting, struct exclude_networks *exclude) {
572        /* Only destroy if the structure exists */
573        if(trace) {
574                trace_destroy(trace);
575        }
576        if(processing) {
577                trace_destroy_callback_set(processing);
578        }
579        if(reporting) {
580                trace_destroy_callback_set(reporting);
581        }
582        if(exclude->count > 0) {
583                free(exclude->networks);
584        }
585        if(exclude) {
586                free(exclude);
587        }
588}
589
590/* Converts a string representation eg 1.2.3.4/24 into a network structure */
591static int get_network(char *network_string, struct network *network) {
592
593        char delim[] = "/";
594        /* Split the address and mask portion of the string */
595        char *address = strtok(network_string, delim);
596        char *mask = strtok(NULL, delim);
597
598        /* Check the subnet mask is valid */
599        if(atoi(mask) == 0 || atoi(mask) > 32 || atoi(mask) < 0) {
600                return 1;
601        }
602        /* right shift so netmask is in network byte order */
603        network->mask = 0xffffffff << (32 - atoi(mask));
604
605        struct in_addr addr;
606        /* Convert address string into uint32_t and check its valid */
607        if(inet_aton(address, &addr) == 0) {
608                return 2;
609        }
610        /* Ensure its saved in network byte order */
611        network->address = htonl(addr.s_addr);
612
613        /* Calculate the network address */
614        network->network = network->address & network->mask;
615
616        return 0;
617}
618
619static void usage(char *argv0) {
620        fprintf(stderr, "Usage:\n"
621        "%s inputURI output-interval\n"
622        "-i [inputURI] --set-uri [inputURI]\n"
623        "-o [output-interval] --output-interval [output-interval]\n"
624        "       Output statistical information every x seconds\n"
625        "-t [threads] --threads [threads]\n"
626        "-e [excluded-network] --exclude-network [excluded-network]\n"
627        "       Network to exclude from results\n"
628        "       e.g. -e 192.168.0.0/16 -e 10.0.0.0/8\n"
629        "-d [output-directory] --output-dir [output-directory]\n"
630        , argv0);
631        exit(1);
632}
633
634int main(int argc, char *argv[]) {
635
636        char *inputURI = NULL;
637        int threads = 4;
638        tickrate = 300;
639        struct exclude_networks *exclude = malloc(sizeof(struct exclude_networks));
640        exclude->count = 0;
641
642        while(1) {
643                int option_index = 0;
644                struct option long_options[] = {
645                        { "set-uri",            1, 0, 'i' },
646                        { "output-interval",    1, 0, 'o' },
647                        { "threads",            1, 0, 't' },
648                        { "exclude-network",    1, 0, 'e' },
649                        { "output-dir",         1, 0, 'd' },
650                        { NULL,                 0, 0,  0  }
651                };
652
653                int c = getopt_long(argc, argv, "i:o:t:e:d:", long_options, &option_index);
654
655                if(c==-1) {
656                        break;
657                }
658
659                switch(c) {
660                        case 'i':
661                                inputURI = optarg;
662                                break;
663                        case 'o':
664                                tickrate = atoi(optarg);
665                                break;
666                        case 't':
667                                threads = atoi(optarg);
668                                break;
669                        case 'e':
670                                exclude->count += 1;
671                                if(exclude->count > 1) {
672                                        exclude->networks = realloc(exclude->networks, sizeof(struct network)*exclude->count);
673                                } else {
674                                        exclude->networks = malloc(sizeof(struct network));
675                                }
676                                if(get_network(optarg, &exclude->networks[exclude->count-1])) {
677                                        fprintf(stderr, "Error excluding network %s\n", optarg);
678                                        return 1;
679                                }
680                                break;
681                        case 'd':
682                                stats_outputdir = optarg;
683                                break;
684                        case '?':
685                                break;
686                        default:
687                                fprintf(stderr, "Unknown option: %c\n", c);
688                                usage(argv[0]);
689                }
690        }
691
692        libtrace_t *trace;
693        /* Callbacks for processing and reporting threads */
694        libtrace_callback_set_t *processing, *reporter;
695
696        /* Ensure the input URI was supplied */
697        if(inputURI == NULL) {
698                usage(argv[0]);
699        }
700
701        /* Create the trace */
702        trace = trace_create(inputURI);
703        /* Ensure no error has occured creating the trace */
704        if(trace_is_err(trace)) {
705                trace_perror(trace, "Creating trace");
706                return 1;
707        }
708
709        /* Setup the processing threads */
710        processing = trace_create_callback_set();
711        trace_set_starting_cb(processing, start_callback);
712        trace_set_packet_cb(processing, per_packet);
713        trace_set_stopping_cb(processing, stop_processing);
714        trace_set_tick_interval_cb(processing, per_tick);
715        /* Setup the reporter threads */
716        reporter = trace_create_callback_set();
717        trace_set_starting_cb(reporter, start_reporter);
718        trace_set_result_cb(reporter, per_result);
719        trace_set_stopping_cb(reporter, stop_reporter);
720
721        /* Parallel specific configuration MUST BE PERFORMED AFTER TRACE IS CREATED */
722        trace_set_perpkt_threads(trace, threads);
723        /* Order the results by timestamp */
724        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
725        /* Try to balance the load across all processing threads */
726        trace_set_hasher(trace, HASHER_BALANCE, NULL, NULL);
727
728        /* Set the tick interval only if this is a live capture */
729        if(trace_get_information(trace)->live) {
730                /* tickrate is in seconds but tick_interval expects milliseconds */
731                trace_set_tick_interval(trace, tickrate*1000);
732        }
733        /* Do not buffer the reports */
734        trace_set_reporter_thold(trace, 1);
735
736        /* Start the trace, if it errors return */
737        if(trace_pstart(trace, exclude, processing, reporter)) {
738                trace_perror(trace, "Starting parallel trace");
739                libtrace_cleanup(trace, processing, reporter, exclude);
740                return 1;
741        }
742
743        /* This will wait for all threads to complete */
744        trace_join(trace);
745
746        /* Clean up everything */
747        libtrace_cleanup(trace, processing, reporter, exclude);
748
749        return 0;
750}
Note: See TracBrowser for help on using the repository browser.