Ignore:
Timestamp:
09/16/14 02:35:10 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracertstats/tracertstats_parallel.c

    rf051c1b r2498008  
    136136} result_t;
    137137
    138 
    139 static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
    140 {       
    141         int i,j;
    142         //uint64_t count=0, bytes=0;
     138static uint64_t last_ts = 0;
     139static void process_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg UNUSED)  {
    143140        static uint64_t ts = 0;
    144         libtrace_vector_t results;
    145         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    146         trace_get_results(trace, &results);
    147         //uint64_t packets;
    148        
    149         /* Get the results from each core and sum 'em up */
    150         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    151                 libtrace_result_t result;
    152                
    153                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    154                 ts = libtrace_result_get_key(&result);
    155                 if (*last_ts == 0)
    156                         *last_ts = ts;
    157                
    158                 result_t * res = libtrace_result_get_value(&result);
    159                 static result_t *  last_res = NULL;
    160                 // Memory manager might falsely trigger this
    161                 assert(res != last_res);
    162                 last_res = res;
    163                 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    164                 while (*last_ts < ts) {
    165                         report_results((double) *last_ts * (double) packet_interval, count, bytes);
     141
     142        if (result) {
     143                int j;
     144                result_t *res;
     145                ts = libtrace_result_get_key(result);
     146                res = libtrace_result_get_value(result);
     147                if (last_ts == 0)
     148                        last_ts = ts;
     149                while (last_ts < ts) {
     150                        report_results((double) last_ts * (double) packet_interval, count, bytes);
    166151                        count = 0;
    167152                        bytes = 0;
    168153                        for (j = 0; j < filter_count; j++)
    169154                                filters[j].count = filters[j].bytes = 0;
    170                         (*last_ts)++;
    171                 }
    172                
     155                        last_ts++;
     156                }
    173157                count += res->total.count;
    174158                bytes += res->total.bytes;
     
    179163                free(res);
    180164        }
    181         // Done with these results - Free internally and externally
    182         libtrace_vector_destroy(&results);
    183        
    184         return 0;
    185165}
    186166
     
    276256}
    277257
    278 static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) {
     258static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    279259        return 0;
    280260}
     
    284264{
    285265        int j;
    286         uint64_t last_ts = 0;
    287266
    288267        if (!merge_inputs)
     
    318297        }
    319298
    320         if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
     299        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
    321300                trace_perror(trace,"Failed to start trace");
    322301                trace_destroy(trace);
     
    327306
    328307
    329         // reduce
    330         while (!trace_finished(trace)) {
    331                 // Read messages
    332                 libtrace_message_t message;
    333                
    334                 // We just release and do work currently, maybe if something
    335                 // interesting comes through we'd deal with that
    336                 libtrace_thread_get_message(trace, &message);
    337                
    338                 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    339                 reduce(trace, NULL, &last_ts);
    340         }
    341 
    342308        // Wait for all threads to stop
    343309        trace_join(trace);
    344310       
    345         reduce(trace, NULL, &last_ts);
    346311        // Flush the last one out
    347312        report_results((double) last_ts * (double) packet_interval, count, bytes);
Note: See TracChangeset for help on using the changeset viewer.