source: tools/tracestats/tracestats_parallel.c @ 5ab626a

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5ab626a was 5ab626a, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Deprecate trace_get_filtered/accepted/recevied/dropped() in favour of a single function

Adds the single trace_get_statistics function. This allows the structure to be filled
at a point in time, rather than making multiple calls to the library during which state
might have changed.

This has been designed such that the structure can be added to in the future without
breaking old code.

The old internal get_captured_packets was removed from the formats as it was never used.
Eventually we should completely remove get_filtered and received from the formats and replace
them with get_statistics.

In additon some extra fields have added, such as error and captured and the pre-existing
fields are better defined.

The linux formats have been updated to use this new API, which combined with reading
/proc/net/dev returns a full set of statistics.

  • Property mode set to 100644
File size: 8.7 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/*
32 * This program takes a series of traces and bpf filters and outputs how many
33 * bytes/packets
34 */
35
36#include <stdio.h>
37#include <stdlib.h>
38#include <assert.h>
39#include <string.h>
40#include <sys/time.h>
41#include <sys/types.h>
42#include <time.h>
43
44#include <netinet/in.h>
45#include <netinet/in_systm.h>
46#include <netinet/tcp.h>
47#include <netinet/ip.h>
48#include <netinet/ip_icmp.h>
49#include <arpa/inet.h>
50#include <sys/socket.h>
51#include <getopt.h>
52#include <inttypes.h>
53#include <signal.h>
54
55#include "libtrace.h"
56#include "lt_inttypes.h"
57#include "data-struct/vector.h"
58#include "data-struct/message_queue.h"
59#include "combiners.h"
60#include <pthread.h>
61
62struct libtrace_t *trace = NULL;
63
64static void cleanup_signal(int signal)
65{
66        static int s = 0;
67        (void)signal;
68        // trace_interrupt();
69        // trace_pstop isn't really signal safe because its got lots of locks in it
70    trace_pstop(trace);
71    /*if (s == 0) {
72                if (trace_ppause(trace) == -1)
73                        trace_perror(trace, "Pause failed");
74        }
75        else {
76                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
77                        trace_perror(trace, "Start failed");
78    }*/
79        s = !s;
80}
81
82struct filter_t {
83        char *expr;
84        struct libtrace_filter_t *filter;
85        uint64_t count;
86        uint64_t bytes;
87} *filters = NULL;
88int filter_count=0;
89volatile uint64_t totcount = 0;
90volatile uint64_t totbytes = 0;
91
92
93typedef struct global_blob {
94        uint64_t * totcount;
95        uint64_t * totbytes;
96} global_blob_t;
97
98typedef struct statistics {
99        uint64_t count;
100        uint64_t bytes;
101} statistics_t;
102
103
104static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
105{
106        // Using first entry as total and those after for filter counts
107        static __thread statistics_t * results = NULL;
108        int i;
109       
110        if (pkt) {
111                int wlen = trace_get_wire_length(pkt);
112                for(i=0;i<filter_count;++i) {
113                        if (filters[i].filter == NULL)
114                                continue;
115                        if(trace_apply_filter(filters[i].filter,pkt) > 0) {
116                                results[i+1].count++;
117                                results[i+1].bytes+=wlen;
118                        }
119                        if (trace_is_err(trace)) {
120                                trace_perror(trace, "trace_apply_filter");
121                                fprintf(stderr, "Removing filter from filterlist\n");
122                                // XXX might be a problem across threads below
123                                filters[i].filter = NULL;
124                        }
125                }
126                results[0].count++;
127                results[0].bytes +=wlen;
128        }
129        if (mesg) {
130                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
131                switch (mesg->code) {
132                        case MESSAGE_STOPPING:
133                                trace_publish_result(trace, t, 0, (libtrace_generic_types_t){.ptr = results}, RESULT_NORMAL); // Only ever using a single key 0
134                                //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
135                                break;
136                        case MESSAGE_STARTING:
137                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
138                                break;
139                        case MESSAGE_DO_PAUSE:
140                                assert(!"GOT Asked to pause!!!\n");
141                                break;
142                        case MESSAGE_PAUSING:
143                                //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
144                                break;
145                        case MESSAGE_RESUMING:
146                                //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
147                                break;
148                }
149        }
150        return pkt;
151}
152
153static void report_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg) {
154        static uint64_t count=0, bytes=0;
155        uint64_t packets;
156        int i;
157        if (result) {
158                int j;
159                /* Get the results from each core and sum 'em up */
160                assert(libtrace_result_get_key(result) == 0);
161                statistics_t * res = libtrace_result_get_value(result).ptr;
162                count += res[0].count;
163                bytes += res[0].bytes;
164                for (j = 0; j < filter_count; j++) {
165                        filters[j].count += res[j+1].count;
166                        filters[j].bytes += res[j+1].bytes;
167                }
168                free(res);
169        } else switch (mesg->code) {
170                libtrace_stat_t *stats;
171                case MESSAGE_STOPPING:
172                        stats = trace_get_statistics(trace, NULL);
173                        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
174                        for(i=0;i<filter_count;++i) {
175                                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
176                                filters[i].bytes=0;
177                                filters[i].count=0;
178                        }
179                        if (stats->received_valid)
180                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
181                                                "Input packets", stats->received);
182                        if (stats->filtered_valid)
183                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
184                                                "Filtered packets", stats->filtered);
185                        if (stats->dropped_valid)
186                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
187                                                "Dropped packets",stats->dropped);
188                        if (stats->accepted_valid)
189                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
190                                                "Accepted packets", stats->accepted);
191                        if (stats->errors_valid)
192                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
193                                                "Erred packets", stats->errors);
194                        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
195                        totcount+=count;
196                        totbytes+=bytes;
197        }
198}
199
200static uint64_t rand_hash(libtrace_packet_t * pkt, void *data) {
201        return rand();
202}
203
204static uint64_t bad_hash(libtrace_packet_t * pkt, void *data) {
205        return 0;
206}
207
208struct user_configuration uc;
209
210
211/* Process a trace, counting packets that match filter(s) */
212static void run_trace(char *uri) 
213{
214
215        fprintf(stderr,"%s:\n",uri);
216
217        trace = trace_create(uri);
218
219        if (trace_is_err(trace)) {
220                trace_perror(trace,"Failed to create trace");
221                return;
222        }
223       
224        int option = 2;
225        //option = 10000;
226    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
227        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
228        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
229        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_types_t){0});
230
231        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
232
233        /* OPTIONALLY SETUP CORES HERE BUT WE DON'T CARE ABOUT THAT YET XXX */
234
235        /*if (trace_start(trace)==-1) {
236        trace_perror(trace,"Failed to start trace");
237        return;
238        }*/
239        global_blob_t blob;
240
241
242        if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
243                trace_perror(trace,"Failed to start trace");
244                return;
245        }
246
247        // Wait for all threads to stop
248        trace_join(trace);
249
250        //map_pair_iterator_t * results = NULL;
251        //trace_get_results(trace, &results);
252
253        //if (results != NULL) {
254        //      reduce(trace, global_blob, results);
255        //}
256        if (trace_is_err(trace))
257                trace_perror(trace,"%s",uri);
258
259        print_contention_stats(trace);
260        trace_destroy(trace);
261}
262
263static void usage(char *argv0)
264{
265        fprintf(stderr,"Usage: %s [-H|--libtrace-help] [--filter|-f bpf ]... libtraceuri...\n",argv0);
266}
267
268int main(int argc, char *argv[]) {
269
270        int i;
271        struct sigaction sigact;
272        ZERO_USER_CONFIG(uc);
273        while(1) {
274                int option_index;
275                struct option long_options[] = {
276                        { "filter",        1, 0, 'f' },
277                        { "libtrace-help", 0, 0, 'H' },
278                        { "config",             1, 0, 'u' },
279                        { "config-file",                1, 0, 'U' },
280                        { NULL,            0, 0, 0   },
281                };
282
283                int c=getopt_long(argc, argv, "f:Hu:U:",
284                                long_options, &option_index);
285
286                if (c==-1)
287                        break;
288
289                switch (c) {
290                        case 'f':
291                                ++filter_count;
292                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
293                                filters[filter_count-1].expr=strdup(optarg);
294                                filters[filter_count-1].filter=trace_create_filter(optarg);
295                                filters[filter_count-1].count=0;
296                                filters[filter_count-1].bytes=0;
297                                break;
298                        case 'H':
299                                trace_help();
300                                exit(1);
301                                break;
302                        case 'u':
303                                  parse_user_config(&uc, optarg);
304                                  break;
305                        case 'U':;
306                                FILE * f = fopen(optarg, "r");
307                                if (f != NULL) {
308                                        parse_user_config_file(&uc, f);
309                                } else {
310                                        perror("Failed to open configuration file\n");
311                                        usage(argv[0]);
312                                }
313                                break;
314                        default:
315                                fprintf(stderr,"Unknown option: %c\n",c);
316                                usage(argv[0]);
317                                return 1;
318                }
319        }
320
321        sigact.sa_handler = cleanup_signal;
322        sigemptyset(&sigact.sa_mask);
323        sigact.sa_flags = SA_RESTART;
324
325        sigaction(SIGINT, &sigact, NULL);
326        sigaction(SIGTERM, &sigact, NULL);
327       
328        for(i=optind;i<argc;++i) {
329                run_trace(argv[i]);
330        }
331        if (optind+1<argc) {
332                printf("Grand total:\n");
333                printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",totcount,totbytes);
334        }
335       
336        return 0;
337}
Note: See TracBrowser for help on using the repository browser.