source: tools/tracertstats/tracertstats.c @ 8e11beb

cachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since 8e11beb was 8e11beb, checked in by Shane Alcock <salcock@…>, 3 years ago

Update tools to properly ignore meta records and missing timestamps

pcapng introduces a lot of meta records that we should preserve as
long as possible. Since we aren't automatically discarding them,
we need to make sure that our tools do not try to treat them as
"real" packets, i.e. try to get a timestamp or capture length from
them.

Similarly, simple pcapng packet records do not have a timestamp
so we need to make sure the tools do the right thing when
trace_get_seconds() returns a timestamp of zero on a packet. For
starters, we don't want to set our "first" packet time to zero
in that case!

  • Property mode set to 100644
File size: 11.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <lt_inttypes.h>
50
51#include "libtrace_parallel.h"
52#include "output.h"
53#include "rt_protocol.h"
54#include "dagformat.h"
55
56#ifndef UINT32_MAX
57        #define UINT32_MAX      0xffffffffU
58#endif
59
60#define DEFAULT_OUTPUT_FMT "txt"
61
62char *output_format=NULL;
63int merge_inputs = 0;
64int threadcount = 4;
65int filter_count=0;
66int burstsize=10;
67
68struct filter_t {
69        char *expr;
70        struct libtrace_filter_t *filter;
71        uint64_t count;
72        uint64_t bytes;
73} *filters = NULL;
74
75uint64_t packet_count=UINT64_MAX;
76double packet_interval=UINT32_MAX;
77
78struct output_data_t *output = NULL;
79
80uint64_t count;
81uint64_t bytes;
82
83static void report_results(double ts,uint64_t count,uint64_t bytes)
84{
85        int i=0;
86        output_set_data_time(output,0,ts);
87        output_set_data_int(output,1,count);
88        output_set_data_int(output,2,bytes);
89        for(i=0;i<filter_count;++i) {
90                output_set_data_int(output,i*2+3,filters[i].count);
91                output_set_data_int(output,i*2+4,filters[i].bytes);
92                filters[i].count=filters[i].bytes=0;
93        }
94        output_flush_row(output);
95}
96
97static void create_output(char *title) {
98        int i;
99       
100        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
101        if (!output) {
102                fprintf(stderr,"Failed to create output file\n");
103                return;
104        }
105        output_add_column(output,"ts");
106        output_add_column(output,"packets");
107        output_add_column(output,"bytes");
108        for(i=0;i<filter_count;++i) {
109                char buff[1024];
110                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
111                output_add_column(output,buff);
112                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
113                output_add_column(output,buff);
114        }
115        output_flush_headings(output);
116
117}
118
119typedef struct statistic {
120        uint64_t count;
121        uint64_t bytes;
122} statistic_t;
123
124typedef struct result {
125        struct statistic total;
126        struct statistic filters[0];
127} result_t;
128
129static uint64_t glob_last_ts = 0;
130static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
131                void *global UNUSED, void *tls UNUSED,
132                libtrace_result_t *result) {
133        uint64_t ts = 0;
134        static bool stopped = false;
135        static uint64_t packets_seen = 0;
136        int j;
137        result_t *res;
138
139        if (stopped)
140                return;
141
142        ts = result->key;
143        res = result->value.ptr;
144        if (glob_last_ts == 0)
145                glob_last_ts = ts;
146        while ((glob_last_ts >> 32) < (ts >> 32)) {
147                report_results(glob_last_ts >> 32, count, bytes);
148                count = 0;
149                bytes = 0;
150                for (j = 0; j < filter_count; j++)
151                        filters[j].count = filters[j].bytes = 0;
152                glob_last_ts = ts;
153        }
154        count += res->total.count;
155        packets_seen += res->total.count;
156        bytes += res->total.bytes;
157        for (j = 0; j < filter_count; j++) {
158                filters[j].count += res->filters[j].count;
159                filters[j].bytes += res->filters[j].bytes;
160        }
161        free(res);
162
163        /* Be careful to only call pstop once from within this thread! */
164        if (packets_seen > packet_count) {
165                trace_pstop(trace);
166                stopped = true;
167        }
168}
169
170typedef struct threadlocal {
171        result_t *results;
172        uint64_t last_key;
173} thread_data_t;
174
175static void *cb_starting(libtrace_t *trace UNUSED,
176        libtrace_thread_t *t UNUSED, void *global UNUSED)
177{
178        thread_data_t *td = calloc(1, sizeof(thread_data_t));
179        td->results = calloc(1, sizeof(result_t) +
180                        sizeof(statistic_t) * filter_count);
181        return td;
182}
183
184static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
185                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
186
187        uint64_t key;
188        thread_data_t *td = (thread_data_t *)tls;
189        int i;
190
191        if (IS_LIBTRACE_META_PACKET(packet)) {
192                return packet;
193        }
194
195        key = trace_get_erf_timestamp(packet);
196        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
197                libtrace_generic_t tmp = {.ptr = td->results};
198                trace_publish_result(trace, t, key,
199                                tmp, RESULT_USER);
200                trace_post_reporter(trace);
201                td->last_key = key;
202                td->results = calloc(1, sizeof(result_t) +
203                                sizeof(statistic_t) * filter_count);
204        }
205        for(i=0;i<filter_count;++i) {
206                if(trace_apply_filter(filters[i].filter, packet)) {
207                        td->results->filters[i].count++;
208                        td->results->filters[i].bytes+=trace_get_wire_length(packet);
209                }
210        }
211
212        td->results->total.count++;
213        td->results->total.bytes +=trace_get_wire_length(packet);
214        return packet;
215}
216
217static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
218                void *global UNUSED, void *tls) {
219
220        thread_data_t *td = (thread_data_t *)tls;
221        if (td->results->total.count) {
222                libtrace_generic_t tmp = {.ptr = td->results};
223                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
224                trace_post_reporter(trace);
225                td->results = NULL;
226        }
227}
228
229static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
230                void *global UNUSED, void *tls, uint64_t order) {
231
232        thread_data_t *td = (thread_data_t *)tls;
233        if (order > td->last_key) {
234                libtrace_generic_t tmp = {.ptr = td->results};
235                trace_publish_result(trace, t, order, tmp, RESULT_USER);
236                trace_post_reporter(trace);
237                td->last_key = order;
238                td->results = calloc(1, sizeof(result_t) +
239                                sizeof(statistic_t) * filter_count);
240        }
241}
242
243/* Process a trace, counting packets that match filter(s) */
244static void run_trace(char *uri)
245{
246        libtrace_t *trace = NULL;
247        libtrace_callback_set_t *pktcbs, *repcbs;
248
249        if (!merge_inputs) 
250                create_output(uri);
251
252        if (output == NULL)
253                return;
254
255        trace = trace_create(uri);
256        if (trace_is_err(trace)) {
257                trace_perror(trace,"trace_create");
258                trace_destroy(trace);
259                if (!merge_inputs)
260                        output_destroy(output);
261                return;
262        }
263        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
264        trace_set_perpkt_threads(trace, threadcount);
265        trace_set_burst_size(trace, burstsize);
266
267        if (trace_get_information(trace)->live) {
268                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
269        } else {
270                trace_set_tracetime(trace, true);
271        }
272
273        pktcbs = trace_create_callback_set();
274        trace_set_starting_cb(pktcbs, cb_starting);
275        trace_set_stopping_cb(pktcbs, cb_stopping);
276        trace_set_packet_cb(pktcbs, cb_packet);
277        trace_set_tick_count_cb(pktcbs, cb_tick);
278        trace_set_tick_interval_cb(pktcbs, cb_tick);
279
280        repcbs = trace_create_callback_set();
281        trace_set_result_cb(repcbs, cb_result);
282
283        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
284                trace_perror(trace,"Failed to start trace");
285                trace_destroy(trace);
286                trace_destroy_callback_set(pktcbs);
287                trace_destroy_callback_set(repcbs);
288                if (!merge_inputs)
289                        output_destroy(output);
290                return;
291        }
292
293
294        // Wait for all threads to stop
295        trace_join(trace);
296       
297        // Flush the last one out
298        report_results((glob_last_ts >> 32), count, bytes);
299        if (trace_is_err(trace))
300                trace_perror(trace,"%s",uri);
301
302        trace_destroy(trace);
303        trace_destroy_callback_set(pktcbs);
304        trace_destroy_callback_set(repcbs);
305
306        if (!merge_inputs)
307                output_destroy(output);
308       
309}
310
311// TODO Decide what to do with -c option
312static void usage(char *argv0)
313{
314        fprintf(stderr,"Usage:\n"
315        "%s flags libtraceuri [libtraceuri...]\n"
316        "-i --interval=seconds  Duration of reporting interval in seconds\n"
317        "-c --count=packets     Exit after count packets have been processed\n"
318        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
319        "-o --output-format=txt|csv|html|png Reporting output format\n"
320        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
321        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
322        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
323        "                       updates at very low traffic rates\n"
324        "-h --help      Print this usage statement\n"
325        ,argv0);
326}
327
328int main(int argc, char *argv[]) {
329
330        int i;
331       
332        while(1) {
333                int option_index;
334                struct option long_options[] = {
335                        { "filter",             1, 0, 'f' },
336                        { "interval",           1, 0, 'i' },
337                        { "count",              1, 0, 'c' },
338                        { "output-format",      1, 0, 'o' },
339                        { "help",               0, 0, 'h' },
340                        { "merge-inputs",       0, 0, 'm' },
341                        { "threads",            1, 0, 't' },
342                        { "nobuffer",           0, 0, 'N' },
343                        { NULL,                 0, 0, 0   },
344                };
345
346                int c=getopt_long(argc, argv, "c:f:i:o:t:hmN",
347                                long_options, &option_index);
348
349                if (c==-1)
350                        break;
351
352                switch (c) {
353                        case 'N':
354                                burstsize = 1;
355                                break;
356                        case 'f':
357                                ++filter_count;
358                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
359                                filters[filter_count-1].expr=strdup(optarg);
360                                filters[filter_count-1].filter=trace_create_filter(optarg);
361                                filters[filter_count-1].count=0;
362                                filters[filter_count-1].bytes=0;
363                                break;
364                        case 't':
365                                threadcount = atoi(optarg);
366                                if (threadcount <= 0)
367                                        threadcount = 1;
368                                break;
369                        case 'i':
370                                packet_interval=atof(optarg);
371                                break;
372                        case 'c':
373                                packet_count=strtoul(optarg, NULL, 10);
374                                break;
375                        case 'o':
376                                if (output_format) free(output_format);
377                                output_format=strdup(optarg);
378                                break;
379                        case 'm':
380                                merge_inputs = 1;
381                                break;
382                        case 'h':
383                                  usage(argv[0]);
384                                  return 1;
385                        default:
386                                fprintf(stderr,"Unknown option: %c\n",c);
387                                usage(argv[0]);
388                                return 1;
389                }
390        }
391
392        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
393                packet_interval = 60; /* every minute */
394        }
395
396        if (optind >= argc)
397                return 0;
398
399        if (output_format)
400                fprintf(stderr,"output format: '%s'\n",output_format);
401        else
402                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
403       
404       
405        if (merge_inputs) {
406                /* If we're merging the inputs, we only want to create all
407                 * the column headers etc. once rather than doing them once
408                 * per trace */
409
410                /* This is going to "name" the output based on the first
411                 * provided URI - admittedly not ideal */
412                create_output(argv[optind]);
413                if (output == NULL)
414                        return 0;
415        }
416               
417        for(i=optind;i<argc;++i) {
418                run_trace(argv[i]);
419        }
420
421        if (merge_inputs) {
422                /* Clean up after ourselves */
423                output_destroy(output);
424        }
425
426
427        return 0;
428}
Note: See TracBrowser for help on using the repository browser.