source: tools/tracertstats/tracertstats_parallel.c @ d6a56b6

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since d6a56b6 was d6a56b6, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Move the data structures out of the way and into there own folder and tidy file naming.

  • Property mode set to 100644
File size: 17.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007 The University of Waikato, Hamilton, New Zealand.
5 * Authors: Daniel Lawson
6 *          Perry Lorier
7 *         
8 * All rights reserved.
9 *
10 * This code has been developed by the University of Waikato WAND
11 * research group. For further information please see http://www.wand.net.nz/
12 *
13 * libtrace is free software; you can redistribute it and/or modify
14 * it under the terms of the GNU General Public License as published by
15 * the Free Software Foundation; either version 2 of the License, or
16 * (at your option) any later version.
17 *
18 * libtrace is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with libtrace; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 *
27 * $Id$
28 *
29 */
30
31/* This program takes a series of traces and bpf filters and outputs how many
32 * bytes/packets every time interval
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <assert.h>
38#include <string.h>
39#include <sys/time.h>
40#include <sys/types.h>
41#include <time.h>
42
43#include <netinet/in.h>
44#include <netinet/in_systm.h>
45#include <netinet/tcp.h>
46#include <netinet/ip.h>
47#include <netinet/ip_icmp.h>
48#include <arpa/inet.h>
49#include <sys/socket.h>
50#include <getopt.h>
51#include <inttypes.h>
52#include <lt_inttypes.h>
53
54#include "libtrace.h"
55#include "output.h"
56#include "rt_protocol.h"
57#include "dagformat.h"
58
59#include "data-struct/vector.h"
60#include "data-struct/message_queue.h"
61
62#ifndef UINT32_MAX
63        #define UINT32_MAX      0xffffffffU
64#endif
65
66#define DEFAULT_OUTPUT_FMT "txt"
67#define TRACE_TIME 1
68
69struct libtrace_t *trace;
70char *output_format=NULL;
71
72int merge_inputs = 0;
73
74struct filter_t {
75        char *expr;
76        struct libtrace_filter_t *filter;
77        uint64_t count;
78        uint64_t bytes;
79} *filters = NULL;
80int filter_count=0;
81uint64_t totcount;
82uint64_t totbytes;
83
84uint64_t packet_count=UINT64_MAX;
85double packet_interval=UINT32_MAX;
86
87
88struct output_data_t *output = NULL;
89
90static void report_results(double ts,uint64_t count,uint64_t bytes)
91{
92        int i=0;
93        output_set_data_time(output,0,ts);
94        output_set_data_int(output,1,count);
95        output_set_data_int(output,2,bytes);
96        for(i=0;i<filter_count;++i) {
97                output_set_data_int(output,i*2+3,filters[i].count);
98                output_set_data_int(output,i*2+4,filters[i].bytes);
99                filters[i].count=filters[i].bytes=0;
100        }
101        output_flush_row(output);
102}
103
104static void create_output(char *title) {
105        int i;
106       
107        output=output_init(title,output_format?output_format:DEFAULT_OUTPUT_FMT);
108        if (!output) {
109                fprintf(stderr,"Failed to create output file\n");
110                return;
111        }
112        output_add_column(output,"ts");
113        output_add_column(output,"packets");
114        output_add_column(output,"bytes");
115        for(i=0;i<filter_count;++i) {
116                char buff[1024];
117                snprintf(buff,sizeof(buff),"%s packets",filters[i].expr);
118                output_add_column(output,buff);
119                snprintf(buff,sizeof(buff),"%s bytes",filters[i].expr);
120                output_add_column(output,buff);
121        }
122        output_flush_headings(output);
123
124}
125
126uint64_t count;
127uint64_t bytes;
128
129typedef struct statistic {
130        uint64_t count;
131        uint64_t bytes;
132} statistic_t;
133
134typedef struct result {
135        struct statistic total;
136        struct statistic filters[0];
137} result_t;
138
139
140static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
141{       
142        int i,j;
143        //uint64_t count=0, bytes=0;
144        static uint64_t ts = 0;
145        libtrace_vector_t results;
146        libtrace_vector_init(&results, sizeof(libtrace_result_t));
147        trace_get_results(trace, &results);
148        //uint64_t packets;
149       
150        /* Get the results from each core and sum 'em up */
151        for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
152                libtrace_result_t result;
153               
154                assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
155                ts = libtrace_result_get_key(&result);
156                if (*last_ts == 0)
157                        *last_ts = ts;
158               
159                result_t * res = libtrace_result_get_value(&result);
160                static result_t *  last_res = NULL;
161                assert(res != last_res);
162                last_res = res;
163                //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
164                while (*last_ts < ts) {
165                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
166                        count = 0;
167                        bytes = 0;
168                        for (j = 0; j < filter_count; j++)
169                                filters[j].count = filters[j].bytes = 0;
170                        (*last_ts)++;
171                }
172               
173                count += res->total.count;
174                bytes += res->total.bytes;
175                for (j = 0; j < filter_count; j++) {
176                        filters[j].count += res->filters[j].count;
177                        filters[j].bytes += res->filters[j].bytes;
178                }
179                free(res);
180        }
181        // Done with these results - Free internally and externally
182        libtrace_vector_destroy(&results);
183       
184        return 0;
185}
186
187typedef struct timestamp_sync {
188        int64_t difference_usecs;
189        uint64_t first_interval_number;
190} timestamp_sync_t;
191
192
193static int reduce_tracetime(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
194{
195        int i,j;
196        //uint64_t count=0, bytes=0;
197        static uint64_t ts = 0;
198        libtrace_vector_t results;
199        libtrace_vector_init(&results, sizeof(libtrace_result_t));
200        trace_get_results_check_temp(trace, &results, *last_ts);
201        //trace_get_results(trace, &results);
202        //uint64_t packets;
203       
204        /* Get the results from each core and sum 'em up */
205        for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
206                libtrace_result_t result;
207               
208                assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
209                ts = libtrace_result_get_key(&result);
210                if (*last_ts == 0)
211                        *last_ts = ts;
212               
213                result_t * res = libtrace_result_get_value(&result);
214                static result_t *  last_res = NULL;
215                if (res == last_res) {
216                        printf("Hmm could be asserting but I'm not ;)\n");
217                }
218                //assert(res != last_res);
219                last_res = res;
220                //printf("Mapper published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
221                /*while (*last_ts < ts) {
222                        report_results((double) *last_ts * (double) packet_interval, count, bytes);
223                        count = 0;
224                        bytes = 0;
225                        for (j = 0; j < filter_count; j++)
226                                filters[j].count = filters[j].bytes = 0;
227                        (*last_ts)++;
228                }*/
229               
230                count += res->total.count;
231                bytes += res->total.bytes;
232                for (j = 0; j < filter_count; j++) {
233                        filters[j].count += res->filters[j].count;
234                        filters[j].bytes += res->filters[j].bytes;
235                }
236                free(res);
237        }
238        report_results((double) *last_ts * (double) packet_interval, count, bytes);
239        count = 0;
240        bytes = 0;
241        for (j = 0; j < filter_count; j++)
242                filters[j].count = filters[j].bytes = 0;
243        (*last_ts)++;
244       
245        // Done with these results - Free internally and externally
246        libtrace_vector_destroy(&results);
247       
248        return 0;
249}
250
251static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, 
252                                                libtrace_message_t *mesg,
253                                                libtrace_thread_t *t)
254{
255        int i;
256        static __thread uint64_t last_ts = 0, ts = 0;
257        static __thread result_t * results = NULL;
258       
259        // Unsure when we would hit this case but the old code had it, I
260        // guess we should keep it
261        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) == NULL) {
262               
263                ts = trace_get_seconds(pkt) / packet_interval;
264                if (last_ts == 0)
265                        last_ts = ts;
266               
267                while (packet_interval != UINT64_MAX && last_ts<ts) {
268                        // Publish and make a new one new
269                        trace_publish_result(trace, (uint64_t) last_ts, results);
270                        trace_post_reduce(trace);
271                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
272                        last_ts++;
273                }
274               
275                for(i=0;i<filter_count;++i) {
276                        if(trace_apply_filter(filters[i].filter, pkt)) {
277                                results->filters[i].count++;
278                                results->filters[i].bytes+=trace_get_wire_length(pkt);
279                        }
280                }
281               
282                results->total.count++;
283                results->total.bytes +=trace_get_wire_length(pkt);
284                /*if (count >= packet_count) {
285                        report_results(ts,count,bytes);
286                        count=0;
287                        bytes=0;
288                }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
289        }
290       
291        if (mesg) {
292                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
293                switch (mesg->code) {
294                        case MESSAGE_STARTED:
295                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
296                                break;
297                        case MESSAGE_STOPPED:
298                                if (results->total.count) {
299                                        trace_publish_result(trace, (uint64_t) last_ts, results);
300                                        trace_post_reduce(trace);
301                                }
302                }
303        }
304        return pkt;
305}
306void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key);
307/**
308 * A trace time version of map which will attempt to keep upto date
309 * with the incoming data and detect cases where results are missing and
310 * recover correctly.
311 */
312static void* per_packet_tracetime(libtrace_t *trace, libtrace_packet_t *pkt, 
313                                                libtrace_message_t *mesg,
314                                                libtrace_thread_t *t)
315{
316        // Using first entry as total and those after for filter counts
317        int i;
318        static __thread uint64_t last_ts = 0, ts = 0;
319        static __thread double debug_last = 0.0;
320        static __thread result_t * tmp_result = NULL;
321       
322        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
323                ts = trace_get_seconds(pkt) / packet_interval;
324               
325                if (debug_last != 0.0 && debug_last > trace_get_seconds(pkt))
326                        printf("packets out of order bitch :(\n");
327                debug_last = trace_get_seconds(pkt);
328                if (last_ts == 0)
329                        last_ts = ts;
330               
331                /*
332                while (packet_interval != UINT64_MAX && last_ts<ts) {
333                        // Publish and make new
334                        trace_publish_result(trace, (uint64_t) last_ts, results);
335                        results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
336                        memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
337                        last_ts++;
338                }*/
339               
340                /* Calculate count for filters */
341                for(i=0;i<filter_count;++i) {
342                        if(trace_apply_filter(filters[i].filter, pkt)) {
343                                tmp_result->filters[i].count = 1;
344                                tmp_result->filters[i].bytes = trace_get_wire_length(pkt);
345                        } else {
346                                tmp_result->filters[i].count = 0;
347                                tmp_result->filters[i].bytes = 0;
348                        }
349                }
350               
351                /* Now Update the currently stored result */
352                result_t * results = (result_t *) trace_retrive_inprogress_result(trace, ts);
353               
354                if (!results) {
355                        results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
356                        memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
357                }
358                assert(results);
359                /* Now add to the current results */
360                results->total.count++;
361                results->total.bytes +=trace_get_wire_length(pkt);
362                /* Now add on filters */
363                for(i=0;i<filter_count;++i) {
364                        results->filters[i].count += tmp_result->filters[i].count;
365                        results->filters[i].bytes += tmp_result->filters[i].bytes;
366                }
367                /* Now release the lock and send it away place that back into the buffer */
368                trace_update_inprogress_result(trace, ts, (void *) results);
369                /*if (count >= packet_count) {
370                        report_results(ts,count,bytes);
371                        count=0;
372                        bytes=0;
373                }*/ // Hmm what was happening here doesn't match up with any of the documentations!!!
374        }
375        if (mesg) {
376                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
377                switch (mesg->code) {
378                        case MESSAGE_STARTED:
379                                tmp_result = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
380                                break;
381                        case MESSAGE_STOPPED:
382                                trace_retrive_inprogress_result(trace, 0);
383                                trace_update_inprogress_result(trace, 1, NULL);
384                }
385        }
386        // Done push the final results
387        /*if (results->total.count)
388                trace_publish_result(trace, (uint64_t) last_ts, results);*/
389       
390        return pkt;
391}
392
393/* Process a trace, counting packets that match filter(s) */
394static void run_trace(char *uri)
395{
396        int j;
397        uint64_t last_ts = 0;
398
399        if (!merge_inputs) 
400                create_output(uri);
401
402        if (output == NULL)
403                return;
404
405        trace = trace_create(uri);
406        if (trace_is_err(trace)) {
407                trace_perror(trace,"trace_create");
408                trace_destroy(trace);
409                if (!merge_inputs)
410                        output_destroy(output);
411                return;
412        }
413        /*
414        if (trace_start(trace)==-1) {
415                trace_perror(trace,"trace_start");
416                trace_destroy(trace);
417                if (!merge_inputs)
418                        output_destroy(output);
419                return;
420        }*/
421        int i = 1;
422        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
423        trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i);
424#if TRACE_TIME
425        if (trace_pstart(trace, NULL, &per_packet_tracetime, NULL)==-1) {
426#else
427        if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
428#endif
429                trace_perror(trace,"Failed to start trace");
430                trace_destroy(trace);
431                if (!merge_inputs)
432                        output_destroy(output);
433                return;
434        }
435
436#if TRACE_TIME
437        // First we wait for a message telling us that a timestamp has been
438        // published this allows us to approximately synchronize with the time
439        libtrace_message_t message;
440        int64_t offset;
441        libtrace_packet_t *packet;
442        struct timeval *tv, tv_real;
443       
444       
445        do {
446                // TODO Put a timeout here also
447                libtrace_thread_get_message(trace, &message);
448        } while (retrive_first_packet(trace, &packet, &tv) == 0);
449        tv_real = trace_get_timeval(packet);
450        offset = tv_to_usec(&tv_real) - tv_to_usec(tv);
451        last_ts = trace_get_seconds(packet) / packet_interval;
452        printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
453        /*
454        while (!got_first) {
455                // Wait for a message indicating we've got our 'first' packet, note not a 100% guarantee its our first but pretty likely
456               
457               
458               
459                assert(pthread_mutex_lock(&lock_more) == 0);
460               
461                for (i=0; i < 2; ++i) {
462                        if (initial_stamps[i].difference_usecs) { // Hmm certainly this cannot possibly lineup 100%??
463                                got_first=1;
464                                last_ts = initial_stamps[i].first_interval_number;
465                                offset = initial_stamps[i].difference_usecs;
466                                printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
467                        }
468                }
469                assert(pthread_mutex_unlock(&lock_more) == 0);
470        }*/
471        while (!trace_finished(trace)) {
472                struct timeval tv;
473                // Now try our best to read that one out
474               
475                // Read messages
476                //libtrace_thread_get_message(trace, &message);
477               
478                // We just release and do work currently, maybe if something
479                // interesting comes through we'd deal with that
480                //libtrace_thread_get_message(trace, &message);
481               
482                //while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
483               
484                /* Now wait for a second after we should see the results */
485                uint64_t next_update_time, t_usec;
486                next_update_time = (last_ts*packet_interval + packet_interval + 1) * 1000000 + offset;
487                gettimeofday(&tv, NULL);
488                t_usec = tv.tv_sec;
489                t_usec *= 1000000;
490                t_usec += tv.tv_usec;
491               
492                //printf("Current time=%"PRIu64" Next result ready=%"PRIu64" =%f\n", t_usec, next_update_time, ((double) next_update_time - (double) t_usec) / 1000000.0);
493                if (next_update_time > t_usec) {
494                        tv.tv_sec = (next_update_time - t_usec) / 1000000;
495                        tv.tv_usec = (next_update_time - t_usec) % 1000000;
496                        select(0, NULL, NULL, NULL, &tv);
497                }
498                reduce_tracetime(trace, NULL, &last_ts);
499        }
500#else
501        // reduce
502        while (!trace_finished(trace)) {
503                // Read messages
504                libtrace_message_t message;
505               
506                // We just release and do work currently, maybe if something
507                // interesting comes through we'd deal with that
508                libtrace_thread_get_message(trace, &message);
509               
510                while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
511                reduce(trace, NULL, &last_ts);
512        }
513#endif
514
515        // Wait for all threads to stop
516        trace_join(trace);
517       
518        reduce(trace, NULL, &last_ts);
519        // Flush the last one out
520        report_results((double) last_ts * (double) packet_interval, count, bytes);
521        //count = 0;
522        //bytes = 0;
523        for (j = 0; j < filter_count; j++)
524                filters[j].count = filters[j].bytes = 0;
525        (last_ts)++;
526       
527        if (trace_is_err(trace))
528                trace_perror(trace,"%s",uri);
529
530        trace_destroy(trace);
531
532        if (!merge_inputs)
533                output_destroy(output);
534       
535}
536// TODO Decide what to do with -c option
537static void usage(char *argv0)
538{
539        fprintf(stderr,"Usage:\n"
540        "%s flags libtraceuri [libtraceuri...]\n"
541        "-i --interval=seconds  Duration of reporting interval in seconds\n"
542        "-c --count=packets     Exit after count packets received\n"
543        "-o --output-format=txt|csv|html|png Reporting output format\n"
544        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
545        "-m --merge-inputs      Do not create separate outputs for each input trace\n"
546        "-H --libtrace-help     Print libtrace runtime documentation\n"
547        ,argv0);
548}
549
550int main(int argc, char *argv[]) {
551
552        int i;
553       
554        while(1) {
555                int option_index;
556                struct option long_options[] = {
557                        { "filter",             1, 0, 'f' },
558                        { "interval",           1, 0, 'i' },
559                        { "count",              1, 0, 'c' },
560                        { "output-format",      1, 0, 'o' },
561                        { "libtrace-help",      0, 0, 'H' },
562                        { "merge-inputs",       0, 0, 'm' },
563                        { NULL,                 0, 0, 0   },
564                };
565
566                int c=getopt_long(argc, argv, "c:f:i:o:Hm",
567                                long_options, &option_index);
568
569                if (c==-1)
570                        break;
571
572                switch (c) {
573                        case 'f':
574                                ++filter_count;
575                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
576                                filters[filter_count-1].expr=strdup(optarg);
577                                filters[filter_count-1].filter=trace_create_filter(optarg);
578                                filters[filter_count-1].count=0;
579                                filters[filter_count-1].bytes=0;
580                                break;
581                        case 'i':
582                                packet_interval=atof(optarg);
583                                break;
584                        case 'c':
585                                packet_count=atoi(optarg);
586                                break;
587                        case 'o':
588                                if (output_format) free(output_format);
589                                output_format=strdup(optarg);
590                                break;
591                        case 'm':
592                                merge_inputs = 1;
593                                break;
594                        case 'H':
595                                  trace_help(); 
596                                  exit(1); 
597                                  break;       
598                        default:
599                                fprintf(stderr,"Unknown option: %c\n",c);
600                                usage(argv[0]);
601                                return 1;
602                }
603        }
604
605        if (packet_count == UINT64_MAX && packet_interval == UINT32_MAX) {
606                packet_interval = 300; /* every 5 minutes */
607        }
608
609        if (optind >= argc)
610                return 0;
611
612        if (output_format)
613                fprintf(stderr,"output format: '%s'\n",output_format);
614        else
615                fprintf(stderr,"output format: '%s'\n", DEFAULT_OUTPUT_FMT);
616       
617       
618        if (merge_inputs) {
619                /* If we're merging the inputs, we only want to create all
620                 * the column headers etc. once rather than doing them once
621                 * per trace */
622
623                /* This is going to "name" the output based on the first
624                 * provided URI - admittedly not ideal */
625                create_output(argv[optind]);
626                if (output == NULL)
627                        return 0;
628        }
629               
630        for(i=optind;i<argc;++i) {
631                run_trace(argv[i]);
632        }
633
634        if (merge_inputs) {
635                /* Clean up after ourselves */
636                output_destroy(output);
637        }
638
639
640        return 0;
641}
Note: See TracBrowser for help on using the repository browser.