Changeset 49b0537


Ignore:
Timestamp:
09/10/15 14:10:46 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
9e48735
Parents:
a2dcdad
Message:

Re-write tracertstats_parallel to actually work

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats_parallel.c

    r58bfabf r49b0537  
    133133} result_t;
    134134
    135 static uint64_t last_ts = 0;
     135static uint64_t glob_last_ts = 0;
    136136static void process_result(libtrace_t *trace UNUSED, int mesg,
    137137                           libtrace_generic_t data,
     
    145145                ts = data.res->key;
    146146                res = data.res->value.ptr;
    147                 if (last_ts == 0)
    148                         last_ts = ts;
    149                 while (last_ts < ts) {
    150                         report_results((double) last_ts * (double) packet_interval, count, bytes);
     147                if (glob_last_ts == 0)
     148                        glob_last_ts = ts;
     149                while ((glob_last_ts >> 32) < (ts >> 32)) {
     150                        report_results(glob_last_ts >> 32, count, bytes);
    151151                        count = 0;
    152152                        bytes = 0;
    153153                        for (j = 0; j < filter_count; j++)
    154154                                filters[j].count = filters[j].bytes = 0;
    155                         last_ts++;
     155                        glob_last_ts = ts;
    156156                }
    157157                count += res->total.count;
     
    175175{
    176176        int i;
    177         static __thread uint64_t last_ts = 0, ts = 0;
    178177        static __thread result_t * results = NULL;
     178        uint64_t key;
     179        static __thread uint64_t last_key = 0;
    179180
    180181        switch(mesg) {
    181182        case MESSAGE_PACKET:
    182                 // Unsure when we would hit this case but the old code had it, I
    183                 // guess we should keep it
    184                 assert(trace_get_packet_buffer(data.pkt,NULL,NULL) != NULL);
    185                 ts = trace_get_seconds(data.pkt) / packet_interval;
    186                 if (last_ts == 0)
    187                         last_ts = ts;
    188 
    189                 while (packet_interval != UINT64_MAX && last_ts<ts) {
    190                         libtrace_generic_t tmp = {.ptr = results};
    191                         // Publish and make a new one new
    192                         //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    193                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
    194                         trace_post_reporter(trace);
    195                         results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    196                         last_ts++;
    197                 }
     183                key = trace_get_erf_timestamp(data.pkt);
     184                if ((key >> 32) > (last_key >> 32) + packet_interval) {
     185                        libtrace_generic_t tmp = {.ptr = results};
     186                        trace_publish_result(trace, t, key,
     187                                        tmp, RESULT_USER);
     188                        trace_post_reporter(trace);
     189                        last_key = key;
     190                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     191
     192                }
    198193
    199194                for(i=0;i<filter_count;++i) {
     
    206201                results->total.count++;
    207202                results->total.bytes +=trace_get_wire_length(data.pkt);
    208                 /*if (count >= packet_count) {
    209                         report_results(ts,count,bytes);
    210                         count=0;
    211                         bytes=0;
    212                 }*/ // TODO what was happening here doesn't match up with any of the documentations!!!
    213203                return data.pkt;
    214204
     
    221211                if (results->total.count) {
    222212                        libtrace_generic_t tmp = {.ptr = results};
    223                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
     213                        trace_publish_result(trace, t, last_key, tmp, RESULT_USER);
    224214                        trace_post_reporter(trace);
     215                        free(results);
    225216                        results = NULL;
    226217                }
     
    230221        case MESSAGE_TICK_COUNT:
    231222                {
    232                         int64_t offset;
    233                         const struct timeval *tv;
    234                         struct timeval tv_real;
    235                         const libtrace_packet_t *first_packet = NULL;
    236                         trace_get_first_packet(trace, NULL, &first_packet, &tv);
    237                         if (first_packet != NULL) {
    238                                 // So figure out our running offset
    239                                 tv_real = trace_get_timeval(first_packet);
    240                                 offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
    241                                 // Get time of day and do this stuff
    242                                 uint64_t next_update_time;
    243                                 next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
    244                                 if (next_update_time <= data.uint64) {
    245                                         libtrace_generic_t tmp = {.ptr = results};
    246                                         //fprintf(stderr, "Got a tick and publishing early!!\n");
    247                                         trace_publish_result(trace, t, (uint64_t) last_ts, tmp, RESULT_USER);
    248                                         trace_post_reporter(trace);
    249                                         results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    250                                         last_ts++;
    251                                 } else {
    252                                         //fprintf(stderr, "Got a tick but no publish ...\n");
    253                                 }
    254                         } else {
    255                                 //fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
    256                         }
     223                        if (data.uint64 > last_key) {
     224                                libtrace_generic_t tmp = {.ptr = results};
     225                                trace_publish_result(trace, t, data.uint64,
     226                                                tmp, RESULT_USER);
     227                                trace_post_reporter(trace);
     228                                last_key = data.uint64;
     229                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     230                        }
     231                        break;
    257232                }
    258233        }
    259234        return NULL;
    260 }
    261 
    262 static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    263         return 0;
    264235}
    265236
     
    267238static void run_trace(char *uri)
    268239{
    269         int j;
    270 
    271240        if (!merge_inputs)
    272241                create_output(uri);
     
    297266
    298267        if (trace_get_information(trace)->live) {
    299                 trace_set_tick_interval(trace, (int) (packet_interval * 1000));
     268                trace_set_tick_interval(trace, (int) (packet_interval * 1000));
    300269        }
    301270
     
    313282       
    314283        // Flush the last one out
    315         report_results((double) last_ts * (double) packet_interval, count, bytes);
    316         //count = 0;
    317         //bytes = 0;
    318         for (j = 0; j < filter_count; j++)
    319                 filters[j].count = filters[j].bytes = 0;
    320         (last_ts)++;
    321        
     284        report_results((glob_last_ts >> 32), count, bytes);
    322285        if (trace_is_err(trace))
    323286                trace_perror(trace,"%s",uri);
Note: See TracChangeset for help on using the changeset viewer.