source: tools/tracertstats/tracertstats.c @ 2d8a045

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2d8a045 was f625817, checked in by Shane Alcock <salcock@…>, 6 years ago

Reworked callback API and removed old per_msg and reporter functions

Updated tracertstats to use the new callback API.

Extended the callback approach to the reporter thread as well as the per
packet threads.

Added libtrace_callback_set_t structure, which is used to register the
user callback functions.

Added callback functionality for MESSAGE_RESULT (needed now that reporter
threads also do callbacks) and MESSAGE_USER (for user-defined messages). The
MESSAGE_USER callback is essentially the same as the old per_msg function
style.

Updated combiners to use send_message to pass results to the reporter thread.
send_message itself is now no longer static, so that combiners can use it.

Disabled building of tracestats_parallel as it was using the older version
of the callback API. Will update in a future commit.

  • Property mode set to 100644
File size: 11.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/* This program takes a series of traces and bpf filters and outputs how many
32 * bytes/packets every time interval
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <string.h>
39#include <sys/time.h>
40#include <sys/types.h>
41#include <time.h>
42
43#include <netinet/in.h>
44#include <netinet/in_systm.h>
45#include <netinet/tcp.h>
46#include <netinet/ip.h>
47#include <netinet/ip_icmp.h>
48#include <arpa/inet.h>
49#include <sys/socket.h>
50#include <getopt.h>
51#include <inttypes.h>
52#include <lt_inttypes.h>
53
54#include "libtrace_parallel.h"
55#include "output.h"
56#include "rt_protocol.h"
57#include "dagformat.h"
58
59#ifndef UINT32_MAX
60        #define UINT32_MAX      0xffffffffU
61#endif
62
63#define DEFAULT_OUTPUT_FMT "txt"
64
65char *output_format=NULL;
66int merge_inputs = 0;
67int threadcount = 4;
68int filter_count=0;
69
70struct filter_t {
71        char *expr;
72        struct libtrace_filter_t *filter;
73        uint64_t count;
74        uint64_t bytes;
75} *filters = NULL;
76
77uint64_t packet_count=UINT64_MAX;
78double packet_interval=UINT32_MAX;
79
80struct output_data_t *output = NULL;
81
82uint64_t count;
83uint64_t bytes;
84
85static void report_results(double ts,uint64_t count,uint64_t bytes)
86{
87        int i=0;
88        output_set_data_time(output,0,ts);
89        output_set_data_int(output,1,count);
90        output_set_data_int(output,2,bytes);
91        for(i=0;i<filter_count;++i) {
92                output_set_data_int(output,i*2+3,filters[i].count);
93                output_set_data_int(output,i*2+4,filters[i].bytes);
94                filters[i].count=filters[i].bytes=0;
95        }
96        output_flush_row(output);
97}
98
99static void create_output(char *title) {
100        int i;
101       
102        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
103        if (!output) {
104                fprintf(stderr,"Failed to create output file\n");
105                return;
106        }
107        output_add_column(output,"ts");
108        output_add_column(output,"packets");
109        output_add_column(output,"bytes");
110        for(i=0;i<filter_count;++i) {
111                char buff[1024];
112                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
113                output_add_column(output,buff);
114                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
115                output_add_column(output,buff);
116        }
117        output_flush_headings(output);
118
119}
120
121typedef struct statistic {
122        uint64_t count;
123        uint64_t bytes;
124} statistic_t;
125
126typedef struct result {
127        struct statistic total;
128        struct statistic filters[0];
129} result_t;
130
131static uint64_t glob_last_ts = 0;
132static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
133                void *global UNUSED, void *tls UNUSED,
134                libtrace_result_t *result) {
135        uint64_t ts = 0;
136        static bool stopped = false;
137        static uint64_t packets_seen = 0;
138        int j;
139        result_t *res;
140
141        if (stopped)
142                return;
143
144        ts = result->key;
145        res = result->value.ptr;
146        if (glob_last_ts == 0)
147                glob_last_ts = ts;
148        while ((glob_last_ts >> 32) < (ts >> 32)) {
149                report_results(glob_last_ts >> 32, count, bytes);
150                count = 0;
151                bytes = 0;
152                for (j = 0; j < filter_count; j++)
153                        filters[j].count = filters[j].bytes = 0;
154                glob_last_ts = ts;
155        }
156        count += res->total.count;
157        packets_seen += res->total.count;
158        bytes += res->total.bytes;
159        for (j = 0; j < filter_count; j++) {
160                filters[j].count += res->filters[j].count;
161                filters[j].bytes += res->filters[j].bytes;
162        }
163        free(res);
164
165        /* Be careful to only call pstop once from within this thread! */
166        if (packets_seen > packet_count) {
167                trace_pstop(trace);
168                stopped = true;
169        }
170}
171
172typedef struct threadlocal {
173        result_t *results;
174        uint64_t last_key;
175} thread_data_t;
176
177static void *cb_starting(libtrace_t *trace UNUSED,
178        libtrace_thread_t *t UNUSED, void *global UNUSED)
179{
180        thread_data_t *td = calloc(1, sizeof(thread_data_t));
181        td->results = calloc(1, sizeof(result_t) +
182                        sizeof(statistic_t) * filter_count);
183        return td;
184}
185
186static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
187                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
188
189        uint64_t key;
190        thread_data_t *td = (thread_data_t *)tls;
191        int i;
192
193        key = trace_get_erf_timestamp(packet);
194        if ((key >> 32) > (td->last_key >> 32) + packet_interval) {
195                libtrace_generic_t tmp = {.ptr = td->results};
196                trace_publish_result(trace, t, key,
197                                tmp, RESULT_USER);
198                trace_post_reporter(trace);
199                td->last_key = key;
200                td->results = calloc(1, sizeof(result_t) +
201                                sizeof(statistic_t) * filter_count);
202        }
203        for(i=0;i<filter_count;++i) {
204                if(trace_apply_filter(filters[i].filter, packet)) {
205                        td->results->filters[i].count++;
206                        td->results->filters[i].bytes+=trace_get_wire_length(packet);
207                }
208        }
209
210        td->results->total.count++;
211        td->results->total.bytes +=trace_get_wire_length(packet);
212        return packet;
213}
214
215static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
216                void *global UNUSED, void *tls) {
217
218        thread_data_t *td = (thread_data_t *)tls;
219        if (td->results->total.count) {
220                libtrace_generic_t tmp = {.ptr = td->results};
221                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
222                trace_post_reporter(trace);
223                td->results = NULL;
224        }
225}
226
227static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
228                void *global UNUSED, void *tls, uint64_t order) {
229
230        thread_data_t *td = (thread_data_t *)tls;
231        if (order > td->last_key) {
232                libtrace_generic_t tmp = {.ptr = td->results};
233                trace_publish_result(trace, t, order, tmp, RESULT_USER);
234                trace_post_reporter(trace);
235                td->last_key = order;
236                td->results = calloc(1, sizeof(result_t) +
237                                sizeof(statistic_t) * filter_count);
238        }
239}
240
241/* Process a trace, counting packets that match filter(s) */
242static void run_trace(char *uri)
243{
244        libtrace_t *trace = NULL;
245        libtrace_callback_set_t *pktcbs, *repcbs;
246
247        if (!merge_inputs) 
248                create_output(uri);
249
250        if (output == NULL)
251                return;
252
253        trace = trace_create(uri);
254        if (trace_is_err(trace)) {
255                trace_perror(trace,"trace_create");
256                trace_destroy(trace);
257                if (!merge_inputs)
258                        output_destroy(output);
259                return;
260        }
261        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
262        trace_set_tracetime(trace, true);
263        trace_set_perpkt_threads(trace, threadcount);
264
265        if (trace_get_information(trace)->live) {
266                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
267        }
268
269        pktcbs = trace_create_callback_set();
270        trace_set_starting_cb(pktcbs, cb_starting);
271        trace_set_stopping_cb(pktcbs, cb_stopping);
272        trace_set_packet_cb(pktcbs, cb_packet);
273        trace_set_tick_count_cb(pktcbs, cb_tick);
274        trace_set_tick_interval_cb(pktcbs, cb_tick);
275
276        repcbs = trace_create_callback_set();
277        trace_set_result_cb(repcbs, cb_result);
278
279        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
280                trace_perror(trace,"Failed to start trace");
281                trace_destroy(trace);
282                trace_destroy_callback_set(pktcbs);
283                trace_destroy_callback_set(repcbs);
284                if (!merge_inputs)
285                        output_destroy(output);
286                return;
287        }
288
289
290        // Wait for all threads to stop
291        trace_join(trace);
292       
293        // Flush the last one out
294        report_results((glob_last_ts >> 32), count, bytes);
295        if (trace_is_err(trace))
296                trace_perror(trace,"%s",uri);
297
298        trace_destroy(trace);
299        trace_destroy_callback_set(pktcbs);
300        trace_destroy_callback_set(repcbs);
301
302        if (!merge_inputs)
303                output_destroy(output);
304       
305}
306
307// TODO Decide what to do with -c option
308static void usage(char *argv0)
309{
310        fprintf(stderr,"Usage:\n"
311        "%s flags libtraceuri [libtraceuri...]\n"
312        "-i --interval=seconds  Duration of reporting interval in seconds\n"
313        "-c --count=packets     Exit after count packets have been processed\n"
314        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
315        "-o --output-format=txt|csv|html|png Reporting output format\n"
316        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
317        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
318        "-h --help      Print this usage statement\n"
319        ,argv0);
320}
321
322int main(int argc, char *argv[]) {
323
324        int i;
325       
326        while(1) {
327                int option_index;
328                struct option long_options[] = {
329                        { "filter",             1, 0, 'f' },
330                        { "interval",           1, 0, 'i' },
331                        { "count",              1, 0, 'c' },
332                        { "output-format",      1, 0, 'o' },
333                        { "help",               0, 0, 'h' },
334                        { "merge-inputs",       0, 0, 'm' },
335                        { "threads",            1, 0, 't' },
336                        { NULL,                 0, 0, 0   },
337                };
338
339                int c=getopt_long(argc, argv, "c:f:i:o:t:hm",
340                                long_options, &option_index);
341
342                if (c==-1)
343                        break;
344
345                switch (c) {
346                        case 'f':
347                                ++filter_count;
348                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
349                                filters[filter_count-1].expr=strdup(optarg);
350                                filters[filter_count-1].filter=trace_create_filter(optarg);
351                                filters[filter_count-1].count=0;
352                                filters[filter_count-1].bytes=0;
353                                break;
354                        case 't':
355                                threadcount = atoi(optarg);
356                                if (threadcount <= 0)
357                                        threadcount = 1;
358                                break;
359                        case 'i':
360                                packet_interval=atof(optarg);
361                                break;
362                        case 'c':
363                                packet_count=strtoul(optarg, NULL, 10);
364                                break;
365                        case 'o':
366                                if (output_format) free(output_format);
367                                output_format=strdup(optarg);
368                                break;
369                        case 'm':
370                                merge_inputs = 1;
371                                break;
372                        case 'h':
373                                  usage(argv[0]);
374                                  return 1;
375                        default:
376                                fprintf(stderr,"Unknown option: %c\n",c);
377                                usage(argv[0]);
378                                return 1;
379                }
380        }
381
382        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
383                packet_interval = 60; /* every minute */
384        }
385
386        if (optind >= argc)
387                return 0;
388
389        if (output_format)
390                fprintf(stderr,"output format: '%s'\n",output_format);
391        else
392                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
393       
394       
395        if (merge_inputs) {
396                /* If we're merging the inputs, we only want to create all
397                 * the column headers etc. once rather than doing them once
398                 * per trace */
399
400                /* This is going to "name" the output based on the first
401                 * provided URI - admittedly not ideal */
402                create_output(argv[optind]);
403                if (output == NULL)
404                        return 0;
405        }
406               
407        for(i=optind;i<argc;++i) {
408                run_trace(argv[i]);
409        }
410
411        if (merge_inputs) {
412                /* Clean up after ourselves */
413                output_destroy(output);
414        }
415
416
417        return 0;
418}
Note: See TracBrowser for help on using the repository browser.