source: tools/tracertstats/tracertstats.c @ bab946c

develop
Last change on this file since bab946c was bab946c, checked in by Shane Alcock <salcock@…>, 22 months ago

Fix clashes between global and local variable names in tools.

  • Property mode set to 100644
File size: 14.0 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <signal.h>
50
51#include <lt_inttypes.h>
52#include "libtrace_parallel.h"
53#include "output.h"
54#include "rt_protocol.h"
55#include "dagformat.h"
56
57#ifndef UINT32_MAX
58        #define UINT32_MAX      0xffffffffU
59#endif
60
61#define DEFAULT_OUTPUT_FMT "txt"
62
63char *output_format=NULL;
64int merge_inputs = 0;
65int threadcount = 4;
66int filter_count=0;
67int burstsize=10;
68uint8_t report_drops = 0;
69
70struct filter_t {
71        char *expr;
72        struct libtrace_filter_t *filter;
73        uint64_t count;
74        uint64_t bytes;
75} *filters = NULL;
76
77uint64_t packet_count=UINT64_MAX;
78double packet_interval=UINT32_MAX;
79
80struct output_data_t *output = NULL;
81
82uint64_t totalcount;
83uint64_t totalbytes;
84
85struct libtrace_t *currenttrace;
86
87static void cleanup_signal(int signal UNUSED) {
88        if (currenttrace) {
89                trace_pstop(currenttrace);
90        }
91}
92
93static void report_results(double ts,uint64_t count,uint64_t bytes,
94                libtrace_stat_t *stats)
95{
96        int i=0, offset = 3;
97        output_set_data_time(output,0,ts);
98        output_set_data_int(output,1,count);
99        output_set_data_int(output,2,bytes);
100        if (stats) {
101                if (stats->dropped_valid) {
102                        output_set_data_int(output, 3, stats->dropped);
103                } else {
104                        output_set_data_int(output, 3, -1);
105                }
106                if (stats->missing_valid) {
107                        output_set_data_int(output, 4, stats->missing);
108                } else {
109                        output_set_data_int(output, 4, -1);
110                }
111                offset += 2;
112        }
113        for(i=0;i<filter_count;++i) {
114                output_set_data_int(output,i*2+offset,filters[i].count);
115                output_set_data_int(output,i*2+offset+1,filters[i].bytes);
116                filters[i].count=filters[i].bytes=0;
117        }
118        output_flush_row(output);
119}
120
121static void create_output(char *title) {
122        int i;
123       
124        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
125        if (!output) {
126                fprintf(stderr,"Failed to create output file\n");
127                return;
128        }
129        output_add_column(output,"ts");
130        output_add_column(output,"packets");
131        output_add_column(output,"bytes");
132        if (report_drops) {
133                output_add_column(output, "dropped");
134                output_add_column(output, "missing");
135        }
136        for(i=0;i<filter_count;++i) {
137                char buff[1024];
138                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
139                output_add_column(output,buff);
140                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
141                output_add_column(output,buff);
142        }
143        output_flush_headings(output);
144
145}
146
147typedef struct statistic {
148        uint64_t count;
149        uint64_t bytes;
150} statistic_t;
151
152typedef struct result {
153        struct statistic total;
154        struct statistic filters[0];
155} result_t;
156
157static uint64_t glob_last_ts = 0;
158static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
159                void *global UNUSED, void *tls UNUSED,
160                libtrace_result_t *result) {
161        uint64_t ts = 0;
162        static bool stopped = false;
163        static uint64_t packets_seen = 0;
164        int j;
165        result_t *res;
166        libtrace_stat_t *stats = NULL;
167
168        if (stopped)
169                return;
170
171        ts = result->key;
172        res = result->value.ptr;
173        if (glob_last_ts == 0)
174                glob_last_ts = ts;
175        if (report_drops) {
176                stats = trace_create_statistics();
177                trace_get_statistics(trace, stats);
178        }
179        while ((glob_last_ts >> 32) < (ts >> 32)) {
180                report_results(glob_last_ts >> 32, totalcount, totalbytes,
181                                stats);
182                totalcount = 0;
183                totalbytes = 0;
184                for (j = 0; j < filter_count; j++)
185                        filters[j].count = filters[j].bytes = 0;
186                glob_last_ts = ts;
187        }
188        totalcount += res->total.count;
189        packets_seen += res->total.count;
190        totalbytes += res->total.bytes;
191        for (j = 0; j < filter_count; j++) {
192                filters[j].count += res->filters[j].count;
193                filters[j].bytes += res->filters[j].bytes;
194        }
195        free(res);
196        if (stats) {
197                free(stats);
198        }
199
200        /* Be careful to only call pstop once from within this thread! */
201        if (packets_seen > packet_count) {
202                trace_pstop(trace);
203                stopped = true;
204        }
205}
206
207typedef struct threadlocal {
208        result_t *results;
209        uint64_t last_key;
210} thread_data_t;
211
212static void *cb_starting(libtrace_t *trace UNUSED,
213        libtrace_thread_t *t UNUSED, void *global UNUSED)
214{
215        thread_data_t *td = calloc(1, sizeof(thread_data_t));
216        td->results = calloc(1, sizeof(result_t) +
217                        sizeof(statistic_t) * filter_count);
218        return td;
219}
220
221static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
222                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
223
224        uint64_t key;
225        thread_data_t *td = (thread_data_t *)tls;
226        int i;
227        size_t wlen;
228
229        if (IS_LIBTRACE_META_PACKET(packet)) {
230                return packet;
231        }
232
233        key = trace_get_erf_timestamp(packet);
234        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
235                libtrace_generic_t tmp = {.ptr = td->results};
236                trace_publish_result(trace, t, key,
237                                tmp, RESULT_USER);
238                trace_post_reporter(trace);
239                td->last_key = key;
240                td->results = calloc(1, sizeof(result_t) +
241                                sizeof(statistic_t) * filter_count);
242        }
243        wlen = trace_get_wire_length(packet);
244        if (wlen == 0) {
245                /* Don't count ERF provenance and similar packets */
246                return packet;
247        }
248        for(i=0;i<filter_count;++i) {
249                if(trace_apply_filter(filters[i].filter, packet)) {
250                        td->results->filters[i].count++;
251                        td->results->filters[i].bytes+=wlen;
252                }
253        }
254
255        td->results->total.count++;
256        td->results->total.bytes += wlen;
257        return packet;
258}
259
260static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
261                void *global UNUSED, void *tls) {
262
263        thread_data_t *td = (thread_data_t *)tls;
264        if (td->results->total.count) {
265                libtrace_generic_t tmp = {.ptr = td->results};
266                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
267                trace_post_reporter(trace);
268                td->results = NULL;
269        }
270}
271
272static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
273                void *global UNUSED, void *tls, uint64_t order) {
274
275        thread_data_t *td = (thread_data_t *)tls;
276        if (order > td->last_key) {
277                libtrace_generic_t tmp = {.ptr = td->results};
278                trace_publish_result(trace, t, order, tmp, RESULT_USER);
279                trace_post_reporter(trace);
280                td->last_key = order;
281                td->results = calloc(1, sizeof(result_t) +
282                                sizeof(statistic_t) * filter_count);
283        }
284}
285
286/* Process a trace, counting packets that match filter(s) */
287static void run_trace(char *uri)
288{
289        libtrace_t *trace = NULL;
290        libtrace_callback_set_t *pktcbs, *repcbs;
291        libtrace_stat_t *stats = NULL;
292
293        if (!merge_inputs) 
294                create_output(uri);
295
296        if (output == NULL)
297                return;
298
299        trace = trace_create(uri);
300        if (trace_is_err(trace)) {
301                trace_perror(trace,"trace_create");
302                trace_destroy(trace);
303                if (!merge_inputs)
304                        output_destroy(output);
305                return;
306        }
307        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
308        trace_set_perpkt_threads(trace, threadcount);
309        trace_set_burst_size(trace, burstsize);
310
311        if (trace_get_information(trace)->live) {
312                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
313        } else {
314                trace_set_tracetime(trace, true);
315        }
316
317        pktcbs = trace_create_callback_set();
318        trace_set_starting_cb(pktcbs, cb_starting);
319        trace_set_stopping_cb(pktcbs, cb_stopping);
320        trace_set_packet_cb(pktcbs, cb_packet);
321        trace_set_tick_count_cb(pktcbs, cb_tick);
322        trace_set_tick_interval_cb(pktcbs, cb_tick);
323
324        repcbs = trace_create_callback_set();
325        trace_set_result_cb(repcbs, cb_result);
326
327        currenttrace = trace;
328        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
329                trace_perror(trace,"Failed to start trace");
330                trace_destroy(trace);
331                trace_destroy_callback_set(pktcbs);
332                trace_destroy_callback_set(repcbs);
333                if (!merge_inputs)
334                        output_destroy(output);
335                return;
336        }
337
338
339        // Wait for all threads to stop
340        trace_join(trace);
341       
342        // Flush the last one out
343        if (report_drops) {
344                stats = trace_create_statistics();
345                stats = trace_get_statistics(trace, stats);
346        }
347        report_results((glob_last_ts >> 32), totalcount, totalbytes, stats);
348        if (trace_is_err(trace))
349                trace_perror(trace,"%s",uri);
350
351        if (stats) {
352                free(stats);
353        }
354        trace_destroy(trace);
355        trace_destroy_callback_set(pktcbs);
356        trace_destroy_callback_set(repcbs);
357
358        if (!merge_inputs)
359                output_destroy(output);
360       
361}
362
363// TODO Decide what to do with -c option
364static void usage(char *argv0)
365{
366        fprintf(stderr,"Usage:\n"
367        "%s flags libtraceuri [libtraceuri...]\n"
368        "-i --interval=seconds  Duration of reporting interval in seconds\n"
369        "-c --count=packets     Exit after count packets have been processed\n"
370        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
371        "-o --output-format=txt|csv|html|png Reporting output format\n"
372        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
373        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
374        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
375        "                       updates at very low traffic rates\n"
376        "-d --report-drops      Include statistics about number of packets dropped or\n"
377        "                       lost by the capture process\n"
378        "-h --help      Print this usage statement\n"
379        ,argv0);
380}
381
382int main(int argc, char *argv[]) {
383
384        int i;
385        struct sigaction sigact;
386       
387        while(1) {
388                int option_index;
389                struct option long_options[] = {
390                        { "filter",             1, 0, 'f' },
391                        { "interval",           1, 0, 'i' },
392                        { "count",              1, 0, 'c' },
393                        { "output-format",      1, 0, 'o' },
394                        { "help",               0, 0, 'h' },
395                        { "merge-inputs",       0, 0, 'm' },
396                        { "threads",            1, 0, 't' },
397                        { "nobuffer",           0, 0, 'N' },
398                        { "report-drops",       0, 0, 'd' },
399                        { NULL,                 0, 0, 0   },
400                };
401
402                int c=getopt_long(argc, argv, "c:f:i:o:t:dhmN",
403                                long_options, &option_index);
404
405                if (c==-1)
406                        break;
407
408                switch (c) {
409                        case 'd':
410                                report_drops = 1;
411                                break;
412                        case 'N':
413                                burstsize = 1;
414                                break;
415                        case 'f':
416                                ++filter_count;
417                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
418                                filters[filter_count-1].expr=strdup(optarg);
419                                filters[filter_count-1].filter=trace_create_filter(optarg);
420                                filters[filter_count-1].count=0;
421                                filters[filter_count-1].bytes=0;
422                                break;
423                        case 't':
424                                threadcount = atoi(optarg);
425                                if (threadcount <= 0)
426                                        threadcount = 1;
427                                break;
428                        case 'i':
429                                packet_interval=atof(optarg);
430                                break;
431                        case 'c':
432                                packet_count=strtoul(optarg, NULL, 10);
433                                break;
434                        case 'o':
435                                if (output_format) free(output_format);
436                                output_format=strdup(optarg);
437                                break;
438                        case 'm':
439                                merge_inputs = 1;
440                                break;
441                        case 'h':
442                                  usage(argv[0]);
443                                  return 1;
444                        default:
445                                fprintf(stderr,"Unknown option: %c\n",c);
446                                usage(argv[0]);
447                                return 1;
448                }
449        }
450
451        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
452                packet_interval = 60; /* every minute */
453        }
454
455        if (optind >= argc)
456                return 0;
457
458        if (output_format)
459                fprintf(stderr,"output format: '%s'\n",output_format);
460        else
461                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
462       
463       
464        if (merge_inputs) {
465                /* If we're merging the inputs, we only want to create all
466                 * the column headers etc. once rather than doing them once
467                 * per trace */
468
469                /* This is going to "name" the output based on the first
470                 * provided URI - admittedly not ideal */
471                create_output(argv[optind]);
472                if (output == NULL)
473                        return 0;
474        }
475       
476        sigact.sa_handler = cleanup_signal;
477        sigemptyset(&sigact.sa_mask);
478        sigact.sa_flags = SA_RESTART;
479
480        sigaction(SIGINT, &sigact, NULL);
481        sigaction(SIGTERM, &sigact, NULL);
482
483
484        for(i=optind;i<argc;++i) {
485                run_trace(argv[i]);
486        }
487
488        if (merge_inputs) {
489                /* Clean up after ourselves */
490                output_destroy(output);
491        }
492
493
494        return 0;
495}
Note: See TracBrowser for help on using the repository browser.