source: tools/tracestats/tracestats_parallel.c @ fac8c46

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since fac8c46 was fac8c46, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file; any queued packets will still be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter and makes a copy of the packets in result queues when pausing

  • this is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data like in the case of ring:.

Fixed up the re-starting of traces which hadn't been finished to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of consistency checks for existing code that still checks 'started'.

Various other bug fixes and tidy ups.

  • Property mode set to 100644
File size: 8.4 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/*
32 * This program takes a series of traces and bpf filters and outputs how many
33 * bytes/packets
34 */
35
36#include <stdio.h>
37#include <stdlib.h>
38#include <assert.h>
39#include <string.h>
40#include <sys/time.h>
41#include <sys/types.h>
42#include <time.h>
43
44#include <netinet/in.h>
45#include <netinet/in_systm.h>
46#include <netinet/tcp.h>
47#include <netinet/ip.h>
48#include <netinet/ip_icmp.h>
49#include <arpa/inet.h>
50#include <sys/socket.h>
51#include <getopt.h>
52#include <inttypes.h>
53#include <signal.h>
54
55#include "libtrace.h"
56#include "lt_inttypes.h"
57#include "data-struct/vector.h"
58#include "data-struct/message_queue.h"
59#include <pthread.h>
60
/*
 * Trace currently being processed. It lives at file scope so that the
 * signal handler below can reach it; it is NULL until run_trace() assigns
 * the result of trace_create() to it.
 */
struct libtrace_t *trace = NULL;

/*
 * Signal handler installed by main() for SIGINT and SIGTERM.
 *
 * Each delivery toggles the trace between paused and running: the first
 * signal calls trace_ppause(), the next one calls trace_pstart() with NULL
 * callbacks to resume, and so on.
 *
 * NOTE(review): trace_ppause()/trace_pstart()/trace_perror() are unlikely to
 * be async-signal-safe (the original author already noted that trace_pstop
 * "has lots of locks in it"). Moving this work out of signal context would
 * be the robust fix, but needs changes outside this handler.
 */
static void cleanup_signal(int signal)
{
        /* Written only from signal context, hence volatile sig_atomic_t. */
        static volatile sig_atomic_t paused = 0;
        (void)signal;

        /* A signal can arrive before any trace exists; nothing to toggle. */
        if (trace == NULL)
                return;

        if (!paused) {
                if (trace_ppause(trace) == -1)
                        trace_perror(trace, "Pause failed");
        } else {
                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
                        trace_perror(trace, "Start failed");
        }
        paused = !paused;
}
80
/* One filter given on the command line, plus its accumulated totals. */
struct filter_t {
        /* Copy of the filter expression text, used as the report label. */
        char *expr;
        /* Compiled filter; per_packet() sets it to NULL after a filter error. */
        struct libtrace_filter_t *filter;
        /* Matching packets, summed by reduce() and reset after printing. */
        uint64_t count;
        /* Wire bytes of matching packets, summed and reset like count. */
        uint64_t bytes;
};

/* Array of filter_count entries, grown by main() as -f options are parsed. */
struct filter_t *filters = NULL;
int filter_count = 0;

/* Packet and byte totals accumulated by reduce() across every trace. */
volatile uint64_t totcount = 0;
volatile uint64_t totbytes = 0;
90
91
/*
 * Global data handed to trace_pstart() by run_trace().
 * Holds pointers to a packet total and a byte total.
 */
typedef struct global_blob {
        uint64_t *totcount;
        uint64_t *totbytes;
} global_blob_t;
96
/*
 * A packet/byte counter pair. Each per-packet thread keeps an array of
 * these: element 0 for all packets, element i+1 for filters[i].
 */
