Ignore:
Timestamp:
09/10/15 16:39:46 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9346e4a
Parents:
9e48735
Message:

Rename parallel tracertstats to "tracertstats"

The old tracertstats is now called tracertstats_single and isn't built
or distributed anymore.

File:
1 moved

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats_single.c

    r9e48735 rc3cb9f9  
    5252#include <lt_inttypes.h>
    5353
    54 #include "libtrace_parallel.h"
     54#include "libtrace.h"
    5555#include "output.h"
    5656#include "rt_protocol.h"
     
    6767
    6868int merge_inputs = 0;
    69 int threadcount = 4;
    7069
    7170struct filter_t {
     
    121120}
    122121
    123 uint64_t count;
    124 uint64_t bytes;
    125 
    126 typedef struct statistic {
    127         uint64_t count;
    128         uint64_t bytes;
    129 } statistic_t;
    130 
    131 typedef struct result {
    132         struct statistic total;
    133         struct statistic filters[0];
    134 } result_t;
    135 
    136 static uint64_t glob_last_ts = 0;
    137 static void process_result(libtrace_t *trace UNUSED, int mesg,
    138                            libtrace_generic_t data,
    139                            libtrace_thread_t *sender UNUSED) {
    140         static uint64_t ts = 0;
    141         int j;
    142         result_t *res;
    143 
    144         switch (mesg) {
    145                 case MESSAGE_RESULT:
    146                 ts = data.res->key;
    147                 res = data.res->value.ptr;
    148                 if (glob_last_ts == 0)
    149                         glob_last_ts = ts;
    150                 while ((glob_last_ts >> 32) < (ts >> 32)) {
    151                         report_results(glob_last_ts >> 32, count, bytes);
    152                         count = 0;
    153                         bytes = 0;
    154                         for (j = 0; j < filter_count; j++)
    155                                 filters[j].count = filters[j].bytes = 0;
    156                         glob_last_ts = ts;
    157                 }
    158                 count += res->total.count;
    159                 bytes += res->total.bytes;
    160                 for (j = 0; j < filter_count; j++) {
    161                         filters[j].count += res->filters[j].count;
    162                         filters[j].bytes += res->filters[j].bytes;
    163                 }
    164                 free(res);
    165         }
    166 }
    167 
    168 typedef struct timestamp_sync {
    169         int64_t difference_usecs;
    170         uint64_t first_interval_number;
    171 } timestamp_sync_t;
    172 
    173 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    174                         int mesg, libtrace_generic_t data,
    175                         libtrace_thread_t *sender UNUSED)
     122/* Process a trace, counting packets that match filter(s) */
     123static void run_trace(char *uri)
    176124{
     125        struct libtrace_packet_t *packet = trace_create_packet();
    177126        int i;
    178         static __thread result_t * results = NULL;
    179         uint64_t key;
    180         static __thread uint64_t last_key = 0;
    181 
    182         switch(mesg) {
    183         case MESSAGE_PACKET:
    184                 key = trace_get_erf_timestamp(data.pkt);
    185                 if ((key >> 32) > (last_key >> 32) + packet_interval) {
    186                         libtrace_generic_t tmp = {.ptr = results};
    187                         trace_publish_result(trace, t, key,
    188                                         tmp, RESULT_USER);
    189                         trace_post_reporter(trace);
    190                         last_key = key;
    191                         results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    192 
    193                 }
    194 
    195                 for(i=0;i<filter_count;++i) {
    196                         if(trace_apply_filter(filters[i].filter, data.pkt)) {
    197                                 results->filters[i].count++;
    198                                 results->filters[i].bytes+=trace_get_wire_length(data.pkt);
    199                         }
    200                 }
    201 
    202                 results->total.count++;
    203                 results->total.bytes +=trace_get_wire_length(data.pkt);
    204                 return data.pkt;
    205 
    206         case MESSAGE_STARTING:
    207                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    208                 break;
    209 
    210         case MESSAGE_STOPPING:
    211                 // Should we always post this?
    212                 if (results->total.count) {
    213                         libtrace_generic_t tmp = {.ptr = results};
    214                         trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
    215                         trace_post_reporter(trace);
    216                         free(results);
    217                         results = NULL;
    218                 }
    219                 break;
    220 
    221         case MESSAGE_TICK_INTERVAL:
    222         case MESSAGE_TICK_COUNT:
    223                 {
    224                         if (data.uint64 > last_key) {
    225                                 libtrace_generic_t tmp = {.ptr = results};
    226                                 trace_publish_result(trace, t, data.uint64,
    227                                                 tmp, RESULT_USER);
    228                                 trace_post_reporter(trace);
    229                                 last_key = data.uint64;
    230                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    231                         }
    232                         break;
    233                 }
    234         }
    235         return NULL;
    236 }
    237 
    238 /* Process a trace, counting packets that match filter(s) */
    239 static void run_trace(char *uri)
    240 {
     127        uint64_t count = 0;
     128        uint64_t bytes = 0;
     129        double last_ts = 0;
     130        double ts = 0;
     131
    241132        if (!merge_inputs)
    242133                create_output(uri);
     
    245136                return;
    246137
    247         trace = trace_create(uri);
     138        trace = trace_create(uri);
    248139        if (trace_is_err(trace)) {
    249140                trace_perror(trace,"trace_create");
     
    251142                if (!merge_inputs)
    252143                        output_destroy(output);
    253                 return;
    254         }
    255         /*
     144                return;
     145        }
    256146        if (trace_start(trace)==-1) {
    257147                trace_perror(trace,"trace_start");
     
    260150                        output_destroy(output);
    261151                return;
    262         }*/
    263         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    264         trace_set_tracetime(trace, true);
    265         trace_set_perpkt_threads(trace, threadcount);
    266 
    267         //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
    268 
    269         if (trace_get_information(trace)->live) {
    270                 trace_set_tick_interval(trace, (int) (packet_interval * 1000));
    271         }
    272 
    273         if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
    274                 trace_perror(trace,"Failed to start trace");
    275                 trace_destroy(trace);
    276                 if (!merge_inputs)
    277                         output_destroy(output);
    278                 return;
    279         }
    280 
    281 
    282         // Wait for all threads to stop
    283         trace_join(trace);
    284        
    285         // Flush the last one out
    286         report_results((glob_last_ts >> 32), count, bytes);
     152        }
     153
     154        for (;;) {
     155                int psize;
     156                if ((psize = trace_read_packet(trace, packet)) <1) {
     157                        break;
     158                }
     159               
     160                if (trace_get_packet_buffer(packet,NULL,NULL) == NULL) {
     161                        continue;
     162                }
     163               
     164                ts = trace_get_seconds(packet);
     165
     166                if (last_ts == 0)
     167                        last_ts = ts;
     168
     169                while (packet_interval != UINT64_MAX && last_ts<ts) {
     170                        report_results(last_ts,count,bytes);
     171                        count=0;
     172                        bytes=0;
     173                        last_ts+=packet_interval;
     174                }
     175                for(i=0;i<filter_count;++i) {
     176                        if(trace_apply_filter(filters[i].filter,packet)) {
     177                                ++filters[i].count;
     178                                filters[i].bytes+=trace_get_wire_length(packet);
     179                        }
     180                }
     181
     182                ++count;
     183                bytes+=trace_get_wire_length(packet);
     184
     185
     186                if (count >= packet_count) {
     187                        report_results(ts,count,bytes);
     188                        count=0;
     189                        bytes=0;
     190                }
     191        }
     192        report_results(ts,count,bytes);
     193
    287194        if (trace_is_err(trace))
    288195                trace_perror(trace,"%s",uri);
     
    292199        if (!merge_inputs)
    293200                output_destroy(output);
    294        
    295 }
    296 // TODO Decide what to do with -c option
     201
     202        trace_destroy_packet(packet);
     203}
     204
    297205static void usage(char *argv0)
    298206{
     
    301209        "-i --interval=seconds  Duration of reporting interval in seconds\n"
    302210        "-c --count=packets     Exit after count packets received\n"
    303         "-t --threads=max       Create 'max' processing threads (default: 4)\n"
    304211        "-o --output-format=txt|csv|html|png Reporting output format\n"
    305212        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
     
    322229                        { "libtrace-help",      0, 0, 'H' },
    323230                        { "merge-inputs",       0, 0, 'm' },
    324                         { "threads",            1, 0, 't' },
    325231                        { NULL,                 0, 0, 0   },
    326232                };
    327233
    328                 int c=getopt_long(argc, argv, "c:f:i:o:t:Hm",
     234                int c=getopt_long(argc, argv, "c:f:i:o:Hm",
    329235                                long_options, &option_index);
    330236
     
    341247                                filters[filter_count-1].bytes=0;
    342248                                break;
    343                         case 't':
    344                                 threadcount = atoi(optarg);
    345                                 if (threadcount <= 0)
    346                                         threadcount = 1;
    347                                 break;
    348249                        case 'i':
    349250                                packet_interval=atof(optarg);
     
    383284       
    384285       
     286
    385287        if (merge_inputs) {
    386288                /* If we're merging the inputs, we only want to create all
     
    393295                if (output == NULL)
    394296                        return 0;
     297
    395298        }
    396299               
     
    405308
    406309
    407         return 0;
    408 }
     310        return 0;
     311}
Note: See TracChangeset for help on using the changeset viewer.