source: tools/tracertstats/tracertstats.c @ ee6e802

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since ee6e802 was ee6e802, checked in by Shane Alcock <salcock@…>, 4 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 11.7 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <lt_inttypes.h>
50
51#include "libtrace_parallel.h"
52#include "output.h"
53#include "rt_protocol.h"
54#include "dagformat.h"
55
56#ifndef UINT32_MAX
57        #define UINT32_MAX      0xffffffffU
58#endif
59
60#define DEFAULT_OUTPUT_FMT "txt"
61
62char *output_format=NULL;
63int merge_inputs = 0;
64int threadcount = 4;
65int filter_count=0;
66int burstsize=10;
67
68struct filter_t {
69        char *expr;
70        struct libtrace_filter_t *filter;
71        uint64_t count;
72        uint64_t bytes;
73} *filters = NULL;
74
75uint64_t packet_count=UINT64_MAX;
76double packet_interval=UINT32_MAX;
77
78struct output_data_t *output = NULL;
79
80uint64_t count;
81uint64_t bytes;
82
83static void report_results(double ts,uint64_t count,uint64_t bytes)
84{
85        int i=0;
86        output_set_data_time(output,0,ts);
87        output_set_data_int(output,1,count);
88        output_set_data_int(output,2,bytes);
89        for(i=0;i<filter_count;++i) {
90                output_set_data_int(output,i*2+3,filters[i].count);
91                output_set_data_int(output,i*2+4,filters[i].bytes);
92                filters[i].count=filters[i].bytes=0;
93        }
94        output_flush_row(output);
95}
96
97static void create_output(char *title) {
98        int i;
99       
100        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
101        if (!output) {
102                fprintf(stderr,"Failed to create output file\n");
103                return;
104        }
105        output_add_column(output,"ts");
106        output_add_column(output,"packets");
107        output_add_column(output,"bytes");
108        for(i=0;i<filter_count;++i) {
109                char buff[1024];
110                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
111                output_add_column(output,buff);
112                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
113                output_add_column(output,buff);
114        }
115        output_flush_headings(output);
116
117}
118
119typedef struct statistic {
120        uint64_t count;
121        uint64_t bytes;
122} statistic_t;
123
124typedef struct result {
125        struct statistic total;
126        struct statistic filters[0];
127} result_t;
128
129static uint64_t glob_last_ts = 0;
130static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
131                void *global UNUSED, void *tls UNUSED,
132                libtrace_result_t *result) {
133        uint64_t ts = 0;
134        static bool stopped = false;
135        static uint64_t packets_seen = 0;
136        int j;
137        result_t *res;
138
139        if (stopped)
140                return;
141
142        ts = result->key;
143        res = result->value.ptr;
144        if (glob_last_ts == 0)
145                glob_last_ts = ts;
146        while ((glob_last_ts >> 32) < (ts >> 32)) {
147                report_results(glob_last_ts >> 32, count, bytes);
148                count = 0;
149                bytes = 0;
150                for (j = 0; j < filter_count; j++)
151                        filters[j].count = filters[j].bytes = 0;
152                glob_last_ts = ts;
153        }
154        count += res->total.count;
155        packets_seen += res->total.count;
156        bytes += res->total.bytes;
157        for (j = 0; j < filter_count; j++) {
158                filters[j].count += res->filters[j].count;
159                filters[j].bytes += res->filters[j].bytes;
160        }
161        free(res);
162
163        /* Be careful to only call pstop once from within this thread! */
164        if (packets_seen > packet_count) {
165                trace_pstop(trace);
166                stopped = true;
167        }
168}
169
170typedef struct threadlocal {
171        result_t *results;
172        uint64_t last_key;
173} thread_data_t;
174
175static void *cb_starting(libtrace_t *trace UNUSED,
176        libtrace_thread_t *t UNUSED, void *global UNUSED)
177{
178        thread_data_t *td = calloc(1, sizeof(thread_data_t));
179        td->results = calloc(1, sizeof(result_t) +
180                        sizeof(statistic_t) * filter_count);
181        return td;
182}
183
184static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
185                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
186
187        uint64_t key;
188        thread_data_t *td = (thread_data_t *)tls;
189        int i;
190
191        key = trace_get_erf_timestamp(packet);
192        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
193                libtrace_generic_t tmp = {.ptr = td->results};
194                trace_publish_result(trace, t, key,
195                                tmp, RESULT_USER);
196                trace_post_reporter(trace);
197                td->last_key = key;
198                td->results = calloc(1, sizeof(result_t) +
199                                sizeof(statistic_t) * filter_count);
200        }
201        for(i=0;i<filter_count;++i) {
202                if(trace_apply_filter(filters[i].filter, packet)) {
203                        td->results->filters[i].count++;
204                        td->results->filters[i].bytes+=trace_get_wire_length(packet);
205                }
206        }
207
208        td->results->total.count++;
209        td->results->total.bytes +=trace_get_wire_length(packet);
210        return packet;
211}
212
213static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
214                void *global UNUSED, void *tls) {
215
216        thread_data_t *td = (thread_data_t *)tls;
217        if (td->results->total.count) {
218                libtrace_generic_t tmp = {.ptr = td->results};
219                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
220                trace_post_reporter(trace);
221                td->results = NULL;
222        }
223}
224
225static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
226                void *global UNUSED, void *tls, uint64_t order) {
227
228        thread_data_t *td = (thread_data_t *)tls;
229        if (order > td->last_key) {
230                libtrace_generic_t tmp = {.ptr = td->results};
231                trace_publish_result(trace, t, order, tmp, RESULT_USER);
232                trace_post_reporter(trace);
233                td->last_key = order;
234                td->results = calloc(1, sizeof(result_t) +
235                                sizeof(statistic_t) * filter_count);
236        }
237}
238
239/* Process a trace, counting packets that match filter(s) */
240static void run_trace(char *uri)
241{
242        libtrace_t *trace = NULL;
243        libtrace_callback_set_t *pktcbs, *repcbs;
244
245        if (!merge_inputs) 
246                create_output(uri);
247
248        if (output == NULL)
249                return;
250
251        trace = trace_create(uri);
252        if (trace_is_err(trace)) {
253                trace_perror(trace,"trace_create");
254                trace_destroy(trace);
255                if (!merge_inputs)
256                        output_destroy(output);
257                return;
258        }
259        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
260        trace_set_tracetime(trace, true);
261        trace_set_perpkt_threads(trace, threadcount);
262        trace_set_burst_size(trace, burstsize);
263
264        if (trace_get_information(trace)->live) {
265                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
266        }
267
268        pktcbs = trace_create_callback_set();
269        trace_set_starting_cb(pktcbs, cb_starting);
270        trace_set_stopping_cb(pktcbs, cb_stopping);
271        trace_set_packet_cb(pktcbs, cb_packet);
272        trace_set_tick_count_cb(pktcbs, cb_tick);
273        trace_set_tick_interval_cb(pktcbs, cb_tick);
274
275        repcbs = trace_create_callback_set();
276        trace_set_result_cb(repcbs, cb_result);
277
278        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
279                trace_perror(trace,"Failed to start trace");
280                trace_destroy(trace);
281                trace_destroy_callback_set(pktcbs);
282                trace_destroy_callback_set(repcbs);
283                if (!merge_inputs)
284                        output_destroy(output);
285                return;
286        }
287
288
289        // Wait for all threads to stop
290        trace_join(trace);
291       
292        // Flush the last one out
293        report_results((glob_last_ts >> 32), count, bytes);
294        if (trace_is_err(trace))
295                trace_perror(trace,"%s",uri);
296
297        trace_destroy(trace);
298        trace_destroy_callback_set(pktcbs);
299        trace_destroy_callback_set(repcbs);
300
301        if (!merge_inputs)
302                output_destroy(output);
303       
304}
305
306// TODO Decide what to do with -c option
307static void usage(char *argv0)
308{
309        fprintf(stderr,"Usage:\n"
310        "%s flags libtraceuri [libtraceuri...]\n"
311        "-i --interval=seconds  Duration of reporting interval in seconds\n"
312        "-c --count=packets     Exit after count packets have been processed\n"
313        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
314        "-o --output-format=txt|csv|html|png Reporting output format\n"
315        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
316        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
317        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
318        "                       updates at very low traffic rates\n"
319        "-h --help      Print this usage statement\n"
320        ,argv0);
321}
322
323int main(int argc, char *argv[]) {
324
325        int i;
326       
327        while(1) {
328                int option_index;
329                struct option long_options[] = {
330                        { "filter",             1, 0, 'f' },
331                        { "interval",           1, 0, 'i' },
332                        { "count",              1, 0, 'c' },
333                        { "output-format",      1, 0, 'o' },
334                        { "help",               0, 0, 'h' },
335                        { "merge-inputs",       0, 0, 'm' },
336                        { "threads",            1, 0, 't' },
337                        { "nobuffer",           0, 0, 'N' },
338                        { NULL,                 0, 0, 0   },
339                };
340
341                int c=getopt_long(argc, argv, "c:f:i:o:t:hmN",
342                                long_options, &option_index);
343
344                if (c==-1)
345                        break;
346
347                switch (c) {
348                        case 'N':
349                                burstsize = 1;
350                                break;
351                        case 'f':
352                                ++filter_count;
353                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
354                                filters[filter_count-1].expr=strdup(optarg);
355                                filters[filter_count-1].filter=trace_create_filter(optarg);
356                                filters[filter_count-1].count=0;
357                                filters[filter_count-1].bytes=0;
358                                break;
359                        case 't':
360                                threadcount = atoi(optarg);
361                                if (threadcount <= 0)
362                                        threadcount = 1;
363                                break;
364                        case 'i':
365                                packet_interval=atof(optarg);
366                                break;
367                        case 'c':
368                                packet_count=strtoul(optarg, NULL, 10);
369                                break;
370                        case 'o':
371                                if (output_format) free(output_format);
372                                output_format=strdup(optarg);
373                                break;
374                        case 'm':
375                                merge_inputs = 1;
376                                break;
377                        case 'h':
378                                  usage(argv[0]);
379                                  return 1;
380                        default:
381                                fprintf(stderr,"Unknown option: %c\n",c);
382                                usage(argv[0]);
383                                return 1;
384                }
385        }
386
387        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
388                packet_interval = 60; /* every minute */
389        }
390
391        if (optind >= argc)
392                return 0;
393
394        if (output_format)
395                fprintf(stderr,"output format: '%s'\n",output_format);
396        else
397                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
398       
399       
400        if (merge_inputs) {
401                /* If we're merging the inputs, we only want to create all
402                 * the column headers etc. once rather than doing them once
403                 * per trace */
404
405                /* This is going to "name" the output based on the first
406                 * provided URI - admittedly not ideal */
407                create_output(argv[optind]);
408                if (output == NULL)
409                        return 0;
410        }
411               
412        for(i=optind;i<argc;++i) {
413                run_trace(argv[i]);
414        }
415
416        if (merge_inputs) {
417                /* Clean up after ourselves */
418                output_destroy(output);
419        }
420
421
422        return 0;
423}
Note: See TracBrowser for help on using the repository browser.