typedef struct statistics {
        uint64_t count;         /* number of packets */
        uint64_t bytes;         /* sum of wire lengths */
} statistics_t;
101
102
103static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
104{
105        // Using first entry as total and those after for filter counts
106        static __thread statistics_t * results = NULL;
107        int i;
108       
109        if (pkt) {
110                int wlen = trace_get_wire_length(pkt);
111                for(i=0;i<filter_count;++i) {
112                        if (filters[i].filter == NULL)
113                                continue;
114                        if(trace_apply_filter(filters[i].filter,pkt) > 0) {
115                                results[i+1].count++;
116                                results[i+1].bytes+=wlen;
117                        }
118                        if (trace_is_err(trace)) {
119                                trace_perror(trace, "trace_apply_filter");
120                                fprintf(stderr, "Removing filter from filterlist\n");
121                                // XXX might be a problem across threads below
122                                filters[i].filter = NULL;
123                        }
124                }
125                results[0].count++;
126                results[0].bytes +=wlen;
127        }
128        if (mesg) {
129                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
130                switch (mesg->code) {
131                        case MESSAGE_STOPPED:
132                                trace_publish_result(trace, 0, results); // Only ever using a single key 0
133                                fprintf(stderr, "Thread published resuslts WOWW\n");
134                                break;
135                        case MESSAGE_STARTED:
136                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
137                                break;
138                        case MESSAGE_DO_PAUSE:
139                                fprintf(stderr, "GOT Asked to pause ahh\n");
140                                break;
141                        case MESSAGE_PAUSING:
142                                fprintf(stderr, "Thread is pausing\n");
143                                break;
144                        case MESSAGE_PAUSED:
145                                fprintf(stderr, "Thread has paused\n");
146                                break;
147                }
148        }
149        return pkt;
150}
151
152static int reduce(libtrace_t* trace, void* global_blob)
153{
154        int i,j;
155        uint64_t count=0, bytes=0;
156        libtrace_vector_t results;
157        libtrace_vector_init(&results, sizeof(libtrace_result_t));
158        trace_get_results(trace, &results);
159        uint64_t packets;
160       
161        /* Get the results from each core and sum 'em up */
162        for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
163                libtrace_result_t result;
164                assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
165                assert(libtrace_result_get_key(&result) == 0);
166                statistics_t * res = libtrace_result_get_value(&result);
167                count += res[0].count;
168                bytes += res[0].bytes;
169                for (j = 0; j < filter_count; j++) {
170                        filters[j].count += res[j+1].count;
171                        filters[j].bytes += res[j+1].bytes;
172                }
173                free(res);
174        }
175        // Done with these results - Free internally and externally
176        libtrace_vector_destroy(&results);
177
178    printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
179        for(i=0;i<filter_count;++i) {
180                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
181                filters[i].bytes=0;
182                filters[i].count=0;
183        }
184        packets=trace_get_received_packets(trace);
185        if (packets!=UINT64_MAX)
186                fprintf(stderr,"%30s:\t%12" PRIu64"\n", 
187                                "Input packets", packets);
188        packets=trace_get_filtered_packets(trace);
189        if (packets!=UINT64_MAX)
190                fprintf(stderr,"%30s:\t%12" PRIu64"\n", 
191                                "Filtered packets", packets);
192        packets=trace_get_dropped_packets(trace);
193        if (packets!=UINT64_MAX)
194                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
195                                "Dropped packets",packets);
196        packets=trace_get_accepted_packets(trace);
197        if (packets!=UINT64_MAX)
198                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
199                                "Accepted Packets",packets);
200        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
201        totcount+=count;
202        totbytes+=bytes;
203       
204        return 0;
205}
206
207static uint64_t rand_hash(libtrace_packet_t * pkt, void *data) {
208        return rand();
209}
210
211static uint64_t bad_hash(libtrace_packet_t * pkt, void *data) {
212        return 0;
213}
214
215/* Process a trace, counting packets that match filter(s) */
216static void run_trace(char *uri) 
217{
218
219        fprintf(stderr,"%s:\n",uri);
220
221        trace = trace_create(uri);
222
223        if (trace_is_err(trace)) {
224                trace_perror(trace,"Failed to create trace");
225                return;
226        }
227       
228        int option = 2;
229        //option = 10000;
230        trace_parallel_config(trace, TRACE_OPTION_USE_DEDICATED_HASHER, &option);
231        trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
232        //trace_parallel_config(trace, TRACE_OPTION_USE_SLIDING_WINDOW_BUFFER, &option);
233        option = 2;
234        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
235        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
236
237        /* OPTIONALLY SETUP CORES HERE BUT WE DON'T CARE ABOUT THAT YET XXX */
238
239        /*if (trace_start(trace)==-1) {
240        trace_perror(trace,"Failed to start trace");
241        return;
242        }*/
243        global_blob_t blob;
244
245
246        if (trace_pstart(trace, (void *)&blob, &per_packet, NULL)==-1) {
247                trace_perror(trace,"Failed to start trace");
248                return;
249        }
250
251        // Wait for all threads to stop
252        trace_join(trace);
253        reduce(trace, NULL);
254
255        //map_pair_iterator_t * results = NULL;
256        //trace_get_results(trace, &results);
257
258        //if (results != NULL) {
259        //      reduce(trace, global_blob, results);
260        //}
261        if (trace_is_err(trace))
262                trace_perror(trace,"%s",uri);
263
264        print_contention_stats(trace);
265        trace_destroy(trace);
266}
267
/* Write the one-line command synopsis for program name argv0 to stderr. */
static void usage(char *argv0)
{
        FILE *out = stderr;

        fprintf(out,
                "Usage: %s [-H|--libtrace-help] [--filter|-f bpf ]... libtraceuri...\n",
                argv0);
}
272
273int main(int argc, char *argv[]) {
274
275        int i;
276        struct sigaction sigact;
277
278        while(1) {
279                int option_index;
280                struct option long_options[] = {
281                        { "filter",        1, 0, 'f' },
282                        { "libtrace-help", 0, 0, 'H' },
283                        { NULL,            0, 0, 0   },
284                };
285
286                int c=getopt_long(argc, argv, "f:H",
287                                long_options, &option_index);
288
289                if (c==-1)
290                        break;
291
292                switch (c) {
293                        case 'f':
294                                ++filter_count;
295                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
296                                filters[filter_count-1].expr=strdup(optarg);
297                                filters[filter_count-1].filter=trace_create_filter(optarg);
298                                filters[filter_count-1].count=0;
299                                filters[filter_count-1].bytes=0;
300                                break;
301                        case 'H':
302                                trace_help();
303                                exit(1);
304                                break;
305                        default:
306                                fprintf(stderr,"Unknown option: %c\n",c);
307                                usage(argv[0]);
308                                return 1;
309                }
310        }
311
312        sigact.sa_handler = cleanup_signal;
313        sigemptyset(&sigact.sa_mask);
314        sigact.sa_flags = SA_RESTART;
315
316        sigaction(SIGINT, &sigact, NULL);
317        sigaction(SIGTERM, &sigact, NULL);
318       
319        for(i=optind;i<argc;++i) {
320                run_trace(argv[i]);
321        }
322        if (optind+1<argc) {
323                printf("Grand total:\n");
324                printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",totcount,totbytes);
325        }
326       
327        return 0;
328}
Note: See TracBrowser for help on using the repository browser.