source: tools/tracertstats/tracertstats.c @ 02d13cb

cachetimestampsdevelopetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 02d13cb was 02d13cb, checked in by Shane Alcock <salcock@…>, 3 years ago

Don't count ERF provenance records during tracestats, tracertstats

Or any other records with 0 wire length for that matter.

This was especially off-putting when using filters as provenance
records automatically match all filters, so you could get some
very misleading results.

  • Property mode set to 100644
File size: 12.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <signal.h>
50
51#include <lt_inttypes.h>
52#include "libtrace_parallel.h"
53#include "output.h"
54#include "rt_protocol.h"
55#include "dagformat.h"
56
57#ifndef UINT32_MAX
58        #define UINT32_MAX      0xffffffffU
59#endif
60
61#define DEFAULT_OUTPUT_FMT "txt"
62
63char *output_format=NULL;
64int merge_inputs = 0;
65int threadcount = 4;
66int filter_count=0;
67int burstsize=10;
68
69struct filter_t {
70        char *expr;
71        struct libtrace_filter_t *filter;
72        uint64_t count;
73        uint64_t bytes;
74} *filters = NULL;
75
76uint64_t packet_count=UINT64_MAX;
77double packet_interval=UINT32_MAX;
78
79struct output_data_t *output = NULL;
80
81uint64_t count;
82uint64_t bytes;
83
84struct libtrace_t *currenttrace;
85
86static void cleanup_signal(int signal UNUSED) {
87        if (currenttrace) {
88                trace_pstop(currenttrace);
89        }
90}
91
92static void report_results(double ts,uint64_t count,uint64_t bytes)
93{
94        int i=0;
95        output_set_data_time(output,0,ts);
96        output_set_data_int(output,1,count);
97        output_set_data_int(output,2,bytes);
98        for(i=0;i<filter_count;++i) {
99                output_set_data_int(output,i*2+3,filters[i].count);
100                output_set_data_int(output,i*2+4,filters[i].bytes);
101                filters[i].count=filters[i].bytes=0;
102        }
103        output_flush_row(output);
104}
105
106static void create_output(char *title) {
107        int i;
108       
109        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
110        if (!output) {
111                fprintf(stderr,"Failed to create output file\n");
112                return;
113        }
114        output_add_column(output,"ts");
115        output_add_column(output,"packets");
116        output_add_column(output,"bytes");
117        for(i=0;i<filter_count;++i) {
118                char buff[1024];
119                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
120                output_add_column(output,buff);
121                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
122                output_add_column(output,buff);
123        }
124        output_flush_headings(output);
125
126}
127
128typedef struct statistic {
129        uint64_t count;
130        uint64_t bytes;
131} statistic_t;
132
133typedef struct result {
134        struct statistic total;
135        struct statistic filters[0];
136} result_t;
137
138static uint64_t glob_last_ts = 0;
139static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
140                void *global UNUSED, void *tls UNUSED,
141                libtrace_result_t *result) {
142        uint64_t ts = 0;
143        static bool stopped = false;
144        static uint64_t packets_seen = 0;
145        int j;
146        result_t *res;
147
148        if (stopped)
149                return;
150
151        ts = result->key;
152        res = result->value.ptr;
153        if (glob_last_ts == 0)
154                glob_last_ts = ts;
155        while ((glob_last_ts >> 32) < (ts >> 32)) {
156                report_results(glob_last_ts >> 32, count, bytes);
157                count = 0;
158                bytes = 0;
159                for (j = 0; j < filter_count; j++)
160                        filters[j].count = filters[j].bytes = 0;
161                glob_last_ts = ts;
162        }
163        count += res->total.count;
164        packets_seen += res->total.count;
165        bytes += res->total.bytes;
166        for (j = 0; j < filter_count; j++) {
167                filters[j].count += res->filters[j].count;
168                filters[j].bytes += res->filters[j].bytes;
169        }
170        free(res);
171
172        /* Be careful to only call pstop once from within this thread! */
173        if (packets_seen > packet_count) {
174                trace_pstop(trace);
175                stopped = true;
176        }
177}
178
179typedef struct threadlocal {
180        result_t *results;
181        uint64_t last_key;
182} thread_data_t;
183
184static void *cb_starting(libtrace_t *trace UNUSED,
185        libtrace_thread_t *t UNUSED, void *global UNUSED)
186{
187        thread_data_t *td = calloc(1, sizeof(thread_data_t));
188        td->results = calloc(1, sizeof(result_t) +
189                        sizeof(statistic_t) * filter_count);
190        return td;
191}
192
193static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
194                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
195
196        uint64_t key;
197        thread_data_t *td = (thread_data_t *)tls;
198        int i;
199        size_t wlen;
200
201        if (IS_LIBTRACE_META_PACKET(packet)) {
202                return packet;
203        }
204
205        key = trace_get_erf_timestamp(packet);
206        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
207                libtrace_generic_t tmp = {.ptr = td->results};
208                trace_publish_result(trace, t, key,
209                                tmp, RESULT_USER);
210                trace_post_reporter(trace);
211                td->last_key = key;
212                td->results = calloc(1, sizeof(result_t) +
213                                sizeof(statistic_t) * filter_count);
214        }
215        wlen = trace_get_wire_length(packet);
216        if (wlen == 0) {
217                /* Don't count ERF provenance and similar packets */
218                return packet;
219        }
220        for(i=0;i<filter_count;++i) {
221                if(trace_apply_filter(filters[i].filter, packet)) {
222                        td->results->filters[i].count++;
223                        td->results->filters[i].bytes+=wlen;
224                }
225        }
226
227        td->results->total.count++;
228        td->results->total.bytes += wlen;
229        return packet;
230}
231
232static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
233                void *global UNUSED, void *tls) {
234
235        thread_data_t *td = (thread_data_t *)tls;
236        if (td->results->total.count) {
237                libtrace_generic_t tmp = {.ptr = td->results};
238                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
239                trace_post_reporter(trace);
240                td->results = NULL;
241        }
242}
243
244static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
245                void *global UNUSED, void *tls, uint64_t order) {
246
247        thread_data_t *td = (thread_data_t *)tls;
248        if (order > td->last_key) {
249                libtrace_generic_t tmp = {.ptr = td->results};
250                trace_publish_result(trace, t, order, tmp, RESULT_USER);
251                trace_post_reporter(trace);
252                td->last_key = order;
253                td->results = calloc(1, sizeof(result_t) +
254                                sizeof(statistic_t) * filter_count);
255        }
256}
257
258/* Process a trace, counting packets that match filter(s) */
259static void run_trace(char *uri)
260{
261        libtrace_t *trace = NULL;
262        libtrace_callback_set_t *pktcbs, *repcbs;
263
264        if (!merge_inputs) 
265                create_output(uri);
266
267        if (output == NULL)
268                return;
269
270        trace = trace_create(uri);
271        if (trace_is_err(trace)) {
272                trace_perror(trace,"trace_create");
273                trace_destroy(trace);
274                if (!merge_inputs)
275                        output_destroy(output);
276                return;
277        }
278        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
279        trace_set_perpkt_threads(trace, threadcount);
280        trace_set_burst_size(trace, burstsize);
281
282        if (trace_get_information(trace)->live) {
283                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
284        } else {
285                trace_set_tracetime(trace, true);
286        }
287
288        pktcbs = trace_create_callback_set();
289        trace_set_starting_cb(pktcbs, cb_starting);
290        trace_set_stopping_cb(pktcbs, cb_stopping);
291        trace_set_packet_cb(pktcbs, cb_packet);
292        trace_set_tick_count_cb(pktcbs, cb_tick);
293        trace_set_tick_interval_cb(pktcbs, cb_tick);
294
295        repcbs = trace_create_callback_set();
296        trace_set_result_cb(repcbs, cb_result);
297
298        currenttrace = trace;
299        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
300                trace_perror(trace,"Failed to start trace");
301                trace_destroy(trace);
302                trace_destroy_callback_set(pktcbs);
303                trace_destroy_callback_set(repcbs);
304                if (!merge_inputs)
305                        output_destroy(output);
306                return;
307        }
308
309
310        // Wait for all threads to stop
311        trace_join(trace);
312       
313        // Flush the last one out
314        report_results((glob_last_ts >> 32), count, bytes);
315        if (trace_is_err(trace))
316                trace_perror(trace,"%s",uri);
317
318        trace_destroy(trace);
319        trace_destroy_callback_set(pktcbs);
320        trace_destroy_callback_set(repcbs);
321
322        if (!merge_inputs)
323                output_destroy(output);
324       
325}
326
327// TODO Decide what to do with -c option
328static void usage(char *argv0)
329{
330        fprintf(stderr,"Usage:\n"
331        "%s flags libtraceuri [libtraceuri...]\n"
332        "-i --interval=seconds  Duration of reporting interval in seconds\n"
333        "-c --count=packets     Exit after count packets have been processed\n"
334        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
335        "-o --output-format=txt|csv|html|png Reporting output format\n"
336        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
337        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
338        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
339        "                       updates at very low traffic rates\n"
340        "-h --help      Print this usage statement\n"
341        ,argv0);
342}
343
344int main(int argc, char *argv[]) {
345
346        int i;
347        struct sigaction sigact;
348       
349        while(1) {
350                int option_index;
351                struct option long_options[] = {
352                        { "filter",             1, 0, 'f' },
353                        { "interval",           1, 0, 'i' },
354                        { "count",              1, 0, 'c' },
355                        { "output-format",      1, 0, 'o' },
356                        { "help",               0, 0, 'h' },
357                        { "merge-inputs",       0, 0, 'm' },
358                        { "threads",            1, 0, 't' },
359                        { "nobuffer",           0, 0, 'N' },
360                        { NULL,                 0, 0, 0   },
361                };
362
363                int c=getopt_long(argc, argv, "c:f:i:o:t:hmN",
364                                long_options, &option_index);
365
366                if (c==-1)
367                        break;
368
369                switch (c) {
370                        case 'N':
371                                burstsize = 1;
372                                break;
373                        case 'f':
374                                ++filter_count;
375                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
376                                filters[filter_count-1].expr=strdup(optarg);
377                                filters[filter_count-1].filter=trace_create_filter(optarg);
378                                filters[filter_count-1].count=0;
379                                filters[filter_count-1].bytes=0;
380                                break;
381                        case 't':
382                                threadcount = atoi(optarg);
383                                if (threadcount <= 0)
384                                        threadcount = 1;
385                                break;
386                        case 'i':
387                                packet_interval=atof(optarg);
388                                break;
389                        case 'c':
390                                packet_count=strtoul(optarg, NULL, 10);
391                                break;
392                        case 'o':
393                                if (output_format) free(output_format);
394                                output_format=strdup(optarg);
395                                break;
396                        case 'm':
397                                merge_inputs = 1;
398                                break;
399                        case 'h':
400                                  usage(argv[0]);
401                                  return 1;
402                        default:
403                                fprintf(stderr,"Unknown option: %c\n",c);
404                                usage(argv[0]);
405                                return 1;
406                }
407        }
408
409        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
410                packet_interval = 60; /* every minute */
411        }
412
413        if (optind >= argc)
414                return 0;
415
416        if (output_format)
417                fprintf(stderr,"output format: '%s'\n",output_format);
418        else
419                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
420       
421       
422        if (merge_inputs) {
423                /* If we're merging the inputs, we only want to create all
424                 * the column headers etc. once rather than doing them once
425                 * per trace */
426
427                /* This is going to "name" the output based on the first
428                 * provided URI - admittedly not ideal */
429                create_output(argv[optind]);
430                if (output == NULL)
431                        return 0;
432        }
433       
434        sigact.sa_handler = cleanup_signal;
435        sigemptyset(&sigact.sa_mask);
436        sigact.sa_flags = SA_RESTART;
437
438        sigaction(SIGINT, &sigact, NULL);
439        sigaction(SIGTERM, &sigact, NULL);
440
441
442        for(i=optind;i<argc;++i) {
443                run_trace(argv[i]);
444        }
445
446        if (merge_inputs) {
447                /* Clean up after ourselves */
448                output_destroy(output);
449        }
450
451
452        return 0;
453}
Note: See TracBrowser for help on using the repository browser.