source: tools/tracertstats/tracertstats.c @ a8cfe71

cachetimestampsdevelopdpdk-ndagetsiliverc-4.0.3rc-4.0.4ringdecrementfixringperformance
Last change on this file since a8cfe71 was a8cfe71, checked in by Shane Alcock <salcock@…>, 3 years ago

Add proper SIGINT / SIGTERM handling to tracertstats

  • Property mode set to 100644
File size: 12.2 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28/* This program takes a series of traces and bpf filters and outputs how many
29 * bytes/packets every time interval
30 */
31
32#include <stdio.h>
33#include <stdlib.h>
34#include <assert.h>
35#include <string.h>
36#include <sys/time.h>
37#include <sys/types.h>
38#include <time.h>
39
40#include <netinet/in.h>
41#include <netinet/in_systm.h>
42#include <netinet/tcp.h>
43#include <netinet/ip.h>
44#include <netinet/ip_icmp.h>
45#include <arpa/inet.h>
46#include <sys/socket.h>
47#include <getopt.h>
48#include <inttypes.h>
49#include <signal.h>
50
51#include <lt_inttypes.h>
52#include "libtrace_parallel.h"
53#include "output.h"
54#include "rt_protocol.h"
55#include "dagformat.h"
56
57#ifndef UINT32_MAX
58        #define UINT32_MAX      0xffffffffU
59#endif
60
61#define DEFAULT_OUTPUT_FMT "txt"
62
63char *output_format=NULL;
64int merge_inputs = 0;
65int threadcount = 4;
66int filter_count=0;
67int burstsize=10;
68
69struct filter_t {
70        char *expr;
71        struct libtrace_filter_t *filter;
72        uint64_t count;
73        uint64_t bytes;
74} *filters = NULL;
75
76uint64_t packet_count=UINT64_MAX;
77double packet_interval=UINT32_MAX;
78
79struct output_data_t *output = NULL;
80
81uint64_t count;
82uint64_t bytes;
83
84struct libtrace_t *currenttrace;
85
86static void cleanup_signal(int signal UNUSED) {
87        if (currenttrace) {
88                trace_pstop(currenttrace);
89        }
90}
91
92static void report_results(double ts,uint64_t count,uint64_t bytes)
93{
94        int i=0;
95        output_set_data_time(output,0,ts);
96        output_set_data_int(output,1,count);
97        output_set_data_int(output,2,bytes);
98        for(i=0;i<filter_count;++i) {
99                output_set_data_int(output,i*2+3,filters[i].count);
100                output_set_data_int(output,i*2+4,filters[i].bytes);
101                filters[i].count=filters[i].bytes=0;
102        }
103        output_flush_row(output);
104}
105
106static void create_output(char *title) {
107        int i;
108       
109        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
110        if (!output) {
111                fprintf(stderr,"Failed to create output file\n");
112                return;
113        }
114        output_add_column(output,"ts");
115        output_add_column(output,"packets");
116        output_add_column(output,"bytes");
117        for(i=0;i<filter_count;++i) {
118                char buff[1024];
119                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
120                output_add_column(output,buff);
121                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
122                output_add_column(output,buff);
123        }
124        output_flush_headings(output);
125
126}
127
128typedef struct statistic {
129        uint64_t count;
130        uint64_t bytes;
131} statistic_t;
132
133typedef struct result {
134        struct statistic total;
135        struct statistic filters[0];
136} result_t;
137
138static uint64_t glob_last_ts = 0;
139static void cb_result(libtrace_t *trace, libtrace_thread_t *sender UNUSED,
140                void *global UNUSED, void *tls UNUSED,
141                libtrace_result_t *result) {
142        uint64_t ts = 0;
143        static bool stopped = false;
144        static uint64_t packets_seen = 0;
145        int j;
146        result_t *res;
147
148        if (stopped)
149                return;
150
151        ts = result->key;
152        res = result->value.ptr;
153        if (glob_last_ts == 0)
154                glob_last_ts = ts;
155        while ((glob_last_ts >> 32) < (ts >> 32)) {
156                report_results(glob_last_ts >> 32, count, bytes);
157                count = 0;
158                bytes = 0;
159                for (j = 0; j < filter_count; j++)
160                        filters[j].count = filters[j].bytes = 0;
161                glob_last_ts = ts;
162        }
163        count += res->total.count;
164        packets_seen += res->total.count;
165        bytes += res->total.bytes;
166        for (j = 0; j < filter_count; j++) {
167                filters[j].count += res->filters[j].count;
168                filters[j].bytes += res->filters[j].bytes;
169        }
170        free(res);
171
172        /* Be careful to only call pstop once from within this thread! */
173        if (packets_seen > packet_count) {
174                trace_pstop(trace);
175                stopped = true;
176        }
177}
178
179typedef struct threadlocal {
180        result_t *results;
181        uint64_t last_key;
182} thread_data_t;
183
184static void *cb_starting(libtrace_t *trace UNUSED,
185        libtrace_thread_t *t UNUSED, void *global UNUSED)
186{
187        thread_data_t *td = calloc(1, sizeof(thread_data_t));
188        td->results = calloc(1, sizeof(result_t) +
189                        sizeof(statistic_t) * filter_count);
190        return td;
191}
192
193static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
194                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
195
196        uint64_t key;
197        thread_data_t *td = (thread_data_t *)tls;
198        int i;
199
200        if (IS_LIBTRACE_META_PACKET(packet)) {
201                return packet;
202        }
203
204        key = trace_get_erf_timestamp(packet);
205        if ((key >> 32) >= (td->last_key >> 32) + packet_interval) {
206                libtrace_generic_t tmp = {.ptr = td->results};
207                trace_publish_result(trace, t, key,
208                                tmp, RESULT_USER);
209                trace_post_reporter(trace);
210                td->last_key = key;
211                td->results = calloc(1, sizeof(result_t) +
212                                sizeof(statistic_t) * filter_count);
213        }
214        for(i=0;i<filter_count;++i) {
215                if(trace_apply_filter(filters[i].filter, packet)) {
216                        td->results->filters[i].count++;
217                        td->results->filters[i].bytes+=trace_get_wire_length(packet);
218                }
219        }
220
221        td->results->total.count++;
222        td->results->total.bytes +=trace_get_wire_length(packet);
223        return packet;
224}
225
226static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
227                void *global UNUSED, void *tls) {
228
229        thread_data_t *td = (thread_data_t *)tls;
230        if (td->results->total.count) {
231                libtrace_generic_t tmp = {.ptr = td->results};
232                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
233                trace_post_reporter(trace);
234                td->results = NULL;
235        }
236}
237
238static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
239                void *global UNUSED, void *tls, uint64_t order) {
240
241        thread_data_t *td = (thread_data_t *)tls;
242        if (order > td->last_key) {
243                libtrace_generic_t tmp = {.ptr = td->results};
244                trace_publish_result(trace, t, order, tmp, RESULT_USER);
245                trace_post_reporter(trace);
246                td->last_key = order;
247                td->results = calloc(1, sizeof(result_t) +
248                                sizeof(statistic_t) * filter_count);
249        }
250}
251
252/* Process a trace, counting packets that match filter(s) */
253static void run_trace(char *uri)
254{
255        libtrace_t *trace = NULL;
256        libtrace_callback_set_t *pktcbs, *repcbs;
257
258        if (!merge_inputs) 
259                create_output(uri);
260
261        if (output == NULL)
262                return;
263
264        trace = trace_create(uri);
265        if (trace_is_err(trace)) {
266                trace_perror(trace,"trace_create");
267                trace_destroy(trace);
268                if (!merge_inputs)
269                        output_destroy(output);
270                return;
271        }
272        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
273        trace_set_perpkt_threads(trace, threadcount);
274        trace_set_burst_size(trace, burstsize);
275
276        if (trace_get_information(trace)->live) {
277                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
278        } else {
279                trace_set_tracetime(trace, true);
280        }
281
282        pktcbs = trace_create_callback_set();
283        trace_set_starting_cb(pktcbs, cb_starting);
284        trace_set_stopping_cb(pktcbs, cb_stopping);
285        trace_set_packet_cb(pktcbs, cb_packet);
286        trace_set_tick_count_cb(pktcbs, cb_tick);
287        trace_set_tick_interval_cb(pktcbs, cb_tick);
288
289        repcbs = trace_create_callback_set();
290        trace_set_result_cb(repcbs, cb_result);
291
292        currenttrace = trace;
293        if (trace_pstart(trace, NULL, pktcbs, repcbs)==-1) {
294                trace_perror(trace,"Failed to start trace");
295                trace_destroy(trace);
296                trace_destroy_callback_set(pktcbs);
297                trace_destroy_callback_set(repcbs);
298                if (!merge_inputs)
299                        output_destroy(output);
300                return;
301        }
302
303
304        // Wait for all threads to stop
305        trace_join(trace);
306       
307        // Flush the last one out
308        report_results((glob_last_ts >> 32), count, bytes);
309        if (trace_is_err(trace))
310                trace_perror(trace,"%s",uri);
311
312        trace_destroy(trace);
313        trace_destroy_callback_set(pktcbs);
314        trace_destroy_callback_set(repcbs);
315
316        if (!merge_inputs)
317                output_destroy(output);
318       
319}
320
321// TODO Decide what to do with -c option
322static void usage(char *argv0)
323{
324        fprintf(stderr,"Usage:\n"
325        "%s flags libtraceuri [libtraceuri...]\n"
326        "-i --interval=seconds  Duration of reporting interval in seconds\n"
327        "-c --count=packets     Exit after count packets have been processed\n"
328        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
329        "-o --output-format=txt|csv|html|png Reporting output format\n"
330        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
331        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
332        "-N --nobuffer          Disable packet buffering within libtrace to force faster\n"
333        "                       updates at very low traffic rates\n"
334        "-h --help      Print this usage statement\n"
335        ,argv0);
336}
337
338int main(int argc, char *argv[]) {
339
340        int i;
341        struct sigaction sigact;
342       
343        while(1) {
344                int option_index;
345                struct option long_options[] = {
346                        { "filter",             1, 0, 'f' },
347                        { "interval",           1, 0, 'i' },
348                        { "count",              1, 0, 'c' },
349                        { "output-format",      1, 0, 'o' },
350                        { "help",               0, 0, 'h' },
351                        { "merge-inputs",       0, 0, 'm' },
352                        { "threads",            1, 0, 't' },
353                        { "nobuffer",           0, 0, 'N' },
354                        { NULL,                 0, 0, 0   },
355                };
356
357                int c=getopt_long(argc, argv, "c:f:i:o:t:hmN",
358                                long_options, &option_index);
359
360                if (c==-1)
361                        break;
362
363                switch (c) {
364                        case 'N':
365                                burstsize = 1;
366                                break;
367                        case 'f':
368                                ++filter_count;
369                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
370                                filters[filter_count-1].expr=strdup(optarg);
371                                filters[filter_count-1].filter=trace_create_filter(optarg);
372                                filters[filter_count-1].count=0;
373                                filters[filter_count-1].bytes=0;
374                                break;
375                        case 't':
376                                threadcount = atoi(optarg);
377                                if (threadcount <= 0)
378                                        threadcount = 1;
379                                break;
380                        case 'i':
381                                packet_interval=atof(optarg);
382                                break;
383                        case 'c':
384                                packet_count=strtoul(optarg, NULL, 10);
385                                break;
386                        case 'o':
387                                if (output_format) free(output_format);
388                                output_format=strdup(optarg);
389                                break;
390                        case 'm':
391                                merge_inputs = 1;
392                                break;
393                        case 'h':
394                                  usage(argv[0]);
395                                  return 1;
396                        default:
397                                fprintf(stderr,"Unknown option: %c\n",c);
398                                usage(argv[0]);
399                                return 1;
400                }
401        }
402
403        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
404                packet_interval = 60; /* every minute */
405        }
406
407        if (optind >= argc)
408                return 0;
409
410        if (output_format)
411                fprintf(stderr,"output format: '%s'\n",output_format);
412        else
413                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
414       
415       
416        if (merge_inputs) {
417                /* If we're merging the inputs, we only want to create all
418                 * the column headers etc. once rather than doing them once
419                 * per trace */
420
421                /* This is going to "name" the output based on the first
422                 * provided URI - admittedly not ideal */
423                create_output(argv[optind]);
424                if (output == NULL)
425                        return 0;
426        }
427       
428        sigact.sa_handler = cleanup_signal;
429        sigemptyset(&sigact.sa_mask);
430        sigact.sa_flags = SA_RESTART;
431
432        sigaction(SIGINT, &sigact, NULL);
433        sigaction(SIGTERM, &sigact, NULL);
434
435
436        for(i=optind;i<argc;++i) {
437                run_trace(argv[i]);
438        }
439
440        if (merge_inputs) {
441                /* Clean up after ourselves */
442                output_destroy(output);
443        }
444
445
446        return 0;
447}
Note: See TracBrowser for help on using the repository browser.