Changeset 8c7490fe


Ignore:
Timestamp:
09/11/15 10:28:44 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f625817
Parents:
ab7e4ee
Message:

Rewrite tracertstats to use new callback API

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats.c

    rab7e4ee r8c7490fe  
    173173}
    174174
    175 typedef struct timestamp_sync {
    176         int64_t difference_usecs;
    177         uint64_t first_interval_number;
    178 } timestamp_sync_t;
    179 
    180 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    181                         int mesg, libtrace_generic_t data,
    182                         libtrace_thread_t *sender UNUSED)
     175typedef struct threadlocal {
     176        result_t *results;
     177        uint64_t last_key;
     178} thread_data_t;
     179
     180static void *cb_starting(libtrace_t *trace UNUSED,
     181        libtrace_thread_t *t UNUSED, void *global UNUSED)
    183182{
    184         int i;
    185         static __thread result_t * results = NULL;
     183        thread_data_t *td = calloc(1, sizeof(thread_data_t));
     184        td->results = calloc(1, sizeof(result_t) +
     185                        sizeof(statistic_t) * filter_count);
     186        return td;
     187}
     188
     189static libtrace_packet_t *cb_packet(libtrace_t *trace, libtrace_thread_t *t,
     190                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
     191
    186192        uint64_t key;
    187         static __thread uint64_t last_key = 0;
    188 
    189         switch(mesg) {
    190         case MESSAGE_PACKET:
    191                 key = trace_get_erf_timestamp(data.pkt);
    192                 if ((key >> 32) > (last_key >> 32) + packet_interval) {
    193                         libtrace_generic_t tmp = {.ptr = results};
    194                         trace_publish_result(trace, t, key,
    195                                         tmp, RESULT_USER);
    196                         trace_post_reporter(trace);
    197                         last_key = key;
    198                         results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    199 
     193        thread_data_t *td = (thread_data_t *)tls;
     194        int i;
     195
     196        key = trace_get_erf_timestamp(packet);
     197        if ((key >> 32) > (td->last_key >> 32) + packet_interval) {
     198                libtrace_generic_t tmp = {.ptr = td->results};
     199                trace_publish_result(trace, t, key,
     200                                tmp, RESULT_USER);
     201                trace_post_reporter(trace);
     202                td->last_key = key;
     203                td->results = calloc(1, sizeof(result_t) +
     204                                sizeof(statistic_t) * filter_count);
     205        }
     206        for(i=0;i<filter_count;++i) {
     207                if(trace_apply_filter(filters[i].filter, packet)) {
     208                        td->results->filters[i].count++;
     209                        td->results->filters[i].bytes+=trace_get_wire_length(packet);
    200210                }
    201 
    202                 for(i=0;i<filter_count;++i) {
    203                         if(trace_apply_filter(filters[i].filter, data.pkt)) {
    204                                 results->filters[i].count++;
    205                                 results->filters[i].bytes+=trace_get_wire_length(data.pkt);
    206                         }
    207                 }
    208 
    209                 results->total.count++;
    210                 results->total.bytes +=trace_get_wire_length(data.pkt);
    211                 return data.pkt;
    212 
    213         case MESSAGE_STARTING:
    214                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    215                 break;
    216 
    217         case MESSAGE_STOPPING:
    218                 // Should we always post this?
    219                 if (results->total.count) {
    220                         libtrace_generic_t tmp = {.ptr = results};
    221                         trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
    222                         trace_post_reporter(trace);
    223                         results = NULL;
    224                 }
    225                 break;
    226 
    227         case MESSAGE_TICK_INTERVAL:
    228         case MESSAGE_TICK_COUNT:
    229                 {
    230                         if (data.uint64 > last_key) {
    231                                 libtrace_generic_t tmp = {.ptr = results};
    232                                 trace_publish_result(trace, t, data.uint64,
    233                                                 tmp, RESULT_USER);
    234                                 trace_post_reporter(trace);
    235                                 last_key = data.uint64;
    236                                 results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    237                         }
    238                         break;
    239                 }
    240         }
    241         return NULL;
     211        }
     212
     213        td->results->total.count++;
     214        td->results->total.bytes +=trace_get_wire_length(packet);
     215        return packet;
     216}
     217
     218static void cb_stopping(libtrace_t *trace, libtrace_thread_t *t,
     219                void *global UNUSED, void *tls) {
     220
     221        thread_data_t *td = (thread_data_t *)tls;
     222        if (td->results->total.count) {
     223                libtrace_generic_t tmp = {.ptr = td->results};
     224                trace_publish_result(trace, t, td->last_key, tmp, RESULT_USER);
     225                trace_post_reporter(trace);
     226                td->results = NULL;
     227        }
     228}
     229
     230static void cb_tick(libtrace_t *trace, libtrace_thread_t *t,
     231                void *global UNUSED, void *tls, uint64_t order) {
     232
     233        thread_data_t *td = (thread_data_t *)tls;
     234        if (order > td->last_key) {
     235                libtrace_generic_t tmp = {.ptr = td->results};
     236                trace_publish_result(trace, t, order, tmp, RESULT_USER);
     237                trace_post_reporter(trace);
     238                td->last_key = order;
     239                td->results = calloc(1, sizeof(result_t) +
     240                                sizeof(statistic_t) * filter_count);
     241        }
    242242}
    243243
     
    268268        }
    269269
    270         if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
     270        trace_cb_starting(trace, cb_starting);
     271        trace_cb_stopping(trace, cb_stopping);
     272        trace_cb_packet(trace, cb_packet);
     273        trace_cb_tick_count(trace, cb_tick);
     274        trace_cb_tick_interval(trace, cb_tick);
     275
     276        if (trace_pstart(trace, NULL, NULL, process_result)==-1) {
    271277                trace_perror(trace,"Failed to start trace");
    272278                trace_destroy(trace);
Note: See TracChangeset for help on using the changeset viewer.