source: tools/tracestats/tracestats_parallel.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 8.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/*
32 * This program takes a series of traces and bpf filters and outputs how many
33 * bytes/packets
34 */
35
36#include <stdio.h>
37#include <stdlib.h>
38#include <assert.h>
39#include <string.h>
40#include <sys/time.h>
41#include <sys/types.h>
42#include <time.h>
43
44#include <netinet/in.h>
45#include <netinet/in_systm.h>
46#include <netinet/tcp.h>
47#include <netinet/ip.h>
48#include <netinet/ip_icmp.h>
49#include <arpa/inet.h>
50#include <sys/socket.h>
51#include <getopt.h>
52#include <inttypes.h>
53#include <signal.h>
54
55#include "libtrace.h"
56#include "lt_inttypes.h"
57#include "data-struct/vector.h"
58#include "data-struct/message_queue.h"
59#include <pthread.h>
60
61struct libtrace_t *trace = NULL;
62
63static void cleanup_signal(int signal)
64{
65        static int s = 0;
66        (void)signal;
67        // trace_interrupt();
68        // trace_pstop isn't really signal safe because its got lots of locks in it
69    trace_pstop(trace);
70    /*if (s == 0) {
71                if (trace_ppause(trace) == -1)
72                        trace_perror(trace, "Pause failed");
73        }
74        else {
75                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
76                        trace_perror(trace, "Start failed");
77    }*/
78        s = !s;
79}
80
81struct filter_t {
82        char *expr;
83        struct libtrace_filter_t *filter;
84        uint64_t count;
85        uint64_t bytes;
86} *filters = NULL;
87int filter_count=0;
88volatile uint64_t totcount = 0;
89volatile uint64_t totbytes = 0;
90
91
92typedef struct global_blob {
93        uint64_t * totcount;
94        uint64_t * totbytes;
95} global_blob_t;
96
97typedef struct statistics {
98        uint64_t count;
99        uint64_t bytes;
100} statistics_t;
101
102
103static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
104{
105        // Using first entry as total and those after for filter counts
106        static __thread statistics_t * results = NULL;
107        int i;
108       
109        if (pkt) {
110                int wlen = trace_get_wire_length(pkt);
111                for(i=0;i<filter_count;++i) {
112                        if (filters[i].filter == NULL)
113                                continue;
114                        if(trace_apply_filter(filters[i].filter,pkt) > 0) {
115                                results[i+1].count++;
116                                results[i+1].bytes+=wlen;
117                        }
118                        if (trace_is_err(trace)) {
119                                trace_perror(trace, "trace_apply_filter");
120                                fprintf(stderr, "Removing filter from filterlist\n");
121                                // XXX might be a problem across threads below
122                                filters[i].filter = NULL;
123                        }
124                }
125                results[0].count++;
126                results[0].bytes +=wlen;
127        }
128        if (mesg) {
129                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
130                switch (mesg->code) {
131                        case MESSAGE_STOPPING:
132                                trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0
133                                fprintf(stderr, "Thread published resuslts WOWW\n");
134                                break;
135                        case MESSAGE_STARTING:
136                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
137                                break;
138                        case MESSAGE_DO_PAUSE:
139                                fprintf(stderr, "GOT Asked to pause ahh\n");
140                                break;
141                        case MESSAGE_PAUSING:
142                                fprintf(stderr, "Thread is pausing\n");
143                                break;
144                        case MESSAGE_RESUMING:
145                                fprintf(stderr, "Thread has paused\n");
146                                break;
147                }
148        }
149        return pkt;
150}
151
152static void report_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg) {
153        static uint64_t count=0, bytes=0;
154        uint64_t packets;
155        int i;
156        if (result) {
157                int j;
158                /* Get the results from each core and sum 'em up */
159                assert(libtrace_result_get_key(result) == 0);
160                statistics_t * res = libtrace_result_get_value(result);
161                count += res[0].count;
162                bytes += res[0].bytes;
163                for (j = 0; j < filter_count; j++) {
164                        filters[j].count += res[j+1].count;
165                        filters[j].bytes += res[j+1].bytes;
166                }
167                free(res);
168        } else switch (mesg->code) {
169                case MESSAGE_STOPPING:
170                        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
171                        for(i=0;i<filter_count;++i) {
172                                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
173                                filters[i].bytes=0;
174                                filters[i].count=0;
175                        }
176                        packets=trace_get_received_packets(trace);
177                        if (packets!=UINT64_MAX)
178                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
179                                                "Input packets", packets);
180                        packets=trace_get_filtered_packets(trace);
181                        if (packets!=UINT64_MAX)
182                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
183                                                "Filtered packets", packets);
184                        packets=trace_get_dropped_packets(trace);
185                        if (packets!=UINT64_MAX)
186                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
187                                                "Dropped packets",packets);
188                        packets=trace_get_accepted_packets(trace);
189                        if (packets!=UINT64_MAX)
190                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
191                                                "Accepted Packets",packets);
192                        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
193                        totcount+=count;
194                        totbytes+=bytes;
195        }
196}
197
198static uint64_t rand_hash(libtrace_packet_t * pkt, void *data) {
199        return rand();
200}
201
202static uint64_t bad_hash(libtrace_packet_t * pkt, void *data) {
203        return 0;
204}
205
206struct user_configuration uc;
207
208
209/* Process a trace, counting packets that match filter(s) */
210static void run_trace(char *uri) 
211{
212
213        fprintf(stderr,"%s:\n",uri);
214
215        trace = trace_create(uri);
216
217        if (trace_is_err(trace)) {
218                trace_perror(trace,"Failed to create trace");
219                return;
220        }
221       
222        int option = 2;
223        //option = 10000;
224    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
225        option = 2;
226        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
227        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
228        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &uc);
229
230        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
231
232        /* OPTIONALLY SETUP CORES HERE BUT WE DON'T CARE ABOUT THAT YET XXX */
233
234        /*if (trace_start(trace)==-1) {
235        trace_perror(trace,"Failed to start trace");
236        return;
237        }*/
238        global_blob_t blob;
239
240
241        if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
242                trace_perror(trace,"Failed to start trace");
243                return;
244        }
245
246        // Wait for all threads to stop
247        trace_join(trace);
248
249        //map_pair_iterator_t * results = NULL;
250        //trace_get_results(trace, &results);
251
252        //if (results != NULL) {
253        //      reduce(trace, global_blob, results);
254        //}
255        if (trace_is_err(trace))
256                trace_perror(trace,"%s",uri);
257
258        print_contention_stats(trace);
259        trace_destroy(trace);
260}
261
262static void usage(char *argv0)
263{
264        fprintf(stderr,"Usage: %s [-H|--libtrace-help] [--filter|-f bpf ]... libtraceuri...\n",argv0);
265}
266
267int main(int argc, char *argv[]) {
268
269        int i;
270        struct sigaction sigact;
271        ZERO_USER_CONFIG(uc);
272        while(1) {
273                int option_index;
274                struct option long_options[] = {
275                        { "filter",        1, 0, 'f' },
276                        { "libtrace-help", 0, 0, 'H' },
277                        { "config",             1, 0, 'u' },
278                        { "config-file",                1, 0, 'U' },
279                        { NULL,            0, 0, 0   },
280                };
281
282                int c=getopt_long(argc, argv, "f:Hu:U:",
283                                long_options, &option_index);
284
285                if (c==-1)
286                        break;
287
288                switch (c) {
289                        case 'f':
290                                ++filter_count;
291                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
292                                filters[filter_count-1].expr=strdup(optarg);
293                                filters[filter_count-1].filter=trace_create_filter(optarg);
294                                filters[filter_count-1].count=0;
295                                filters[filter_count-1].bytes=0;
296                                break;
297                        case 'H':
298                                trace_help();
299                                exit(1);
300                                break;
301                        case 'u':
302                                  parse_user_config(&uc, optarg);
303                                  break;
304                        case 'U':;
305                                FILE * f = fopen(optarg, "r");
306                                if (f != NULL) {
307                                        parse_user_config_file(&uc, f);
308                                } else {
309                                        perror("Failed to open configuration file\n");
310                                        usage(argv[0]);
311                                }
312                                break;
313                        default:
314                                fprintf(stderr,"Unknown option: %c\n",c);
315                                usage(argv[0]);
316                                return 1;
317                }
318        }
319
320        sigact.sa_handler = cleanup_signal;
321        sigemptyset(&sigact.sa_mask);
322        sigact.sa_flags = SA_RESTART;
323
324        sigaction(SIGINT, &sigact, NULL);
325        sigaction(SIGTERM, &sigact, NULL);
326       
327        for(i=optind;i<argc;++i) {
328                run_trace(argv[i]);
329        }
330        if (optind+1<argc) {
331                printf("Grand total:\n");
332                printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",totcount,totbytes);
333        }
334       
335        return 0;
336}
Note: See TracBrowser for help on using the repository browser.