Ignore:
Timestamp:
09/10/15 16:39:46 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9346e4a
Parents:
9e48735
Message:

Rename parallel tracertstats to "tracertstats"

The old tracertstats is now called tracertstats_single and isn't built
or distributed anymore.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats.c

    rd0cae69 rc3cb9f9  
    5252#include <lt_inttypes.h>
    5353
    54 #include "libtrace.h"
     54#include "libtrace_parallel.h"
    5555#include "output.h"
    5656#include "rt_protocol.h"
     
    6767
    6868int merge_inputs = 0;
     69int threadcount = 4;
    6970
    7071struct filter_t {
     
    120121}
    121122
     123uint64_t count;
     124uint64_t bytes;
     125
     126typedef struct statistic {
     127        uint64_t count;
     128        uint64_t bytes;
     129} statistic_t;
     130
     131typedef struct result {
     132        struct statistic total;
     133        struct statistic filters[0];
     134} result_t;
     135
     136static uint64_t glob_last_ts = 0;
     137static void process_result(libtrace_t *trace UNUSED, int mesg,
     138                           libtrace_generic_t data,
     139                           libtrace_thread_t *sender UNUSED) {
     140        static uint64_t ts = 0;
     141        int j;
     142        result_t *res;
     143
     144        switch (mesg) {
     145                case MESSAGE_RESULT:
     146                ts = data.res->key;
     147                res = data.res->value.ptr;
     148                if (glob_last_ts == 0)
     149                        glob_last_ts = ts;
     150                while ((glob_last_ts >> 32) < (ts >> 32)) {
     151                        report_results(glob_last_ts >> 32, count, bytes);
     152                        count = 0;
     153                        bytes = 0;
     154                        for (j = 0; j < filter_count; j++)
     155                                filters[j].count = filters[j].bytes = 0;
     156                        glob_last_ts = ts;
     157                }
     158                count += res->total.count;
     159                bytes += res->total.bytes;
     160                for (j = 0; j < filter_count; j++) {
     161                        filters[j].count += res->filters[j].count;
     162                        filters[j].bytes += res->filters[j].bytes;
     163                }
     164                free(res);
     165        }
     166}
     167
     168typedef struct timestamp_sync {
     169        int64_t difference_usecs;
     170        uint64_t first_interval_number;
     171} timestamp_sync_t;
     172
     173static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     174                        int mesg, libtrace_generic_t data,
     175                        libtrace_thread_t *sender UNUSED)
     176{
     177        int i;
     178        static __thread result_t * results = NULL;
     179        uint64_t key;
     180        static __thread uint64_t last_key = 0;
     181
     182        switch(mesg) {
     183        case MESSAGE_PACKET:
     184                key = trace_get_erf_timestamp(data.pkt);
     185                if ((key >> 32) > (last_key >> 32) + packet_interval) {
     186                        libtrace_generic_t tmp = {.ptr = results};
     187                        trace_publish_result(trace, t, key,
     188                                        tmp, RESULT_USER);
     189                        trace_post_reporter(trace);
     190                        last_key = key;
     191                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     192
     193                }
     194
     195                for(i=0;i<filter_count;++i) {
     196                        if(trace_apply_filter(filters[i].filter, data.pkt)) {
     197                                results->filters[i].count++;
     198                                results->filters[i].bytes+=trace_get_wire_length(data.pkt);
     199                        }
     200                }
     201
     202                results->total.count++;
     203                results->total.bytes +=trace_get_wire_length(data.pkt);
     204                return data.pkt;
     205
     206        case MESSAGE_STARTING:
     207                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     208                break;
     209
     210        case MESSAGE_STOPPING:
     211                // Should we always post this?
     212                if (results->total.count) {
     213                        libtrace_generic_t tmp = {.ptr = results};
     214                        trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
     215                        trace_post_reporter(trace);
     216                        free(results);
     217                        results = NULL;
     218                }
     219                break;
     220
     221        case MESSAGE_TICK_INTERVAL:
     222        case MESSAGE_TICK_COUNT:
     223                {
     224                        if (data.uint64 > last_key) {
     225                                libtrace_generic_t tmp = {.ptr = results};
     226                                trace_publish_result(trace, t, data.uint64,
     227                                                tmp, RESULT_USER);
     228                                trace_post_reporter(trace);
     229                                last_key = data.uint64;
     230                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     231                        }
     232                        break;
     233                }
     234        }
     235        return NULL;
     236}
     237
    122238/* Process a trace, counting packets that match filter(s) */
    123 static void run_trace(char *uri) 
     239static void run_trace(char *uri)
    124240{
    125         struct libtrace_packet_t *packet = trace_create_packet();
    126         int i;
    127         uint64_t count = 0;
    128         uint64_t bytes = 0;
    129         double last_ts = 0;
    130         double ts = 0;
    131 
    132241        if (!merge_inputs)
    133242                create_output(uri);
     
    136245                return;
    137246
    138         trace = trace_create(uri);
     247        trace = trace_create(uri);
    139248        if (trace_is_err(trace)) {
    140249                trace_perror(trace,"trace_create");
     
    142251                if (!merge_inputs)
    143252                        output_destroy(output);
    144                 return;
    145         }
     253                return;
     254        }
     255        /*
    146256        if (trace_start(trace)==-1) {
    147257                trace_perror(trace,"trace_start");
     
    150260                        output_destroy(output);
    151261                return;
    152         }
    153 
    154         for (;;) {
    155                 int psize;
    156                 if ((psize = trace_read_packet(trace, packet)) <1) {
    157                         break;
    158                 }
    159                
    160                 if (trace_get_packet_buffer(packet,NULL,NULL) == NULL) {
    161                         continue;
    162                 }
    163                
    164                 ts = trace_get_seconds(packet);
    165 
    166                 if (last_ts == 0)
    167                         last_ts = ts;
    168 
    169                 while (packet_interval != UINT64_MAX && last_ts<ts) {
    170                         report_results(last_ts,count,bytes);
    171                         count=0;
    172                         bytes=0;
    173                         last_ts+=packet_interval;
    174                 }
    175                 for(i=0;i<filter_count;++i) {
    176                         if(trace_apply_filter(filters[i].filter,packet)) {
    177                                 ++filters[i].count;
    178                                 filters[i].bytes+=trace_get_wire_length(packet);
    179                         }
    180                 }
    181 
    182                 ++count;
    183                 bytes+=trace_get_wire_length(packet);
    184 
    185 
    186                 if (count >= packet_count) {
    187                         report_results(ts,count,bytes);
    188                         count=0;
    189                         bytes=0;
    190                 }
    191         }
    192         report_results(ts,count,bytes);
    193 
     262        }*/
     263        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
     264        trace_set_tracetime(trace, true);
     265        trace_set_perpkt_threads(trace, threadcount);
     266
     267        //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
     268
     269        if (trace_get_information(trace)->live) {
     270                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
     271        }
     272
     273        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
     274                trace_perror(trace,"Failed to start trace");
     275                trace_destroy(trace);
     276                if (!merge_inputs)
     277                        output_destroy(output);
     278                return;
     279        }
     280
     281
     282        // Wait for all threads to stop
     283        trace_join(trace);
     284       
     285        // Flush the last one out
     286        report_results((glob_last_ts >> 32), count, bytes);
    194287        if (trace_is_err(trace))
    195288                trace_perror(trace,"%s",uri);
     
    199292        if (!merge_inputs)
    200293                output_destroy(output);
    201 
    202         trace_destroy_packet(packet);
    203 }
    204 
     294       
     295}
     296// TODO Decide what to do with -c option
    205297static void usage(char *argv0)
    206298{
     
    209301        "-i --interval=seconds  Duration of reporting interval in seconds\n"
    210302        "-c --count=packets     Exit after count packets received\n"
     303        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
    211304        "-o --output-format=txt|csv|html|png Reporting output format\n"
    212305        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
     
    229322                        { "libtrace-help",      0, 0, 'H' },
    230323                        { "merge-inputs",       0, 0, 'm' },
     324                        { "threads",            1, 0, 't' },
    231325                        { NULL,                 0, 0, 0   },
    232326                };
    233327
    234                 int c=getopt_long(argc, argv, "c:f:i:o:Hm",
     328                int c=getopt_long(argc, argv, "c:f:i:o:t:Hm",
    235329                                long_options, &option_index);
    236330
     
    247341                                filters[filter_count-1].bytes=0;
    248342                                break;
     343                        case 't':
     344                                threadcount = atoi(optarg);
     345                                if (threadcount <= 0)
     346                                        threadcount = 1;
     347                                break;
    249348                        case 'i':
    250349                                packet_interval=atof(optarg);
     
    284383       
    285384       
    286 
    287385        if (merge_inputs) {
    288386                /* If we're merging the inputs, we only want to create all
     
    295393                if (output == NULL)
    296394                        return 0;
    297 
    298395        }
    299396               
     
    308405
    309406
    310         return 0;
    311 }
     407        return 0;
     408}
Note: See TracChangeset for help on using the changeset viewer.