Ignore:
Timestamp:
06/04/14 02:28:58 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
b13b939
Parents:
fac8c46
Message:

Adds a thread keepalive that sends a messages to the perpkt threads every second(still todo make this configurable)
Updated tracertstats to use this rather than the temporary result system which has been removed
Also fixes a handful of compile warnings

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats_parallel.c

    r17a3dff r82facc5  
    6565
    6666#define DEFAULT_OUTPUT_FMT "txt"
    67 #define TRACE_TIME 1
    6867
    6968struct libtrace_t *trace;
     
    190189} timestamp_sync_t;
    191190
    192 
    193 static int reduce_tracetime(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
    194 {
    195         int i,j;
    196         //uint64_t count=0, bytes=0;
    197         static uint64_t ts = 0;
    198         libtrace_vector_t results;
    199         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    200         trace_get_results_check_temp(trace, &results, *last_ts);
    201         //trace_get_results(trace, &results);
    202         //uint64_t packets;
    203        
    204         /* Get the results from each core and sum 'em up */
    205         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    206                 libtrace_result_t result;
    207                
    208                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    209                 ts = libtrace_result_get_key(&result);
    210                 if (*last_ts == 0)
    211                         *last_ts = ts;
    212                
    213                 result_t * res = libtrace_result_get_value(&result);
    214                 static result_t *  last_res = NULL;
    215                 if (res == last_res) {
    216                         printf("Hmm could be asserting but I'm not ;)\n");
    217                 }
    218                 //assert(res != last_res);
    219                 last_res = res;
    220                 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    221                 /*while (*last_ts < ts) {
    222                         report_results((double) *last_ts * (double) packet_interval, count, bytes);
    223                         count = 0;
    224                         bytes = 0;
    225                         for (j = 0; j < filter_count; j++)
    226                                 filters[j].count = filters[j].bytes = 0;
    227                         (*last_ts)++;
    228                 }*/
    229                
    230                 count += res->total.count;
    231                 bytes += res->total.bytes;
    232                 for (j = 0; j < filter_count; j++) {
    233                         filters[j].count += res->filters[j].count;
    234                         filters[j].bytes += res->filters[j].bytes;
    235                 }
    236                 free(res);
    237         }
    238         report_results((double) *last_ts * (double) packet_interval, count, bytes);
    239         count = 0;
    240         bytes = 0;
    241         for (j = 0; j < filter_count; j++)
    242                 filters[j].count = filters[j].bytes = 0;
    243         (*last_ts)++;
    244        
    245         // Done with these results - Free internally and externally
    246         libtrace_vector_destroy(&results);
    247        
    248         return 0;
    249 }
    250 
    251191static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt,
    252192                                                libtrace_message_t *mesg,
     
    259199        // Unsure when we would hit this case but the old code had it, I
    260200        // guess we should keep it
    261         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) == NULL) {
    262                
     201        if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
     202                //fprintf(stderr, "Got packet t=%x\n", t);
    263203                ts = trace_get_seconds(pkt) / packet_interval;
    264204                if (last_ts == 0)
    265205                        last_ts = ts;
    266                
     206
    267207                while (packet_interval != UINT64_MAX && last_ts<ts) {
    268208                        // Publish and make a new one new
     209                        fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    269210                        trace_publish_result(trace, (uint64_t) last_ts, results);
    270211                        trace_post_reduce(trace);
     
    296237                                break;
    297238                        case MESSAGE_STOPPED:
     239                                // Should we always post this?
    298240                                if (results->total.count) {
    299241                                        trace_publish_result(trace, (uint64_t) last_ts, results);
    300242                                        trace_post_reduce(trace);
     243                                        results = NULL;
    301244                                }
     245                                break;
     246                        case MESSAGE_TICK:
     247                        {
     248                                int64_t offset;
     249                                struct timeval *tv, tv_real;
     250                                libtrace_packet_t *first_packet = NULL;
     251                                retrive_first_packet(trace, &first_packet, &tv);
     252                                if (first_packet != NULL) {
     253                                        // So figure out our running offset
     254                                        tv_real = trace_get_timeval(first_packet);
     255                                        offset = tv_to_usec(tv) - tv_to_usec(&tv_real);
     256                                        // Get time of day and do this stuff
     257                                        uint64_t next_update_time;
     258                                        next_update_time = (last_ts*packet_interval + packet_interval) * 1000000 + offset;
     259                                        if (next_update_time <= mesg->additional.uint64) {
     260                                                fprintf(stderr, "Got a tick and publishing early!!\n");
     261                                                trace_publish_result(trace, (uint64_t) last_ts, results);
     262                                                trace_post_reduce(trace);
     263                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
     264                                                last_ts++;
     265                                        } else {
     266                                                fprintf(stderr, "Got a tick but no publish ...\n");
     267                                        }
     268                                } else {
     269                                        fprintf(stderr, "Got a tick but no packets seen yet!!!\n");
     270                                }
     271                        }
    302272                }
    303273        }
    304274        return pkt;
    305275}
    306 void * trace_retrive_inprogress_result(libtrace_t *libtrace, uint64_t key);
    307 /**
    308  * A trace time version of map which will attempt to keep upto date
    309  * with the incoming data and detect cases where results are missing and
    310  * recover correctly.
    311  */
    312 static void* per_packet_tracetime(libtrace_t *trace, libtrace_packet_t *pkt,
    313                                                 libtrace_message_t *mesg,
    314                                                 libtrace_thread_t *t)
    315 {
    316         // Using first entry as total and those after for filter counts
    317         int i;
    318         static __thread uint64_t last_ts = 0, ts = 0;
    319         static __thread double debug_last = 0.0;
    320         static __thread result_t * tmp_result = NULL;
    321        
    322         if (pkt && trace_get_packet_buffer(pkt,NULL,NULL) != NULL) {
    323                 ts = trace_get_seconds(pkt) / packet_interval;
    324                
    325                 if (debug_last != 0.0 && debug_last > trace_get_seconds(pkt))
    326                         printf("packets out of order bitch :(\n");
    327                 debug_last = trace_get_seconds(pkt);
    328                 if (last_ts == 0)
    329                         last_ts = ts;
    330                
    331                 /*
    332                 while (packet_interval != UINT64_MAX && last_ts<ts) {
    333                         // Publish and make new
    334                         trace_publish_result(trace, (uint64_t) last_ts, results);
    335                         results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
    336                         memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    337                         last_ts++;
    338                 }*/
    339                
    340                 /* Calculate count for filters */
    341                 for(i=0;i<filter_count;++i) {
    342                         if(trace_apply_filter(filters[i].filter, pkt)) {
    343                                 tmp_result->filters[i].count = 1;
    344                                 tmp_result->filters[i].bytes = trace_get_wire_length(pkt);
    345                         } else {
    346                                 tmp_result->filters[i].count = 0;
    347                                 tmp_result->filters[i].bytes = 0;
    348                         }
    349                 }
    350                
    351                 /* Now Update the currently stored result */
    352                 result_t * results = (result_t *) trace_retrive_inprogress_result(trace, ts);
    353                
    354                 if (!results) {
    355                         results = malloc(sizeof(result_t) + sizeof(statistic_t) * filter_count);
    356                         memset(results, 0, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    357                 }
    358                 assert(results);
    359                 /* Now add to the current results */
    360                 results->total.count++;
    361                 results->total.bytes +=trace_get_wire_length(pkt);
    362                 /* Now add on filters */
    363                 for(i=0;i<filter_count;++i) {
    364                         results->filters[i].count += tmp_result->filters[i].count;
    365                         results->filters[i].bytes += tmp_result->filters[i].bytes;
    366                 }
    367                 /* Now release the lock and send it away place that back into the buffer */
    368                 trace_update_inprogress_result(trace, ts, (void *) results);
    369                 /*if (count >= packet_count) {
    370                         report_results(ts,count,bytes);
    371                         count=0;
    372                         bytes=0;
    373                 }*/ // Hmm what was happening here doesn't match up with any of the documentations!!!
    374         }
    375         if (mesg) {
    376                 // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    377                 switch (mesg->code) {
    378                         case MESSAGE_STARTED:
    379                                 tmp_result = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    380                                 break;
    381                         case MESSAGE_STOPPED:
    382                                 trace_retrive_inprogress_result(trace, 0);
    383                                 trace_update_inprogress_result(trace, 1, NULL);
    384                 }
    385         }
    386         // Done push the final results
    387         /*if (results->total.count)
    388                 trace_publish_result(trace, (uint64_t) last_ts, results);*/
    389        
    390         return pkt;
     276
     277static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) {
     278        return 0;
    391279}
    392280
     
    422310        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &i);
    423311        trace_parallel_config(trace, TRACE_OPTION_TRACETIME, &i);
     312        trace_set_hasher(trace, HASHER_CUSTOM, &bad_hash, NULL);
    424313#if TRACE_TIME
    425314        if (trace_pstart(trace, NULL, &per_packet_tracetime, NULL)==-1) {
     
    434323        }
    435324
    436 #if TRACE_TIME
    437         // First we wait for a message telling us that a timestamp has been
    438         // published this allows us to approximately synchronize with the time
    439         libtrace_message_t message;
    440         int64_t offset;
    441         libtrace_packet_t *packet;
    442         struct timeval *tv, tv_real;
    443        
    444        
    445         do {
    446                 // TODO Put a timeout here also
    447                 libtrace_thread_get_message(trace, &message);
    448         } while (retrive_first_packet(trace, &packet, &tv) == 0);
    449         tv_real = trace_get_timeval(packet);
    450         offset = tv_to_usec(&tv_real) - tv_to_usec(tv);
    451         last_ts = trace_get_seconds(packet) / packet_interval;
    452         printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
    453         /*
    454         while (!got_first) {
    455                 // Wait for a message indicating we've got our 'first' packet, note not a 100% guarantee its our first but pretty likely
    456                
    457                
    458                
    459                 assert(pthread_mutex_lock(&lock_more) == 0);
    460                
    461                 for (i=0; i < 2; ++i) {
    462                         if (initial_stamps[i].difference_usecs) { // Hmm certainly this cannot possibly lineup 100%??
    463                                 got_first=1;
    464                                 last_ts = initial_stamps[i].first_interval_number;
    465                                 offset = initial_stamps[i].difference_usecs;
    466                                 printf("Got first yay offset=%"PRId64" first_interval=%"PRIu64"\n", offset, last_ts);
    467                         }
    468                 }
    469                 assert(pthread_mutex_unlock(&lock_more) == 0);
    470         }*/
    471         while (!trace_finished(trace)) {
    472                 struct timeval tv;
    473                 // Now try our best to read that one out
    474                
    475                 // Read messages
    476                 //libtrace_thread_get_message(trace, &message);
    477                
    478                 // We just release and do work currently, maybe if something
    479                 // interesting comes through we'd deal with that
    480                 //libtrace_thread_get_message(trace, &message);
    481                
    482                 //while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    483                
    484                 /* Now wait for a second after we should see the results */
    485                 uint64_t next_update_time, t_usec;
    486                 next_update_time = (last_ts*packet_interval + packet_interval + 1) * 1000000 + offset;
    487                 gettimeofday(&tv, NULL);
    488                 t_usec = tv.tv_sec;
    489                 t_usec *= 1000000;
    490                 t_usec += tv.tv_usec;
    491                
    492                 //printf("Current time=%"PRIu64" Next result ready=%"PRIu64" =%f\n", t_usec, next_update_time, ((double) next_update_time - (double) t_usec) / 1000000.0);
    493                 if (next_update_time > t_usec) {
    494                         tv.tv_sec = (next_update_time - t_usec) / 1000000;
    495                         tv.tv_usec = (next_update_time - t_usec) % 1000000;
    496                         select(0, NULL, NULL, NULL, &tv);
    497                 }
    498                 reduce_tracetime(trace, NULL, &last_ts);
    499         }
    500 #else
     325
    501326        // reduce
    502327        while (!trace_finished(trace)) {
     
    511336                reduce(trace, NULL, &last_ts);
    512337        }
    513 #endif
    514338
    515339        // Wait for all threads to stop
Note: See TracChangeset for help on using the changeset viewer.