source: tools/tracertstats/tracertstats_parallel.c @ 29bbef0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 29bbef0 was 29bbef0, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

My work from over summer, with a few things tidied up and updated to include recent commits/patches to bring this up to date. Still very much work in progress.

  • Property mode set to 100644
File size: 17.5 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/* This program takes a series of traces and bpf filters and outputs how many
32 * bytes/packets every time interval
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <string.h>
39#include <sys/time.h>
40#include <sys/types.h>
41#include <time.h>
42
43#include <netinet/in.h>
44#include <netinet/in_systm.h>
45#include <netinet/tcp.h>
46#include <netinet/ip.h>
47#include <netinet/ip_icmp.h>
48#include <arpa/inet.h>
49#include <sys/socket.h>
50#include <getopt.h>
51#include <inttypes.h>
52#include <lt_inttypes.h>
53
54#include "libtrace.h"
55#include "output.h"
56#include "rt_protocol.h"
57#include "dagformat.h"
58
59#include "trace_vector.h"
60
61#ifndef UINT32_MAX
62        #define UINT32_MAX      0xffffffffU
63#endif
64
65#define DEFAULT_OUTPUT_FMT "txt"
66#define TRACE_TIME 1
67
68struct libtrace_t *trace;
69char *output_format=NULL;
70
71int merge_inputs = 0;
72
73struct filter_t {
74        char *expr;
75        struct libtrace_filter_t *filter;
76        uint64_t count;
77        uint64_t bytes;
78} *filters = NULL;
79int filter_count=0;
80uint64_t totcount;
81uint64_t totbytes;
82
83uint64_t packet_count=UINT64_MAX;
84double packet_interval=UINT32_MAX;
85
86
87struct output_data_t *output = NULL;
88
89static void report_results(double ts,uint64_t count,uint64_t bytes)
90{
91        int i=0;
92        output_set_data_time(output,0,ts);
93        output_set_data_int(output,1,count);
94        output_set_data_int(output,2,bytes);
95        for(i=0;i<filter_count;++i) {
96                output_set_data_int(output,i*2+3,filters[i].count);
97                output_set_data_int(output,i*2+4,filters[i].bytes);
98                filters[i].count=filters[i].bytes=0;
99        }
100        output_flush_row(output);
101}
102
103static void create_output(char *title) {
104        int i;
105       
106        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
107        if (!output) {
108                fprintf(stderr,"Failed to create output file\n");
109                return;
110        }
111        output_add_column(output,"ts");
112        output_add_column(output,"packets");
113        output_add_column(output,"bytes");
114        for(i=0;i<filter_count;++i) {
115                char buff[1024];
116                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
117                output_add_column(output,buff);
118                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
119                output_add_column(output,buff);
120        }
121        output_flush_headings(output);
122
123}
124
125uint64_t count;
126uint64_t bytes;
127
128typedef struct statistic {
129        uint64_t count;
130        uint64_t bytes;
131} statistic_t;
132
133typedef struct result {
134        struct statistic total;
135        struct statistic filters[0];
136} result_t;
137
138
139static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
140{       
141        int i,j;
142        //uint64_t count=0, bytes=0;
143        static uint64_t ts = 0;
144        libtrace_vector_t results;
145        libtrace_vector_init(&results, sizeof(libtrace_result_t));
146        trace_get_results(trace, &results);
147        //uint64_t packets;
148       
149        /* Get the results from each core and sum 'em up */
150        for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
151                libtrace_result_t result;
152               
153                assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
154                ts = libtrace_result_get_key(&result);
155                if (*last_ts == 0)
156                        *last_ts = ts;
157               
158                result_t * res = libtrace_result_get_value(&result);
159                static result_t *  last_res = NULL;
160                assert(res != last_res);
161                last_res = res;
162                //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
163                while (*last_ts < ts) {
164                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
165                        count = 0;
166                        bytes = 0;
167                        for (j = 0; j < filter_count; j++)
168                                filters[j].count = filters[j].bytes = 0;
169                        (*last_ts)++;
170                }
171               
172                count += res->total.count;
173                bytes += res->total.bytes;
174                for (j = 0; j < filter_count; j++) {
175                        filters[j].count += res->filters[j].count;
176                        filters[j].bytes += res->filters[j].bytes;
177                }
178                free(res);
179        }
180        // Done with these results - Free internally and externally
181        libtrace_vector_destroy(&results);
182       
183        return 0;
184}
185
186typedef struct timestamp_sync {
187        int64_t difference_usecs;
188        uint64_t first_interval_number;
189} timestamp_sync_t;
190
191
192static int reduce_tracetime(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
193{
194        int i,j;
195        //uint64_t count=0, bytes=0;
196        static uint64_t ts = 0;
197        libtrace_vector_t results;
198        libtrace_vector_init(&results, sizeof(libtrace_result_t));
199        trace_get_results_check_temp(trace, &results, *last_ts);
200        //trace_get_results(trace, &results);
201        //uint64_t packets;
202       
203        /* Get the results from each core and sum 'em up */
204        for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
205                libtrace_result_t result;
206               
207                assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
208                ts = libtrace_result_get_key(&result);
209                if (*last_ts == 0)
210                        *last_ts = ts;
211               
212                result_t * res = libtrace_result_get_value(&result);
213                static result_t *  last_res = NULL;
214                if (res == last_res) {
215                        printf("Hmm could be asserting but I'm not ;)\n");
216                }
217                //assert(res != last_res);
218                last_res = res;
219                //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
220                /*while (*last_ts < ts) {
221                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
222                        count = 0;
223                        bytes = 0;
224                        for (j = 0; j < filter_count; j++)
225                                filters[j].count = filters[j].bytes = 0;
226                        (*last_ts)++;
227                }*/
228               
229                count += res->total.count;
230                bytes += res->total.bytes;
231                for (j = 0; j < filter_count; j++) {
232                        filters[j].count += res->filters[j].count;
233                        filters[j].bytes += res->filters[j].bytes;
234                }
235                free(res);
236        }
237        report_results((double) *last_ts * (double) packet_interval, count, bytes);
238        count = 0;
239        bytes = 0;
240        for (j = 0; j < filter_count; j++)
241                filters[j].count = filters[j].bytes = 0;
242        (*last_ts)++;
243       
244        // Done with these results - Free internally and externally
245        libtrace_vector_destroy(&results);
246       
247        return 0;
248}
249
250static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
251                                                libtrace_message_t *mesg,
252                                                libtrace_thread_t *t)
253{
254        int i;
255        static __thread uint64_t last_ts = 0, ts = 0;
256        static __thread result_t * results = NULL;
257       
258        // Unsure when we would hit this case but the old code had it, I
259        // guess we should keep it
260        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) == NULL) {
261               
262                ts = trace_get_seconds(pkt) / packet_interval;
263                if (last_ts == 0)
264                        last_ts = ts;
265               
266                while (packet_interval != UINT64_MAX && last_ts<ts) {
267                        // Publish and make a new one new
268                        trace_publish_result(trace, (uint64_t) last_ts, results);
269                        trace_post_reduce(trace);
270                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
271                        last_ts++;
272                }
273               
274                for(i=0;i<filter_count;++i) {
275                        if(trace_apply_filter(filters[i].filter, pkt)) {
276                                results->filters[i].count++;
277                                results->filters[i].bytes+=trace_get_wire_length(pkt);
278                        }
279                }
280               
281                results->total.count++;
282                results->total.bytes +=trace_get_wire_length(pkt);
283                /*if (count >= packet_count) {
284                        report_results(ts,count,bytes);
285                        count=0;
286                        bytes=0;
287                }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
288        }
289       
290        if (mesg) {
291                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
292                switch (mesg->code) {
293                        case MESSAGE_STARTED:
294                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
295                                break;
296                        case MESSAGE_STOPPED:
297                                if (results->total.count) {
298                                        trace_publish_result(trace, (uint64_t) last_ts, results);
299                                        trace_post_reduce(trace);
300                                }
301                }
302        }
303        return pkt;
304}
305void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key);
306/**
307 * A trace time version of map which will attempt to keep upto date
308 * with the incoming data and detect cases where results are missing and
309 * recover correctly.
310 */
311static void* per_packet_tracetime(libtrace_t *trace, libtrace_packet_t *pkt, 
312                                                libtrace_message_t *mesg,
313                                                libtrace_thread_t *t)
314{
315        // Using first entry as total and those after for filter counts
316        int i;
317        static __thread uint64_t last_ts = 0, ts = 0;
318        static __thread double debug_last = 0.0;
319        static __thread result_t * tmp_result = NULL;
320       
321        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
322                ts = trace_get_seconds(pkt) / packet_interval;
323               
324                if (debug_last != 0.0 && debug_last > trace_get_seconds(pkt))
325                        printf("packets out of order bitch :(\n");
326                debug_last = trace_get_seconds(pkt);
327                if (last_ts == 0)
328                        last_ts = ts;
329               
330                /*
331                while (packet_interval != UINT64_MAX && last_ts<ts) {
332                        // Publish and make new
333                        trace_publish_result(trace, (uint64_t) last_ts, results);
334                        results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
335                        memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
336                        last_ts++;
337                }*/
338               
339                /* Calculate count for filters */
340                for(i=0;i<filter_count;++i) {
341                        if(trace_apply_filter(filters[i].filter, pkt)) {
342                                tmp_result->filters[i].count = 1;
343                                tmp_result->filters[i].bytes = trace_get_wire_length(pkt);
344                        } else {
345                                tmp_result->filters[i].count = 0;
346                                tmp_result->filters[i].bytes = 0;
347                        }
348                }
349               
350                /* Now Update the currently stored result */
351                result_t * results = (result_t *) trace_retrive_inprogress_result(trace, ts);
352               
353                if (!results) {
354                        results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
355                        memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
356                }
357                assert(results);
358                /* Now add to the current results */
359                results->total.count++;
360                results->total.bytes +=trace_get_wire_length(pkt);
361                /* Now add on filters */
362                for(i=0;i<filter_count;++i) {
363                        results->filters[i].count += tmp_result->filters[i].count;
364                        results->filters[i].bytes += tmp_result->filters[i].bytes;
365                }
366                /* Now release the lock and send it away place that back into the buffer */
367                trace_update_inprogress_result(trace, ts, (void *) results);
368                /*if (count >= packet_count) {
369                        report_results(ts,count,bytes);
370                        count=0;
371                        bytes=0;
372                }*/ // Hmm what was happening here doesn't match up with any of the documentations!!!
373        }
374        if (mesg) {
375                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
376                switch (mesg->code) {
377                        case MESSAGE_STARTED:
378                                tmp_result = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
379                                break;
380                        case MESSAGE_STOPPED:
381                                trace_retrive_inprogress_result(trace, 0);
382                                trace_update_inprogress_result(trace, 1, NULL);
383                }
384        }
385        // Done push the final results
386        /*if (results->total.count)
387                trace_publish_result(trace, (uint64_t) last_ts, results);*/
388       
389        return pkt;
390}
391
392/* Process a trace, counting packets that match filter(s) */
393static void run_trace(char *uri)
394{
395        int j;
396        uint64_t last_ts = 0;
397
398        if (!merge_inputs) 
399                create_output(uri);
400
401        if (output == NULL)
402                return;
403
404        trace = trace_create(uri);
405        if (trace_is_err(trace)) {
406                trace_perror(trace,"trace_create");
407                trace_destroy(trace);
408                if (!merge_inputs)
409                        output_destroy(output);
410                return;
411        }
412        /*
413        if (trace_start(trace)==-1) {
414                trace_perror(trace,"trace_start");
415                trace_destroy(trace);
416                if (!merge_inputs)
417                        output_destroy(output);
418                return;
419        }*/
420        int i = 1;
421        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
422        trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i);
423#if TRACE_TIME
424        if (trace_pstart(trace, NULL, &per_packet_tracetime, NULL)==-1) {
425#else
426        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
427#endif
428                trace_perror(trace,"Failed to start trace");
429                trace_destroy(trace);
430                if (!merge_inputs)
431                        output_destroy(output);
432                return;
433        }
434
435#if TRACE_TIME
436        // First we wait for a message telling us that a timestamp has been
437        // published this allows us to approximately synchronize with the time
438        libtrace_message_t message;
439        int64_t offset;
440        libtrace_packet_t *packet;
441        struct timeval *tv, tv_real;
442       
443       
444        do {
445                // TODO Put a timeout here also
446                libtrace_thread_get_message(trace, &message);
447        } while (retrive_first_packet(trace, &packet, &tv) == 0);
448        tv_real = trace_get_timeval(packet);
449        offset = tv_to_usec(&tv_real) - tv_to_usec(tv);
450        last_ts = trace_get_seconds(packet) / packet_interval;
451        printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
452        /*
453        while (!got_first) {
454                // Wait for a message indicating we've got our 'first' packet, note not a 100% guarantee its our first but pretty likely
455               
456               
457               
458                assert(pthread_mutex_lock(&lock_more) == 0);
459               
460                for (i=0; i < 2; ++i) {
461                        if (initial_stamps[i].difference_usecs) { // Hmm certainly this cannot possibly lineup 100%??
462                                got_first=1;
463                                last_ts = initial_stamps[i].first_interval_number;
464                                offset = initial_stamps[i].difference_usecs;
465                                printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
466                        }
467                }
468                assert(pthread_mutex_unlock(&lock_more) == 0);
469        }*/
470        while (!trace_finished(trace)) {
471                struct timeval tv;
472                // Now try our best to read that one out
473               
474                // Read messages
475                //libtrace_thread_get_message(trace, &message);
476               
477                // We just release and do work currently, maybe if something
478                // interesting comes through we'd deal with that
479                //libtrace_thread_get_message(trace, &message);
480               
481                //while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
482               
483                /* Now wait for a second after we should see the results */
484                uint64_t next_update_time, t_usec;
485                next_update_time = (last_ts*packet_interval + packet_interval + 1) * 1000000 + offset;
486                gettimeofday(&tv, NULL);
487                t_usec = tv.tv_sec;
488                t_usec *= 1000000;
489                t_usec += tv.tv_usec;
490               
491                //printf("Current time=%"PRIu64" Next result ready=%"PRIu64" =%f\n", t_usec, next_update_time, ((double) next_update_time - (double) t_usec) / 1000000.0);
492                if (next_update_time > t_usec) {
493                        tv.tv_sec = (next_update_time - t_usec) / 1000000;
494                        tv.tv_usec = (next_update_time - t_usec) % 1000000;
495                        select(0, NULL, NULL, NULL, &tv);
496                }
497                reduce_tracetime(trace, NULL, &last_ts);
498        }
499#else
500        // reduce
501        while (!trace_finished(trace)) {
502                // Read messages
503                libtrace_message_t message;
504               
505                // We just release and do work currently, maybe if something
506                // interesting comes through we'd deal with that
507                libtrace_thread_get_message(trace, &message);
508               
509                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
510                reduce(trace, NULL, &last_ts);
511        }
512#endif
513
514        // Wait for all threads to stop
515        trace_join(trace);
516       
517        reduce(trace, NULL, &last_ts);
518        // Flush the last one out
519        report_results((double) last_ts * (double) packet_interval, count, bytes);
520        //count = 0;
521        //bytes = 0;
522        for (j = 0; j < filter_count; j++)
523                filters[j].count = filters[j].bytes = 0;
524        (last_ts)++;
525       
526        if (trace_is_err(trace))
527                trace_perror(trace,"%s",uri);
528
529        trace_destroy(trace);
530
531        if (!merge_inputs)
532                output_destroy(output);
533       
534}
535// TODO Decide what to do with -c option
536static void usage(char *argv0)
537{
538        fprintf(stderr,"Usage:\n"
539        "%s flags libtraceuri [libtraceuri...]\n"
540        "-i --interval=seconds  Duration of reporting interval in seconds\n"
541        "-c --count=packets     Exit after count packets received\n"
542        "-o --output-format=txt|csv|html|png Reporting output format\n"
543        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
544        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
545        "-H --libtrace-help     Print libtrace runtime documentation\n"
546        ,argv0);
547}
548
549int main(int argc, char *argv[]) {
550
551        int i;
552       
553        while(1) {
554                int option_index;
555                struct option long_options[] = {
556                        { "filter",             1, 0, 'f' },
557                        { "interval",           1, 0, 'i' },
558                        { "count",              1, 0, 'c' },
559                        { "output-format",      1, 0, 'o' },
560                        { "libtrace-help",      0, 0, 'H' },
561                        { "merge-inputs",       0, 0, 'm' },
562                        { NULL,                 0, 0, 0   },
563                };
564
565                int c=getopt_long(argc, argv, "c:f:i:o:Hm",
566                                long_options, &option_index);
567
568                if (c==-1)
569                        break;
570
571                switch (c) {
572                        case 'f':
573                                ++filter_count;
574                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
575                                filters[filter_count-1].expr=strdup(optarg);
576                                filters[filter_count-1].filter=trace_create_filter(optarg);
577                                filters[filter_count-1].count=0;
578                                filters[filter_count-1].bytes=0;
579                                break;
580                        case 'i':
581                                packet_interval=atof(optarg);
582                                break;
583                        case 'c':
584                                packet_count=atoi(optarg);
585                                break;
586                        case 'o':
587                                if (output_format) free(output_format);
588                                output_format=strdup(optarg);
589                                break;
590                        case 'm':
591                                merge_inputs = 1;
592                                break;
593                        case 'H':
594                                  trace_help(); 
595                                  exit(1); 
596                                  break;       
597                        default:
598                                fprintf(stderr,"Unknown option: %c\n",c);
599                                usage(argv[0]);
600                                return 1;
601                }
602        }
603
604        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
605                packet_interval = 300; /* every 5 minutes */
606        }
607
608        if (optind >= argc)
609                return 0;
610
611        if (output_format)
612                fprintf(stderr,"output format: '%s'\n",output_format);
613        else
614                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
615       
616       
617        if (merge_inputs) {
618                /* If we're merging the inputs, we only want to create all
619                 * the column headers etc. once rather than doing them once
620                 * per trace */
621
622                /* This is going to "name" the output based on the first
623                 * provided URI - admittedly not ideal */
624                create_output(argv[optind]);
625                if (output == NULL)
626                        return 0;
627        }
628               
629        for(i=optind;i<argc;++i) {
630                run_trace(argv[i]);
631        }
632
633        if (merge_inputs) {
634                /* Clean up after ourselves */
635                output_destroy(output);
636        }
637
638
639        return 0;
640}
Note: See TracBrowser for help on using the repository browser.