source: tools/tracertstats/tracertstats.c @ d2df7c4

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since d2df7c4 was d2df7c4, checked in by Shane Alcock <salcock@…>, 7 years ago

Make -c option work for tracertstats

It's only an approximate stopping point (a lower bound to be exact), as we
only check whether we've passed the count in the reporting thread.

Removed references to tracertstats_parallel from Makefile.am

  • Property mode set to 100644
File size: 10.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/* This program takes a series of traces and bpf filters and outputs how many
32 * bytes/packets every time interval
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <string.h>
39#include <sys/time.h>
40#include <sys/types.h>
41#include <time.h>
42
43#include <netinet/in.h>
44#include <netinet/in_systm.h>
45#include <netinet/tcp.h>
46#include <netinet/ip.h>
47#include <netinet/ip_icmp.h>
48#include <arpa/inet.h>
49#include <sys/socket.h>
50#include <getopt.h>
51#include <inttypes.h>
52#include <lt_inttypes.h>
53
54#include "libtrace_parallel.h"
55#include "output.h"
56#include "rt_protocol.h"
57#include "dagformat.h"
58
59#ifndef UINT32_MAX
60        #define UINT32_MAX      0xffffffffU
61#endif
62
63#define DEFAULT_OUTPUT_FMT "txt"
64
65char *output_format=NULL;
66int merge_inputs = 0;
67int threadcount = 4;
68int filter_count=0;
69
70struct filter_t {
71        char *expr;
72        struct libtrace_filter_t *filter;
73        uint64_t count;
74        uint64_t bytes;
75} *filters = NULL;
76
77uint64_t packet_count=UINT64_MAX;
78double packet_interval=UINT32_MAX;
79
80struct output_data_t *output = NULL;
81
82static void report_results(double ts,uint64_t count,uint64_t bytes)
83{
84        int i=0;
85        output_set_data_time(output,0,ts);
86        output_set_data_int(output,1,count);
87        output_set_data_int(output,2,bytes);
88        for(i=0;i<filter_count;++i) {
89                output_set_data_int(output,i*2+3,filters[i].count);
90                output_set_data_int(output,i*2+4,filters[i].bytes);
91                filters[i].count=filters[i].bytes=0;
92        }
93        output_flush_row(output);
94}
95
96static void create_output(char *title) {
97        int i;
98       
99        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
100        if (!output) {
101                fprintf(stderr,"Failed to create output file\n");
102                return;
103        }
104        output_add_column(output,"ts");
105        output_add_column(output,"packets");
106        output_add_column(output,"bytes");
107        for(i=0;i<filter_count;++i) {
108                char buff[1024];
109                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
110                output_add_column(output,buff);
111                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
112                output_add_column(output,buff);
113        }
114        output_flush_headings(output);
115
116}
117
118uint64_t count;
119uint64_t bytes;
120
121typedef struct statistic {
122        uint64_t count;
123        uint64_t bytes;
124} statistic_t;
125
126typedef struct result {
127        struct statistic total;
128        struct statistic filters[0];
129} result_t;
130
131static uint64_t glob_last_ts = 0;
132static void process_result(libtrace_t *trace, int mesg,
133                           libtrace_generic_t data,
134                           libtrace_thread_t *sender UNUSED) {
135        uint64_t ts = 0;
136        static bool stopped = false;
137        static uint64_t packets_seen = 0;
138        int j;
139        result_t *res;
140
141        if (stopped)
142                return;
143
144        switch (mesg) {
145                case MESSAGE_RESULT:
146                ts = data.res->key;
147                res = data.res->value.ptr;
148                if (glob_last_ts == 0)
149                        glob_last_ts = ts;
150                while ((glob_last_ts >> 32) < (ts >> 32)) {
151                        report_results(glob_last_ts >> 32, count, bytes);
152                        count = 0;
153                        bytes = 0;
154                        for (j = 0; j < filter_count; j++)
155                                filters[j].count = filters[j].bytes = 0;
156                        glob_last_ts = ts;
157                }
158                count += res->total.count;
159                packets_seen += res->total.count;
160                bytes += res->total.bytes;
161                for (j = 0; j < filter_count; j++) {
162                        filters[j].count += res->filters[j].count;
163                        filters[j].bytes += res->filters[j].bytes;
164                }
165                free(res);
166        }
167
168        /* Be careful to only call pstop once from within this thread! */
169        if (packets_seen > packet_count) {
170                trace_pstop(trace);
171                stopped = true;
172        }
173}
174
175typedef struct timestamp_sync {
176        int64_t difference_usecs;
177        uint64_t first_interval_number;
178} timestamp_sync_t;
179
180static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
181                        int mesg, libtrace_generic_t data,
182                        libtrace_thread_t *sender UNUSED)
183{
184        int i;
185        static __thread result_t * results = NULL;
186        uint64_t key;
187        static __thread uint64_t last_key = 0;
188
189        switch(mesg) {
190        case MESSAGE_PACKET:
191                key = trace_get_erf_timestamp(data.pkt);
192                if ((key >> 32) > (last_key >> 32) + packet_interval) {
193                        libtrace_generic_t tmp = {.ptr = results};
194                        trace_publish_result(trace, t, key, 
195                                        tmp, RESULT_USER);
196                        trace_post_reporter(trace);
197                        last_key = key;
198                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
199
200                }
201
202                for(i=0;i<filter_count;++i) {
203                        if(trace_apply_filter(filters[i].filter, data.pkt)) {
204                                results->filters[i].count++;
205                                results->filters[i].bytes+=trace_get_wire_length(data.pkt);
206                        }
207                }
208
209                results->total.count++;
210                results->total.bytes +=trace_get_wire_length(data.pkt);
211                return data.pkt;
212
213        case MESSAGE_STARTING:
214                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
215                break;
216
217        case MESSAGE_STOPPING:
218                // Should we always post this?
219                if (results->total.count) {
220                        libtrace_generic_t tmp = {.ptr = results};
221                        trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
222                        trace_post_reporter(trace);
223                        results = NULL;
224                }
225                break;
226
227        case MESSAGE_TICK_INTERVAL:
228        case MESSAGE_TICK_COUNT:
229                {
230                        if (data.uint64 > last_key) {
231                                libtrace_generic_t tmp = {.ptr = results};
232                                trace_publish_result(trace, t, data.uint64, 
233                                                tmp, RESULT_USER);
234                                trace_post_reporter(trace);
235                                last_key = data.uint64;
236                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
237                        }
238                        break;
239                }
240        }
241        return NULL;
242}
243
244/* Process a trace, counting packets that match filter(s) */
245static void run_trace(char *uri)
246{
247        libtrace_t *trace = NULL;
248        if (!merge_inputs) 
249                create_output(uri);
250
251        if (output == NULL)
252                return;
253
254        trace = trace_create(uri);
255        if (trace_is_err(trace)) {
256                trace_perror(trace,"trace_create");
257                trace_destroy(trace);
258                if (!merge_inputs)
259                        output_destroy(output);
260                return;
261        }
262        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
263        trace_set_tracetime(trace, true);
264        trace_set_perpkt_threads(trace, threadcount);
265
266        if (trace_get_information(trace)->live) {
267                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
268        }
269
270        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
271                trace_perror(trace,"Failed to start trace");
272                trace_destroy(trace);
273                if (!merge_inputs)
274                        output_destroy(output);
275                return;
276        }
277
278
279        // Wait for all threads to stop
280        trace_join(trace);
281       
282        // Flush the last one out
283        report_results((glob_last_ts >> 32), count, bytes);
284        if (trace_is_err(trace))
285                trace_perror(trace,"%s",uri);
286
287        trace_destroy(trace);
288
289        if (!merge_inputs)
290                output_destroy(output);
291       
292}
293
294// TODO Decide what to do with -c option
295static void usage(char *argv0)
296{
297        fprintf(stderr,"Usage:\n"
298        "%s flags libtraceuri [libtraceuri...]\n"
299        "-i --interval=seconds  Duration of reporting interval in seconds\n"
300        "-c --count=packets     Exit after count packets have been processed\n"
301        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
302        "-o --output-format=txt|csv|html|png Reporting output format\n"
303        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
304        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
305        "-H --libtrace-help     Print libtrace runtime documentation\n"
306        ,argv0);
307}
308
309int main(int argc, char *argv[]) {
310
311        int i;
312       
313        while(1) {
314                int option_index;
315                struct option long_options[] = {
316                        { "filter",             1, 0, 'f' },
317                        { "interval",           1, 0, 'i' },
318                        { "count",              1, 0, 'c' },
319                        { "output-format",      1, 0, 'o' },
320                        { "libtrace-help",      0, 0, 'H' },
321                        { "merge-inputs",       0, 0, 'm' },
322                        { "threads",            1, 0, 't' },
323                        { NULL,                 0, 0, 0   },
324                };
325
326                int c=getopt_long(argc, argv, "c:f:i:o:t:Hm",
327                                long_options, &option_index);
328
329                if (c==-1)
330                        break;
331
332                switch (c) {
333                        case 'f':
334                                ++filter_count;
335                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
336                                filters[filter_count-1].expr=strdup(optarg);
337                                filters[filter_count-1].filter=trace_create_filter(optarg);
338                                filters[filter_count-1].count=0;
339                                filters[filter_count-1].bytes=0;
340                                break;
341                        case 't':
342                                threadcount = atoi(optarg);
343                                if (threadcount <= 0)
344                                        threadcount = 1;
345                                break;
346                        case 'i':
347                                packet_interval=atof(optarg);
348                                break;
349                        case 'c':
350                                packet_count=strtoul(optarg, NULL, 10);
351                                break;
352                        case 'o':
353                                if (output_format) free(output_format);
354                                output_format=strdup(optarg);
355                                break;
356                        case 'm':
357                                merge_inputs = 1;
358                                break;
359                        case 'H':
360                                  trace_help(); 
361                                  exit(1); 
362                                  break;       
363                        default:
364                                fprintf(stderr,"Unknown option: %c\n",c);
365                                usage(argv[0]);
366                                return 1;
367                }
368        }
369
370        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
371                packet_interval = 300; /* every 5 minutes */
372        }
373
374        if (optind >= argc)
375                return 0;
376
377        if (output_format)
378                fprintf(stderr,"output format: '%s'\n",output_format);
379        else
380                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
381       
382       
383        if (merge_inputs) {
384                /* If we're merging the inputs, we only want to create all
385                 * the column headers etc. once rather than doing them once
386                 * per trace */
387
388                /* This is going to "name" the output based on the first
389                 * provided URI - admittedly not ideal */
390                create_output(argv[optind]);
391                if (output == NULL)
392                        return 0;
393        }
394               
395        for(i=optind;i<argc;++i) {
396                run_trace(argv[i]);
397        }
398
399        if (merge_inputs) {
400                /* Clean up after ourselves */
401                output_destroy(output);
402        }
403
404
405        return 0;
406}
Note: See TracBrowser for help on using the repository browser.