Changeset c3cb9f9


Ignore:
Timestamp:
09/10/15 16:39:46 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9346e4a
Parents:
9e48735
Message:

Rename parallel tracertstats to "tracertstats"

The old tracertstats is now called tracertstats_single and isn't built
or distributed anymore.

Location:
tools/tracertstats
Files:
1 edited
1 moved

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats.c

    rd0cae69 rc3cb9f9  
    5252#include <lt_inttypes.h>
    5353
    54 #include "libtrace.h"
     54#include "libtrace_parallel.h"
    5555#include "output.h"
    5656#include "rt_protocol.h"
     
    6767
    6868int merge_inputs = 0;
     69int threadcount = 4;
    6970
    7071struct filter_t {
     
    120121}
    121122
     123uint64_t count;
     124uint64_t bytes;
     125
     126typedef struct statistic {
     127        uint64_t count;
     128        uint64_t bytes;
     129} statistic_t;
     130
     131typedef struct result {
     132        struct statistic total;
     133        struct statistic filters[0];
     134} result_t;
     135
     136static uint64_t glob_last_ts = 0;
     137static void process_result(libtrace_t *trace UNUSED, int mesg,
     138                           libtrace_generic_t data,
     139                           libtrace_thread_t *sender UNUSED) {
     140        static uint64_t ts = 0;
     141        int j;
     142        result_t *res;
     143
     144        switch (mesg) {
     145                case MESSAGE_RESULT:
     146                ts = data.res->key;
     147                res = data.res->value.ptr;
     148                if (glob_last_ts == 0)
     149                        glob_last_ts = ts;
     150                while ((glob_last_ts >> 32) < (ts >> 32)) {
     151                        report_results(glob_last_ts >> 32, count, bytes);
     152                        count = 0;
     153                        bytes = 0;
     154                        for (j = 0; j < filter_count; j++)
     155                                filters[j].count = filters[j].bytes = 0;
     156                        glob_last_ts = ts;
     157                }
     158                count += res->total.count;
     159                bytes += res->total.bytes;
     160                for (j = 0; j < filter_count; j++) {
     161                        filters[j].count += res->filters[j].count;
     162                        filters[j].bytes += res->filters[j].bytes;
     163                }
     164                free(res);
     165        }
     166}
     167
     168typedef struct timestamp_sync {
     169        int64_t difference_usecs;
     170        uint64_t first_interval_number;
     171} timestamp_sync_t;
     172
     173static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
     174                        int mesg, libtrace_generic_t data,
     175                        libtrace_thread_t *sender UNUSED)
     176{
     177        int i;
     178        static __thread result_t * results = NULL;
     179        uint64_t key;
     180        static __thread uint64_t last_key = 0;
     181
     182        switch(mesg) {
     183        case MESSAGE_PACKET:
     184                key = trace_get_erf_timestamp(data.pkt);
     185                if ((key >> 32) > (last_key >> 32) + packet_interval) {
     186                        libtrace_generic_t tmp = {.ptr = results};
     187                        trace_publish_result(trace, t, key,
     188                                        tmp, RESULT_USER);
     189                        trace_post_reporter(trace);
     190                        last_key = key;
     191                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     192
     193                }
     194
     195                for(i=0;i<filter_count;++i) {
     196                        if(trace_apply_filter(filters[i].filter, data.pkt)) {
     197                                results->filters[i].count++;
     198                                results->filters[i].bytes+=trace_get_wire_length(data.pkt);
     199                        }
     200                }
     201
     202                results->total.count++;
     203                results->total.bytes +=trace_get_wire_length(data.pkt);
     204                return data.pkt;
     205
     206        case MESSAGE_STARTING:
     207                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     208                break;
     209
     210        case MESSAGE_STOPPING:
     211                // Should we always post this?
     212                if (results->total.count) {
     213                        libtrace_generic_t tmp = {.ptr = results};
     214                        trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
     215                        trace_post_reporter(trace);
     216                        free(results);
     217                        results = NULL;
     218                }
     219                break;
     220
     221        case MESSAGE_TICK_INTERVAL:
     222        case MESSAGE_TICK_COUNT:
     223                {
     224                        if (data.uint64 > last_key) {
     225                                libtrace_generic_t tmp = {.ptr = results};
     226                                trace_publish_result(trace, t, data.uint64,
     227                                                tmp, RESULT_USER);
     228                                trace_post_reporter(trace);
     229                                last_key = data.uint64;
     230                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     231                        }
     232                        break;
     233                }
     234        }
     235        return NULL;
     236}
     237
    122238/* Process a trace, counting packets that match filter(s) */
    123 static void run_trace(char *uri) 
     239static void run_trace(char *uri)
    124240{
    125         struct libtrace_packet_t *packet = trace_create_packet();
    126         int i;
    127         uint64_t count = 0;
    128         uint64_t bytes = 0;
    129         double last_ts = 0;
    130         double ts = 0;
    131 
    132241        if (!merge_inputs)
    133242                create_output(uri);
     
    136245                return;
    137246
    138         trace = trace_create(uri);
     247        trace = trace_create(uri);
    139248        if (trace_is_err(trace)) {
    140249                trace_perror(trace,"trace_create");
     
    142251                if (!merge_inputs)
    143252                        output_destroy(output);
    144                 return;
    145         }
     253                return;
     254        }
     255        /*
    146256        if (trace_start(trace)==-1) {
    147257                trace_perror(trace,"trace_start");
     
    150260                        output_destroy(output);
    151261                return;
    152         }
    153 
    154         for (;;) {
    155                 int psize;
    156                 if ((psize = trace_read_packet(trace, packet)) <1) {
    157                         break;
    158                 }
    159                
    160                 if (trace_get_packet_buffer(packet,NULL,NULL) == NULL) {
    161                         continue;
    162                 }
    163                
    164                 ts = trace_get_seconds(packet);
    165 
    166                 if (last_ts == 0)
    167                         last_ts = ts;
    168 
    169                 while (packet_interval != UINT64_MAX && last_ts<ts) {
    170                         report_results(last_ts,count,bytes);
    171                         count=0;
    172                         bytes=0;
    173                         last_ts+=packet_interval;
    174                 }
    175                 for(i=0;i<filter_count;++i) {
    176                         if(trace_apply_filter(filters[i].filter,packet)) {
    177                                 ++filters[i].count;
    178                                 filters[i].bytes+=trace_get_wire_length(packet);
    179                         }
    180                 }
    181 
    182                 ++count;
    183                 bytes+=trace_get_wire_length(packet);
    184 
    185 
    186                 if (count >= packet_count) {
    187                         report_results(ts,count,bytes);
    188                         count=0;
    189                         bytes=0;
    190                 }
    191         }
    192         report_results(ts,count,bytes);
    193 
     262        }*/
     263        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
     264        trace_set_tracetime(trace, true);
     265        trace_set_perpkt_threads(trace, threadcount);
     266
     267        //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
     268
     269        if (trace_get_information(trace)->live) {
     270                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
     271        }
     272
     273        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
     274                trace_perror(trace,"Failed to start trace");
     275                trace_destroy(trace);
     276                if (!merge_inputs)
     277                        output_destroy(output);
     278                return;
     279        }
     280
     281
     282        // Wait for all threads to stop
     283        trace_join(trace);
     284       
     285        // Flush the last one out
     286        report_results((glob_last_ts >> 32), count, bytes);
    194287        if (trace_is_err(trace))
    195288                trace_perror(trace,"%s",uri);
     
    199292        if (!merge_inputs)
    200293                output_destroy(output);
    201 
    202         trace_destroy_packet(packet);
    203 }
    204 
     294       
     295}
     296// TODO Decide what to do with -c option
    205297static void usage(char *argv0)
    206298{
     
    209301        "-i --interval=seconds  Duration of reporting interval in seconds\n"
    210302        "-c --count=packets     Exit after count packets received\n"
     303        "-t --threads=max       Create 'max' processing threads (default: 4)\n"
    211304        "-o --output-format=txt|csv|html|png Reporting output format\n"
    212305        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
     
    229322                        { "libtrace-help",      0, 0, 'H' },
    230323                        { "merge-inputs",       0, 0, 'm' },
     324                        { "threads",            1, 0, 't' },
    231325                        { NULL,                 0, 0, 0   },
    232326                };
    233327
    234                 int c=getopt_long(argc, argv, "c:f:i:o:Hm",
     328                int c=getopt_long(argc, argv, "c:f:i:o:t:Hm",
    235329                                long_options, &option_index);
    236330
     
    247341                                filters[filter_count-1].bytes=0;
    248342                                break;
     343                        case 't':
     344                                threadcount = atoi(optarg);
     345                                if (threadcount <= 0)
     346                                        threadcount = 1;
     347                                break;
    249348                        case 'i':
    250349                                packet_interval=atof(optarg);
     
    284383       
    285384       
    286 
    287385        if (merge_inputs) {
    288386                /* If we're merging the inputs, we only want to create all
     
    295393                if (output == NULL)
    296394                        return 0;
    297 
    298395        }
    299396               
     
    308405
    309406
    310         return 0;
    311 }
     407        return 0;
     408}
  • tools/tracertstats/tracertstats_single.c

    r9e48735 rc3cb9f9  
    5252#include <lt_inttypes.h>
    5353
    54 #include "libtrace_parallel.h"
     54#include "libtrace.h"
    5555#include "output.h"
    5656#include "rt_protocol.h"
     
    6767
    6868int merge_inputs = 0;
    69 int threadcount = 4;
    7069
    7170struct filter_t {
     
    121120}
    122121
    123 uint64_t count;
    124 uint64_t bytes;
    125 
    126 typedef struct statistic {
    127         uint64_t count;
    128         uint64_t bytes;
    129 } statistic_t;
    130 
    131 typedef struct result {
    132         struct statistic total;
    133         struct statistic filters[0];
    134 } result_t;
    135 
    136 static uint64_t glob_last_ts = 0;
    137 static void process_result(libtrace_t *trace UNUSED, int mesg,
    138                            libtrace_generic_t data,
    139                            libtrace_thread_t *sender UNUSED) {
    140         static uint64_t ts = 0;
    141         int j;
    142         result_t *res;
    143 
    144         switch (mesg) {
    145                 case MESSAGE_RESULT:
    146                 ts = data.res->key;
    147                 res = data.res->value.ptr;
    148                 if (glob_last_ts == 0)
    149                         glob_last_ts = ts;
    150                 while ((glob_last_ts >> 32) < (ts >> 32)) {
    151                         report_results(glob_last_ts >> 32, count, bytes);
    152                         count = 0;
    153                         bytes = 0;
    154                         for (j = 0; j < filter_count; j++)
    155                                 filters[j].count = filters[j].bytes = 0;
    156                         glob_last_ts = ts;
    157                 }
    158                 count += res->total.count;
    159                 bytes += res->total.bytes;
    160                 for (j = 0; j < filter_count; j++) {
    161                         filters[j].count += res->filters[j].count;
    162                         filters[j].bytes += res->filters[j].bytes;
    163                 }
    164                 free(res);
    165         }
    166 }
    167 
    168 typedef struct timestamp_sync {
    169         int64_t difference_usecs;
    170         uint64_t first_interval_number;
    171 } timestamp_sync_t;
    172 
    173 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    174                         int mesg, libtrace_generic_t data,
    175                         libtrace_thread_t *sender UNUSED)
     122/* Process a trace, counting packets that match filter(s) */
     123static void run_trace(char *uri)
    176124{
     125        struct libtrace_packet_t *packet = trace_create_packet();
    177126        int i;
    178         static __thread result_t * results = NULL;
    179         uint64_t key;
    180         static __thread uint64_t last_key = 0;
    181 
    182         switch(mesg) {
    183         case MESSAGE_PACKET:
    184                 key = trace_get_erf_timestamp(data.pkt);
    185                 if ((key >> 32) > (last_key >> 32) + packet_interval) {
    186                         libtrace_generic_t tmp = {.ptr = results};
    187                         trace_publish_result(trace, t, key,
    188                                         tmp, RESULT_USER);
    189                         trace_post_reporter(trace);
    190                         last_key = key;
    191                         results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    192 
    193                 }
    194 
    195                 for(i=0;i<filter_count;++i) {
    196                         if(trace_apply_filter(filters[i].filter, data.pkt)) {
    197                                 results->filters[i].count++;
    198                                 results->filters[i].bytes+=trace_get_wire_length(data.pkt);
    199                         }
    200                 }
    201 
    202                 results->total.count++;
    203                 results->total.bytes +=trace_get_wire_length(data.pkt);
    204                 return data.pkt;
    205 
    206         case MESSAGE_STARTING:
    207                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    208                 break;
    209 
    210         case MESSAGE_STOPPING:
    211                 // Should we always post this?
    212                 if (results->total.count) {
    213                         libtrace_generic_t tmp = {.ptr = results};
    214                         trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
    215                         trace_post_reporter(trace);
    216                         free(results);
    217                         results = NULL;
    218                 }
    219                 break;
    220 
    221         case MESSAGE_TICK_INTERVAL:
    222         case MESSAGE_TICK_COUNT:
    223                 {
    224                         if (data.uint64 > last_key) {
    225                                 libtrace_generic_t tmp = {.ptr = results};
    226                                 trace_publish_result(trace, t, data.uint64,
    227                                                 tmp, RESULT_USER);
    228                                 trace_post_reporter(trace);
    229                                 last_key = data.uint64;
    230                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    231                         }
    232                         break;
    233                 }
    234         }
    235         return NULL;
    236 }
    237 
    238 /* Process a trace, counting packets that match filter(s) */
    239 static void run_trace(char *uri)
    240 {
     127        uint64_t count = 0;
     128        uint64_t bytes = 0;
     129        double last_ts = 0;
     130        double ts = 0;
     131
    241132        if (!merge_inputs)
    242133                create_output(uri);
     
    245136                return;
    246137
    247         trace = trace_create(uri);
     138        trace = trace_create(uri);
    248139        if (trace_is_err(trace)) {
    249140                trace_perror(trace,"trace_create");
     
    251142                if (!merge_inputs)
    252143                        output_destroy(output);
    253                 return;
    254         }
    255         /*
     144                return;
     145        }
    256146        if (trace_start(trace)==-1) {
    257147                trace_perror(trace,"trace_start");
     
    260150                        output_destroy(output);
    261151                return;
    262         }*/
    263         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    264         trace_set_tracetime(trace, true);
    265         trace_set_perpkt_threads(trace, threadcount);
    266 
    267         //trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
    268 
    269         if (trace_get_information(trace)->live) {
    270                 trace_set_tick_interval(trace, (int) (packet_interval * 1000));
    271         }
    272 
    273         if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
    274                 trace_perror(trace,"Failed to start trace");
    275                 trace_destroy(trace);
    276                 if (!merge_inputs)
    277                         output_destroy(output);
    278                 return;
    279         }
    280 
    281 
    282         // Wait for all threads to stop
    283         trace_join(trace);
    284        
    285         // Flush the last one out
    286         report_results((glob_last_ts >> 32), count, bytes);
     152        }
     153
     154        for (;;) {
     155                int psize;
     156                if ((psize = trace_read_packet(trace, packet)) <1) {
     157                        break;
     158                }
     159               
     160                if (trace_get_packet_buffer(packet,NULL,NULL) == NULL) {
     161                        continue;
     162                }
     163               
     164                ts = trace_get_seconds(packet);
     165
     166                if (last_ts == 0)
     167                        last_ts = ts;
     168
     169                while (packet_interval != UINT64_MAX && last_ts<ts) {
     170                        report_results(last_ts,count,bytes);
     171                        count=0;
     172                        bytes=0;
     173                        last_ts+=packet_interval;
     174                }
     175                for(i=0;i<filter_count;++i) {
     176                        if(trace_apply_filter(filters[i].filter,packet)) {
     177                                ++filters[i].count;
     178                                filters[i].bytes+=trace_get_wire_length(packet);
     179                        }
     180                }
     181
     182                ++count;
     183                bytes+=trace_get_wire_length(packet);
     184
     185
     186                if (count >= packet_count) {
     187                        report_results(ts,count,bytes);
     188                        count=0;
     189                        bytes=0;
     190                }
     191        }
     192        report_results(ts,count,bytes);
     193
    287194        if (trace_is_err(trace))
    288195                trace_perror(trace,"%s",uri);
     
    292199        if (!merge_inputs)
    293200                output_destroy(output);
    294        
    295 }
    296 // TODO Decide what to do with -c option
     201
     202        trace_destroy_packet(packet);
     203}
     204
    297205static void usage(char *argv0)
    298206{
     
    301209        "-i --interval=seconds  Duration of reporting interval in seconds\n"
    302210        "-c --count=packets     Exit after count packets received\n"
    303         "-t --threads=max       Create 'max' processing threads (default: 4)\n"
    304211        "-o --output-format=txt|csv|html|png Reporting output format\n"
    305212        "-f --filter=bpf        Apply BPF filter. Can be specified multiple times\n"
     
    322229                        { "libtrace-help",      0, 0, 'H' },
    323230                        { "merge-inputs",       0, 0, 'm' },
    324                         { "threads",            1, 0, 't' },
    325231                        { NULL,                 0, 0, 0   },
    326232                };
    327233
    328                 int c=getopt_long(argc, argv, "c:f:i:o:t:Hm",
     234                int c=getopt_long(argc, argv, "c:f:i:o:Hm",
    329235                                long_options, &option_index);
    330236
     
    341247                                filters[filter_count-1].bytes=0;
    342248                                break;
    343                         case 't':
    344                                 threadcount = atoi(optarg);
    345                                 if (threadcount <= 0)
    346                                         threadcount = 1;
    347                                 break;
    348249                        case 'i':
    349250                                packet_interval=atof(optarg);
     
    383284       
    384285       
     286
    385287        if (merge_inputs) {
    386288                /* If we're merging the inputs, we only want to create all
     
    393295                if (output == NULL)
    394296                        return 0;
     297
    395298        }
    396299               
     
    405308
    406309
    407         return 0;
    408 }
     310        return 0;
     311}
Note: See TracChangeset for help on using the changeset viewer.