source: tools/tracertstats/tracertstats_parallel.c @ 2498008

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 2498008 was 2498008, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

  • Property mode set to 100644
File size: 11.1 KB
RevLine 
[29bbef0]1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/* This program takes a series of traces and bpf filters and outputs how many
32 * bytes/packets every time interval
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <string.h>
39#include <sys/time.h>
40#include <sys/types.h>
41#include <time.h>
42
43#include <netinet/in.h>
44#include <netinet/in_systm.h>
45#include <netinet/tcp.h>
46#include <netinet/ip.h>
47#include <netinet/ip_icmp.h>
48#include <arpa/inet.h>
49#include <sys/socket.h>
50#include <getopt.h>
51#include <inttypes.h>
52#include <lt_inttypes.h>
53
54#include "libtrace.h"
55#include "output.h"
56#include "rt_protocol.h"
57#include "dagformat.h"
58
[d6a56b6]59#include "data-struct/vector.h"
60#include "data-struct/message_queue.h"
[29bbef0]61
62#ifndef UINT32_MAX
63        #define UINT32_MAX      0xffffffffU
64#endif
65
66#define DEFAULT_OUTPUT_FMT "txt"
67
68struct libtrace_t *trace;
69char *output_format=NULL;
70
71int merge_inputs = 0;
72
73struct filter_t {
74        char *expr;
75        struct libtrace_filter_t *filter;
76        uint64_t count;
77        uint64_t bytes;
78} *filters = NULL;
79int filter_count=0;
80uint64_t totcount;
81uint64_t totbytes;
82
83uint64_t packet_count=UINT64_MAX;
84double packet_interval=UINT32_MAX;
85
86
87struct output_data_t *output = NULL;
88
89static void report_results(double ts,uint64_t count,uint64_t bytes)
90{
91        int i=0;
92        output_set_data_time(output,0,ts);
93        output_set_data_int(output,1,count);
94        output_set_data_int(output,2,bytes);
95        for(i=0;i<filter_count;++i) {
96                output_set_data_int(output,i*2+3,filters[i].count);
97                output_set_data_int(output,i*2+4,filters[i].bytes);
98                filters[i].count=filters[i].bytes=0;
99        }
100        output_flush_row(output);
101}
102
103static void create_output(char *title) {
104        int i;
105       
106        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
107        if (!output) {
108                fprintf(stderr,"Failed to create output file\n");
109                return;
110        }
111        output_add_column(output,"ts");
112        output_add_column(output,"packets");
113        output_add_column(output,"bytes");
114        for(i=0;i<filter_count;++i) {
115                char buff[1024];
116                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
117                output_add_column(output,buff);
118                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
119                output_add_column(output,buff);
120        }
121        output_flush_headings(output);
122
123}
124
125uint64_t count;
126uint64_t bytes;
127
128typedef struct statistic {
129        uint64_t count;
130        uint64_t bytes;
131} statistic_t;
132
133typedef struct result {
134        struct statistic total;
135        struct statistic filters[0];
136} result_t;
137
[2498008]138static uint64_t last_ts = 0;
139static void process_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg UNUSED)  {
[29bbef0]140        static uint64_t ts = 0;
[2498008]141
142        if (result) {
143                int j;
144                result_t *res;
145                ts = libtrace_result_get_key(result);
146                res = libtrace_result_get_value(result);
147                if (last_ts == 0)
148                        last_ts = ts;
149                while (last_ts < ts) {
150                        report_results((double) last_ts * (double) packet_interval, count, bytes);
[29bbef0]151                        count = 0;
152                        bytes = 0;
153                        for (j = 0; j < filter_count; j++)
154                                filters[j].count = filters[j].bytes = 0;
[2498008]155                        last_ts++;
[29bbef0]156                }
157                count += res->total.count;
158                bytes += res->total.bytes;
159                for (j = 0; j < filter_count; j++) {
160                        filters[j].count += res->filters[j].count;
161                        filters[j].bytes += res->filters[j].bytes;
162                }
163                free(res);
164        }
165}
166
167typedef struct timestamp_sync {
168        int64_t difference_usecs;
169        uint64_t first_interval_number;
170} timestamp_sync_t;
171
172static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
173                                                libtrace_message_t *mesg,
174                                                libtrace_thread_t *t)
175{
176        int i;
177        static __thread uint64_t last_ts = 0, ts = 0;
178        static __thread result_t * results = NULL;
179       
180        // Unsure when we would hit this case but the old code had it, I
181        // guess we should keep it
[82facc5]182        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
183                //fprintf(stderr, "Got packet t=%x\n", t);
[29bbef0]184                ts = trace_get_seconds(pkt) / packet_interval;
185                if (last_ts == 0)
186                        last_ts = ts;
[82facc5]187
[29bbef0]188                while (packet_interval != UINT64_MAX && last_ts<ts) {
189                        // Publish and make a new one new
[b13b939]190                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
[f051c1b]191                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
192                        trace_post_reporter(trace);
[29bbef0]193                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
194                        last_ts++;
195                }
196               
197                for(i=0;i<filter_count;++i) {
198                        if(trace_apply_filter(filters[i].filter, pkt)) {
199                                results->filters[i].count++;
200                                results->filters[i].bytes+=trace_get_wire_length(pkt);
201                        }
202                }
203               
204                results->total.count++;
205                results->total.bytes +=trace_get_wire_length(pkt);
206                /*if (count >= packet_count) {
207                        report_results(ts,count,bytes);
208                        count=0;
209                        bytes=0;
210                }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
211        }
212       
213        if (mesg) {
214                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
215                switch (mesg->code) {
[f051c1b]216                        case MESSAGE_STARTING:
[29bbef0]217                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
218                                break;
[f051c1b]219                        case MESSAGE_STOPPING:
[82facc5]220                                // Should we always post this?
[29bbef0]221                                if (results->total.count) {
[f051c1b]222                                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
223                                        trace_post_reporter(trace);
[82facc5]224                                        results = NULL;
[29bbef0]225                                }
226                                break;
[82facc5]227                        case MESSAGE_TICK:
228                        {
229                                int64_t offset;
230                                struct timeval *tv, tv_real;
231                                libtrace_packet_t *first_packet = NULL;
232                                retrive_first_packet(trace, &first_packet, &tv);
233                                if (first_packet != NULL) {
234                                        // So figure out our running offset
235                                        tv_real = trace_get_timeval(first_packet);
236                                        offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
237                                        // Get time of day and do this stuff
238                                        uint64_t next_update_time;
239                                        next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
240                                        if (next_update_time <= mesg->additional.uint64) {
[b13b939]241                                                //fprintf(stderr, "Got a tick and publishing early!!\n");
[f051c1b]242                                                trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
243                                                trace_post_reporter(trace);
[82facc5]244                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
245                                                last_ts++;
246                                        } else {
[b13b939]247                                                //fprintf(stderr, "Got a tick but no publish ...\n");
[82facc5]248                                        }
249                                } else {
[b13b939]250                                        //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
[82facc5]251                                }
252                        }
[29bbef0]253                }
254        }
255        return pkt;
256}
257
[2498008]258static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
[82facc5]259        return 0;
260}
261
[29bbef0]262/* Process a trace, counting packets that match filter(s) */
263static void run_trace(char *uri)
264{
265        int j;
266
267        if (!merge_inputs) 
268                create_output(uri);
269
270        if (output == NULL)
271                return;
272
273        trace = trace_create(uri);
274        if (trace_is_err(trace)) {
275                trace_perror(trace,"trace_create");
276                trace_destroy(trace);
277                if (!merge_inputs)
278                        output_destroy(output);
279                return;
280        }
281        /*
282        if (trace_start(trace)==-1) {
283                trace_perror(trace,"trace_start");
284                trace_destroy(trace);
285                if (!merge_inputs)
286                        output_destroy(output);
287                return;
288        }*/
289        int i = 1;
290        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
[b13b939]291        /* trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i); */
292        //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
293
294        if (trace_get_information(trace)->live) {
295                i = (int) (packet_interval * 1000); // Every interval send a tick
296                trace_parallel_config(trace, TRACE_OPTION_TICK_INTERVAL, &i);
297        }
298
[2498008]299        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
[29bbef0]300                trace_perror(trace,"Failed to start trace");
301                trace_destroy(trace);
302                if (!merge_inputs)
303                        output_destroy(output);
304                return;
305        }
306
[82facc5]307
[29bbef0]308        // Wait for all threads to stop
309        trace_join(trace);
310       
311        // Flush the last one out
312        report_results((double) last_ts * (double) packet_interval, count, bytes);
313        //count = 0;
314        //bytes = 0;
315        for (j = 0; j < filter_count; j++)
316                filters[j].count = filters[j].bytes = 0;
317        (last_ts)++;
318       
319        if (trace_is_err(trace))
320                trace_perror(trace,"%s",uri);
321
322        trace_destroy(trace);
323
324        if (!merge_inputs)
325                output_destroy(output);
326       
327}
328// TODO Decide what to do with -c option
329static void usage(char *argv0)
330{
331        fprintf(stderr,"Usage:\n"
332        "%s flags libtraceuri [libtraceuri...]\n"
333        "-i --interval=seconds  Duration of reporting interval in seconds\n"
334        "-c --count=packets     Exit after count packets received\n"
335        "-o --output-format=txt|csv|html|png Reporting output format\n"
336        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
337        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
338        "-H --libtrace-help     Print libtrace runtime documentation\n"
339        ,argv0);
340}
341
342int main(int argc, char *argv[]) {
343
344        int i;
345       
346        while(1) {
347                int option_index;
348                struct option long_options[] = {
349                        { "filter",             1, 0, 'f' },
350                        { "interval",           1, 0, 'i' },
351                        { "count",              1, 0, 'c' },
352                        { "output-format",      1, 0, 'o' },
353                        { "libtrace-help",      0, 0, 'H' },
354                        { "merge-inputs",       0, 0, 'm' },
355                        { NULL,                 0, 0, 0   },
356                };
357
358                int c=getopt_long(argc, argv, "c:f:i:o:Hm",
359                                long_options, &option_index);
360
361                if (c==-1)
362                        break;
363
364                switch (c) {
365                        case 'f':
366                                ++filter_count;
367                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
368                                filters[filter_count-1].expr=strdup(optarg);
369                                filters[filter_count-1].filter=trace_create_filter(optarg);
370                                filters[filter_count-1].count=0;
371                                filters[filter_count-1].bytes=0;
372                                break;
373                        case 'i':
374                                packet_interval=atof(optarg);
375                                break;
376                        case 'c':
377                                packet_count=atoi(optarg);
378                                break;
379                        case 'o':
380                                if (output_format) free(output_format);
381                                output_format=strdup(optarg);
382                                break;
383                        case 'm':
384                                merge_inputs = 1;
385                                break;
386                        case 'H':
387                                  trace_help(); 
388                                  exit(1); 
389                                  break;       
390                        default:
391                                fprintf(stderr,"Unknown option: %c\n",c);
392                                usage(argv[0]);
393                                return 1;
394                }
395        }
396
397        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
398                packet_interval = 300; /* every 5 minutes */
399        }
400
401        if (optind >= argc)
402                return 0;
403
404        if (output_format)
405                fprintf(stderr,"output format: '%s'\n",output_format);
406        else
407                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
408       
409       
410        if (merge_inputs) {
411                /* If we're merging the inputs, we only want to create all
412                 * the column headers etc. once rather than doing them once
413                 * per trace */
414
415                /* This is going to "name" the output based on the first
416                 * provided URI - admittedly not ideal */
417                create_output(argv[optind]);
418                if (output == NULL)
419                        return 0;
420        }
421               
422        for(i=optind;i<argc;++i) {
423                run_trace(argv[i]);
424        }
425
426        if (merge_inputs) {
427                /* Clean up after ourselves */
428                output_destroy(output);
429        }
430
431
[b13b939]432        return 0;
[29bbef0]433}
Note: See TracBrowser for help on using the repository browser.