Changeset 322c516


Ignore:
Timestamp:
09/11/15 17:15:22 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b3dca4d
Parents:
f625817
Message:

Port tracestats to new parallel API

Old single-threaded tracestats has been moved to tracestats_single.c, which
won't be built or distributed.

Location:
tools/tracestats
Files:
1 added
1 deleted
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracestats/tracestats.c

    r6b98325 r322c516  
    5353#include <signal.h>
    5454
    55 #include "libtrace.h"
     55#include "libtrace_parallel.h"
    5656#include "lt_inttypes.h"
    57 
    58 struct libtrace_t *trace;
    59 
    60 static volatile int done=0;
    61 
    62 static void cleanup_signal(int signal)
     57#include <pthread.h>
     58
     59struct libtrace_t *trace = NULL;
     60
     61static void cleanup_signal(int signal UNUSED)
    6362{
    64         (void)signal;
    65         done=1;
    66         trace_interrupt();
     63        if (trace)
     64                trace_pstop(trace);
    6765}
    6866
     
    7068        char *expr;
    7169        struct libtrace_filter_t *filter;
     70} *filters = NULL;
     71
     72int filter_count=0;
     73
     74
     75typedef struct statistics {
    7276        uint64_t count;
    7377        uint64_t bytes;
    74 } *filters = NULL;
    75 int filter_count=0;
    76 uint64_t totcount;
    77 uint64_t totbytes;
     78} statistics_t;
     79
     80volatile uint64_t totcount = 0;
     81volatile uint64_t totbytes = 0;
     82
     83
     84static void fn_result(libtrace_t *trace UNUSED,
     85                libtrace_thread_t *sender UNUSED,
     86                void *global UNUSED, void *tls,
     87                libtrace_result_t *result) {
     88        statistics_t *counters = (statistics_t *)tls;
     89        int i;
     90
     91        assert(result->key == 0);
     92        statistics_t * res = result->value.ptr;
     93        counters[0].count += res[0].count;
     94        counters[0].bytes += res[0].bytes;
     95
     96        for (i = 0; i < filter_count; i++) {
     97                counters[i+1].count += res[i+1].count;
     98                counters[i+1].bytes += res[i+1].bytes;
     99        }
     100        free(res);
     101}
     102
     103static void fn_print_results(libtrace_t *trace,
     104                libtrace_thread_t *sender UNUSED, void *global UNUSED,
     105                void *tls) {
     106
     107        statistics_t *counters = (statistics_t *)tls;
     108        libtrace_stat_t *stats = NULL;
     109        int i;
     110
     111        stats = trace_get_statistics(trace, NULL);
     112        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
     113        for(i=0;i<filter_count;++i) {
     114                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,counters[i+1].count,counters[i+1].bytes,counters[i+1].count*100.0/counters[0].count);
     115        }
     116        if (stats->received_valid)
     117                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     118                                "Input packets", stats->received);
     119        if (stats->filtered_valid)
     120                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     121                                "Filtered packets", stats->filtered);
     122        if (stats->dropped_valid)
     123                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     124                                "Dropped packets",stats->dropped);
     125        if (stats->accepted_valid)
     126                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
     127                                "Accepted packets", stats->accepted);
     128        if (stats->errors_valid)
     129                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
     130                                "Erred packets", stats->errors);
     131        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",counters[0].count,counters[0].bytes);
     132        totcount+=counters[0].count;
     133        totbytes+=counters[0].bytes;
     134
     135}
     136
     137
     138static void* fn_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, void *global UNUSED) {
     139        /* Allocate space to hold a total count and one for each filter */
     140        return calloc(1, sizeof(statistics_t) * (filter_count + 1));
     141}
     142
     143
     144static void fn_stopping(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     145                        void *global UNUSED, void*tls) {
     146        statistics_t *results = (statistics_t *)tls;
     147        libtrace_generic_t gen;
     148        /* We only output one result per thread with the key 0 when the
     149         * trace is over. */
     150        gen.ptr = results;
     151        trace_publish_result(trace, t, 0, gen, RESULT_USER);
     152}
     153
     154static libtrace_packet_t* fn_packet(libtrace_t *trace,
     155                libtrace_thread_t *t UNUSED,
     156                void *global UNUSED, void*tls, libtrace_packet_t *pkt) {
     157        statistics_t *results = (statistics_t *)tls;
     158        int i, wlen;
     159
     160        /* Apply filters to every packet note the result */
     161        wlen = trace_get_wire_length(pkt);
     162        for(i=0;i<filter_count;++i) {
     163                if (filters[i].filter == NULL)
     164                        continue;
     165                if(trace_apply_filter(filters[i].filter,pkt) > 0) {
     166                        results[i+1].count++;
     167                        results[i+1].bytes+=wlen;
     168                }
     169                if (trace_is_err(trace)) {
     170                        trace_perror(trace, "trace_apply_filter");
     171                        fprintf(stderr, "Removing filter from filterlist\n");
     172                        /* This is a race, but will be atomic */
     173                        filters[i].filter = NULL;
     174                }
     175        }
     176        results[0].count++;
     177        results[0].bytes +=wlen;
     178        return pkt;
     179}
    78180
    79181/* Process a trace, counting packets that match filter(s) */
    80 static void run_trace(char *uri)
     182static void run_trace(char *uri, int threadcount)
    81183{
    82         struct libtrace_packet_t *packet = trace_create_packet();
    83         int i;
    84         uint64_t count = 0;
    85         uint64_t bytes = 0;
    86         libtrace_stat_t *stats;
    87184
    88185        fprintf(stderr,"%s:\n",uri);
    89 
    90         trace = trace_create(uri);
     186        libtrace_callback_set_t *pktcbs, *rescbs;
     187
     188        trace = trace_create(uri);
    91189
    92190        if (trace_is_err(trace)) {
     
    95193        }
    96194
    97         if (trace_start(trace)==-1) {
     195        pktcbs = trace_create_callback_set();
     196        rescbs = trace_create_callback_set();
     197
     198        trace_set_packet_cb(pktcbs, fn_packet);
     199        trace_set_starting_cb(pktcbs, fn_starting);
     200        trace_set_stopping_cb(pktcbs, fn_stopping);
     201        trace_set_starting_cb(rescbs, fn_starting);
     202        trace_set_result_cb(rescbs, fn_result);
     203        trace_set_stopping_cb(rescbs, fn_print_results);
     204
     205        if (threadcount != 0)
     206                trace_set_perpkt_threads(trace, threadcount);
     207
     208        /* Start the trace as a parallel trace */
     209        if (trace_pstart(trace, NULL, pktcbs, rescbs)==-1) {
    98210                trace_perror(trace,"Failed to start trace");
    99211                return;
    100212        }
    101213
    102 
    103         for (;;) {
    104                 int psize;
    105                 int wlen;
    106                 if ((psize = trace_read_packet(trace, packet)) <1) {
    107                         break;
    108                 }
    109                
    110                 if (done)
    111                         break;
    112                 wlen = trace_get_wire_length(packet);
    113 
    114                 for(i=0;i<filter_count;++i) {
    115                         if (filters[i].filter == NULL)
    116                                 continue;
    117                         if(trace_apply_filter(filters[i].filter,packet) > 0) {
    118                                 ++filters[i].count;
    119                                 filters[i].bytes+=wlen;
    120                         }
    121                         if (trace_is_err(trace)) {
    122                                 trace_perror(trace, "trace_apply_filter");
    123                                 fprintf(stderr, "Removing filter from filterlist\n");
    124                                 filters[i].filter = NULL;
    125                         }
    126                 }
    127 
    128                 ++count;
    129                 bytes+=wlen;
    130         }
    131         trace_destroy_packet(packet);
    132 
    133         stats = trace_get_statistics(trace, NULL);
    134 
    135         printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
    136         for(i=0;i<filter_count;++i) {
    137                 printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
    138                 filters[i].bytes=0;
    139                 filters[i].count=0;
    140         }
    141         if (stats->received_valid)
    142                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    143                                 "Input packets", stats->received);
    144         if (stats->filtered_valid)
    145                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    146                                 "Filtered packets", stats->filtered);
    147         if (stats->dropped_valid)
    148                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    149                                 "Dropped packets",stats->dropped);
    150         if (stats->accepted_valid)
    151                 fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
    152                                 "Accepted packets", stats->accepted);
    153         if (stats->errors_valid)
    154                 fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
    155                                 "Erred packets", stats->errors);
    156         printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
    157         totcount+=count;
    158         totbytes+=bytes;
     214        /* Wait for all threads to stop */
     215        trace_join(trace);
    159216
    160217        if (trace_is_err(trace))
    161                 trace_perror(trace,"Processing trace");
    162 
    163         trace_destroy(trace);
     218                trace_perror(trace,"%s",uri);
     219
     220        trace_destroy(trace);
     221        trace_destroy_callback_set(pktcbs);
     222        trace_destroy_callback_set(rescbs);
    164223}
    165224
    166225static void usage(char *argv0)
    167226{
    168         fprintf(stderr,"Usage: %s [-H|--libtrace-help] [--filter|-f bpf ]... libtraceuri...\n",argv0);
     227        fprintf(stderr,"Usage: %s [-h|--help] [--threads|-t threads] [--filter|-f bpf ]... libtraceuri...\n",argv0);
    169228}
    170229
     
    173232        int i;
    174233        struct sigaction sigact;
     234        int threadcount = 4;
    175235
    176236        while(1) {
     
    178238                struct option long_options[] = {
    179239                        { "filter",        1, 0, 'f' },
    180                         { "libtrace-help", 0, 0, 'H' },
     240                        { "help", 0, 0, 'h' },
     241                        { "threads",            1, 0, 't' },
    181242                        { NULL,            0, 0, 0   },
    182243                };
    183244
    184                 int c=getopt_long(argc, argv, "f:H",
     245                int c=getopt_long(argc, argv, "f:ht:",
    185246                                long_options, &option_index);
    186247
     
    189250
    190251                switch (c) {
    191                         case 'f': 
     252                        case 'f':
    192253                                ++filter_count;
    193254                                filters=realloc(filters,filter_count*sizeof(struct filter_t));
    194255                                filters[filter_count-1].expr=strdup(optarg);
    195256                                filters[filter_count-1].filter=trace_create_filter(optarg);
    196                                 filters[filter_count-1].count=0;
    197                                 filters[filter_count-1].bytes=0;
    198257                                break;
    199                         case 'H':
    200                                 trace_help();
    201                                 exit(1);
    202                                 break;
    203                         default:
     258                        case 'h':
     259                                usage(argv[0]);
     260                                return 1;
     261                        case 't':
     262                                threadcount = atoi(optarg);
     263                                if (threadcount <= 0)
     264                                        threadcount = 1;
     265                                break;
     266                        default:
    204267                                fprintf(stderr,"Unknown option: %c\n",c);
    205268                                usage(argv[0]);
     
    216279
    217280        for(i=optind;i<argc;++i) {
    218                 run_trace(argv[i]);
     281                run_trace(argv[i], threadcount);
    219282        }
    220283        if (optind+1<argc) {
     
    222285                printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",totcount,totbytes);
    223286        }
    224 
    225 
    226         return 0;
    227 }
     287       
     288        return 0;
     289}
Note: See TracChangeset for help on using the changeset viewer.