source: tools/tracertstats/tracertstats.c @ 1800ee3

develop
Last change on this file since 1800ee3 was 1800ee3, checked in by Shane Alcock <salcock@…>, 2 years ago

Add option to report packet drops to tracertstats.c

Note: this is going to report a cumulative counter rather
than the per-interval count that the rest of the columns do.

This can be fixed later, but will suffice for now.

  • Property mode set to 100644
File size: 13.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <signal.h>
50
51#include <lt_inttypes.h>
52#include "libtrace_parallel.h"
53#include "output.h"
54#include "rt_protocol.h"
55#include "dagformat.h"
56
57#ifndef UINT32_MAX
58        #define UINT32_MAX      0xffffffffU
59#endif
60
61#define DEFAULT_OUTPUT_FMT "txt"
62
63char *output_format=NULL;
64int merge_inputs = 0;
65int threadcount = 4;
66int filter_count=0;
67int burstsize=10;
68uint8_t report_drops = 0;
69
70struct filter_t {
71        char *expr;
72        struct libtrace_filter_t *filter;
73        uint64_t count;
74        uint64_t bytes;
75} *filters = NULL;
76
77uint64_t packet_count=UINT64_MAX;
78double packet_interval=UINT32_MAX;
79
80struct output_data_t *output = NULL;
81
82uint64_t count;
83uint64_t bytes;
84
85struct libtrace_t *currenttrace;
86
87static void cleanup_signal(int signal UNUSED) {
88        if (currenttrace) {
89                trace_pstop(currenttrace);
90        }
91}
92
93static void report_results(double ts,uint64_t count,uint64_t bytes,
94                libtrace_stat_t *stats)
95{
96        int i=0, offset = 3;
97        output_set_data_time(output,0,ts);
98        output_set_data_int(output,1,count);
99        output_set_data_int(output,2,bytes);
100        if (stats) {
101                if (stats->dropped_valid) {
102                        output_set_data_int(output, 3, stats->dropped);
103                } else {
104                        output_set_data_int(output, 3, -1);
105                }
106                if (stats->missing_valid) {
107                        output_set_data_int(output, 4, stats->missing);
108                } else {
109                        output_set_data_int(output, 4, -1);
110                }
111                offset += 2;
112        }
113        for(i=0;i<filter_count;++i) {
114                output_set_data_int(output,i*2+offset,filters[i].count);
115                output_set_data_int(output,i*2+offset+1,filters[i].bytes);
116                filters[i].count=filters[i].bytes=0;
117        }
118        output_flush_row(output);
119}
120
121static void create_output(char *title) {
122        int i;
123       
124        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
125        if (!output) {
126                fprintf(stderr,"Failed to create output file\n");
127                return;
128        }
129        output_add_column(output,"ts");
130        output_add_column(output,"packets");
131        output_add_column(output,"bytes");
132        if (report_drops) {
133                output_add_column(output, "dropped");
134                output_add_column(output, "missing");
135        }
136        for(i=0;i<filter_count;++i) {
137                char buff[1024];
138                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
139                output_add_column(output,buff);
140                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
141                output_add_column(output,buff);
142        }
143        output_flush_headings(output);
144
145}
146
147typedef struct statistic {
148        uint64_t count;
149        uint64_t bytes;
150} statistic_t;
151
152typedef struct result {
153        struct statistic total;
154        struct statistic filters[0];
155} result_t;
156
157static uint64_t glob_last_ts = 0;
158static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
159                void *global UNUSED, void *tls UNUSED,
160                libtrace_result_t *result) {
161        uint64_t ts = 0;
162        static bool stopped = false;
163        static uint64_t packets_seen = 0;
164        int j;
165        result_t *res;
166        libtrace_stat_t *stats = NULL;
167
168        if (stopped)
169                return;
170
171        ts = result->key;
172        res = result->value.ptr;
173        if (glob_last_ts == 0)
174                glob_last_ts = ts;
175        if (report_drops) {
176                stats = trace_create_statistics();
177                trace_get_statistics(trace, stats);
178        }
179        while ((glob_last_ts >> 32) < (ts >> 32)) {
180                report_results(glob_last_ts >> 32, count, bytes, stats);
181                count = 0;
182                bytes = 0;
183                for (j = 0; j < filter_count; j++)
184                        filters[j].count = filters[j].bytes = 0;
185                glob_last_ts = ts;
186        }
187        count += res->total.count;
188        packets_seen += res->total.count;
189        bytes += res->total.bytes;
190        for (j = 0; j < filter_count; j++) {
191                filters[j].count += res->filters[j].count;
192                filters[j].bytes += res->filters[j].bytes;
193        }
194        free(res);
195        if (stats) {
196                free(stats);
197        }
198
199        /* Be careful to only call pstop once from within this thread! */
200        if (packets_seen > packet_count) {
201                trace_pstop(trace);
202                stopped = true;
203        }
204}
205
206typedef struct threadlocal {
207        result_t *results;
208        uint64_t last_key;
209} thread_data_t;
210
211static void *cb_starting(libtrace_t *trace UNUSED,
212        libtrace_thread_t *t UNUSED, void *global UNUSED)
213{
214        thread_data_t *td = calloc(1, sizeof(thread_data_t));
215        td->results = calloc(1, sizeof(result_t) +
216                        sizeof(statistic_t) * filter_count);
217        return td;
218}
219
220static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
221                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
222
223        uint64_t key;
224        thread_data_t *td = (thread_data_t *)tls;
225        int i;
226        size_t wlen;
227
228        if (IS_LIBTRACE_META_PACKET(packet)) {
229                return packet;
230        }
231
232        key = trace_get_erf_timestamp(packet);
233        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
234                libtrace_generic_t tmp = {.ptr = td->results};
235                trace_publish_result(trace, t, key,
236                                tmp, RESULT_USER);
237                trace_post_reporter(trace);
238                td->last_key = key;
239                td->results = calloc(1, sizeof(result_t) +
240                                sizeof(statistic_t) * filter_count);
241        }
242        wlen = trace_get_wire_length(packet);
243        if (wlen == 0) {
244                /* Don't count ERF provenance and similar packets */
245                return packet;
246        }
247        for(i=0;i<filter_count;++i) {
248                if(trace_apply_filter(filters[i].filter, packet)) {
249                        td->results->filters[i].count++;
250                        td->results->filters[i].bytes+=wlen;
251                }
252        }
253
254        td->results->total.count++;
255        td->results->total.bytes += wlen;
256        return packet;
257}
258
259static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
260                void *global UNUSED, void *tls) {
261
262        thread_data_t *td = (thread_data_t *)tls;
263        if (td->results->total.count) {
264                libtrace_generic_t tmp = {.ptr = td->results};
265                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
266                trace_post_reporter(trace);
267                td->results = NULL;
268        }
269}
270
271static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
272                void *global UNUSED, void *tls, uint64_t order) {
273
274        thread_data_t *td = (thread_data_t *)tls;
275        if (order > td->last_key) {
276                libtrace_generic_t tmp = {.ptr = td->results};
277                trace_publish_result(trace, t, order, tmp, RESULT_USER);
278                trace_post_reporter(trace);
279                td->last_key = order;
280                td->results = calloc(1, sizeof(result_t) +
281                                sizeof(statistic_t) * filter_count);
282        }
283}
284
285/* Process a trace, counting packets that match filter(s) */
286static void run_trace(char *uri)
287{
288        libtrace_t *trace = NULL;
289        libtrace_callback_set_t *pktcbs, *repcbs;
290        libtrace_stat_t *stats = NULL;
291
292        if (!merge_inputs) 
293                create_output(uri);
294
295        if (output == NULL)
296                return;
297
298        trace = trace_create(uri);
299        if (trace_is_err(trace)) {
300                trace_perror(trace,"trace_create");
301                trace_destroy(trace);
302                if (!merge_inputs)
303                        output_destroy(output);
304                return;
305        }
306        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
307        trace_set_perpkt_threads(trace, threadcount);
308        trace_set_burst_size(trace, burstsize);
309
310        if (trace_get_information(trace)->live) {
311                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
312        } else {
313                trace_set_tracetime(trace, true);
314        }
315
316        pktcbs = trace_create_callback_set();
317        trace_set_starting_cb(pktcbs, cb_starting);
318        trace_set_stopping_cb(pktcbs, cb_stopping);
319        trace_set_packet_cb(pktcbs, cb_packet);
320        trace_set_tick_count_cb(pktcbs, cb_tick);
321        trace_set_tick_interval_cb(pktcbs, cb_tick);
322
323        repcbs = trace_create_callback_set();
324        trace_set_result_cb(repcbs, cb_result);
325
326        currenttrace = trace;
327        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
328                trace_perror(trace,"Failed to start trace");
329                trace_destroy(trace);
330                trace_destroy_callback_set(pktcbs);
331                trace_destroy_callback_set(repcbs);
332                if (!merge_inputs)
333                        output_destroy(output);
334                return;
335        }
336
337
338        // Wait for all threads to stop
339        trace_join(trace);
340       
341        // Flush the last one out
342        if (report_drops) {
343                stats = trace_create_statistics();
344                stats = trace_get_statistics(trace, stats);
345        }
346        report_results((glob_last_ts >> 32), count, bytes, stats);
347        if (trace_is_err(trace))
348                trace_perror(trace,"%s",uri);
349
350        if (stats) {
351                free(stats);
352        }
353        trace_destroy(trace);
354        trace_destroy_callback_set(pktcbs);
355        trace_destroy_callback_set(repcbs);
356
357        if (!merge_inputs)
358                output_destroy(output);
359       
360}
361
362// TODO Decide what to do with -c option
363static void usage(char *argv0)
364{
365        fprintf(stderr,"Usage:\n"
366        "%s flags libtraceuri [libtraceuri...]\n"
367        "-i --interval=seconds  Duration of reporting interval in seconds\n"
368        "-c --count=packets     Exit after count packets have been processed\n"
369        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
370        "-o --output-format=txt|csv|html|png Reporting output format\n"
371        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
372        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
373        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
374        "                       updates at very low traffic rates\n"
375        "-d --report-drops      Include statistics about number of packets dropped or\n"
376        "                       lost by the capture process\n"
377        "-h --help      Print this usage statement\n"
378        ,argv0);
379}
380
381int main(int argc, char *argv[]) {
382
383        int i;
384        struct sigaction sigact;
385       
386        while(1) {
387                int option_index;
388                struct option long_options[] = {
389                        { "filter",             1, 0, 'f' },
390                        { "interval",           1, 0, 'i' },
391                        { "count",              1, 0, 'c' },
392                        { "output-format",      1, 0, 'o' },
393                        { "help",               0, 0, 'h' },
394                        { "merge-inputs",       0, 0, 'm' },
395                        { "threads",            1, 0, 't' },
396                        { "nobuffer",           0, 0, 'N' },
397                        { "report-drops",       0, 0, 'd' },
398                        { NULL,                 0, 0, 0   },
399                };
400
401                int c=getopt_long(argc, argv, "c:f:i:o:t:dhmN",
402                                long_options, &option_index);
403
404                if (c==-1)
405                        break;
406
407                switch (c) {
408                        case 'd':
409                                report_drops = 1;
410                                break;
411                        case 'N':
412                                burstsize = 1;
413                                break;
414                        case 'f':
415                                ++filter_count;
416                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
417                                filters[filter_count-1].expr=strdup(optarg);
418                                filters[filter_count-1].filter=trace_create_filter(optarg);
419                                filters[filter_count-1].count=0;
420                                filters[filter_count-1].bytes=0;
421                                break;
422                        case 't':
423                                threadcount = atoi(optarg);
424                                if (threadcount <= 0)
425                                        threadcount = 1;
426                                break;
427                        case 'i':
428                                packet_interval=atof(optarg);
429                                break;
430                        case 'c':
431                                packet_count=strtoul(optarg, NULL, 10);
432                                break;
433                        case 'o':
434                                if (output_format) free(output_format);
435                                output_format=strdup(optarg);
436                                break;
437                        case 'm':
438                                merge_inputs = 1;
439                                break;
440                        case 'h':
441                                  usage(argv[0]);
442                                  return 1;
443                        default:
444                                fprintf(stderr,"Unknown option: %c\n",c);
445                                usage(argv[0]);
446                                return 1;
447                }
448        }
449
450        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
451                packet_interval = 60; /* every minute */
452        }
453
454        if (optind >= argc)
455                return 0;
456
457        if (output_format)
458                fprintf(stderr,"output format: '%s'\n",output_format);
459        else
460                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
461       
462       
463        if (merge_inputs) {
464                /* If we're merging the inputs, we only want to create all
465                 * the column headers etc. once rather than doing them once
466                 * per trace */
467
468                /* This is going to "name" the output based on the first
469                 * provided URI - admittedly not ideal */
470                create_output(argv[optind]);
471                if (output == NULL)
472                        return 0;
473        }
474       
475        sigact.sa_handler = cleanup_signal;
476        sigemptyset(&sigact.sa_mask);
477        sigact.sa_flags = SA_RESTART;
478
479        sigaction(SIGINT, &sigact, NULL);
480        sigaction(SIGTERM, &sigact, NULL);
481
482
483        for(i=optind;i<argc;++i) {
484                run_trace(argv[i]);
485        }
486
487        if (merge_inputs) {
488                /* Clean up after ourselves */
489                output_destroy(output);
490        }
491
492
493        return 0;
494}
Note: See TracBrowser for help on using the repository browser.