Changeset 2498008 for tools


Ignore:
Timestamp:
09/16/14 02:35:10 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

Location:
tools
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • tools/traceanon/traceanon_parallel.c

    r50b1bee r2498008  
    208208struct libtrace_out_t *writer = 0;
    209209
    210 static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
     210static void write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
    211211        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
    212212
     
    226226                }
    227227        }
    228         return NULL;
    229228}
    230229
  • tools/tracertstats/tracertstats_parallel.c

    rf051c1b r2498008  
    136136} result_t;
    137137
    138 
    139 static int reduce(libtrace_t* trace, void* global_blob, uint64_t *last_ts)
    140 {       
    141         int i,j;
    142         //uint64_t count=0, bytes=0;
     138static uint64_t last_ts = 0;
     139static void process_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg UNUSED)  {
    143140        static uint64_t ts = 0;
    144         libtrace_vector_t results;
    145         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    146         trace_get_results(trace, &results);
    147         //uint64_t packets;
    148        
    149         /* Get the results from each core and sum 'em up */
    150         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    151                 libtrace_result_t result;
    152                
    153                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    154                 ts = libtrace_result_get_key(&result);
    155                 if (*last_ts == 0)
    156                         *last_ts = ts;
    157                
    158                 result_t * res = libtrace_result_get_value(&result);
    159                 static result_t *  last_res = NULL;
    160                 // Memory manager might falsely trigger this
    161                 assert(res != last_res);
    162                 last_res = res;
    163                 //printf("Perpkt published %"PRIu64" - c=%"PRIu64"\n", ts, res->total.count);
    164                 while (*last_ts < ts) {
    165                         report_results((double) *last_ts * (double) packet_interval, count, bytes);
     141
     142        if (result) {
     143                int j;
     144                result_t *res;
     145                ts = libtrace_result_get_key(result);
     146                res = libtrace_result_get_value(result);
     147                if (last_ts == 0)
     148                        last_ts = ts;
     149                while (last_ts < ts) {
     150                        report_results((double) last_ts * (double) packet_interval, count, bytes);
    166151                        count = 0;
    167152                        bytes = 0;
    168153                        for (j = 0; j < filter_count; j++)
    169154                                filters[j].count = filters[j].bytes = 0;
    170                         (*last_ts)++;
    171                 }
    172                
     155                        last_ts++;
     156                }
    173157                count += res->total.count;
    174158                bytes += res->total.bytes;
     
    179163                free(res);
    180164        }
    181         // Done with these results - Free internally and externally
    182         libtrace_vector_destroy(&results);
    183        
    184         return 0;
    185165}
    186166
     
    276256}
    277257
    278 static uint64_t bad_hash(const libtrace_packet_t * pkt, void *data) {
     258static uint64_t bad_hash(const libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    279259        return 0;
    280260}
     
    284264{
    285265        int j;
    286         uint64_t last_ts = 0;
    287266
    288267        if (!merge_inputs)
     
    318297        }
    319298
    320         if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
     299        if (trace_pstart(trace, NULL, &per_packet, process_result)==-1) {
    321300                trace_perror(trace,"Failed to start trace");
    322301                trace_destroy(trace);
     
    327306
    328307
    329         // reduce
    330         while (!trace_finished(trace)) {
    331                 // Read messages
    332                 libtrace_message_t message;
    333                
    334                 // We just release and do work currently, maybe if something
    335                 // interesting comes through we'd deal with that
    336                 libtrace_thread_get_message(trace, &message);
    337                
    338                 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    339                 reduce(trace, NULL, &last_ts);
    340         }
    341 
    342308        // Wait for all threads to stop
    343309        trace_join(trace);
    344310       
    345         reduce(trace, NULL, &last_ts);
    346311        // Flush the last one out
    347312        report_results((double) last_ts * (double) packet_interval, count, bytes);
  • tools/tracestats/tracestats_parallel.c

    rf051c1b r2498008  
    150150}
    151151
    152 static int reduce(libtrace_t* trace, void* global_blob)
    153 {
    154         int i,j;
    155         uint64_t count=0, bytes=0;
    156         libtrace_vector_t results;
    157         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    158         trace_get_results(trace, &results);
     152static void report_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg) {
     153        static uint64_t count=0, bytes=0;
    159154        uint64_t packets;
    160        
    161         /* Get the results from each core and sum 'em up */
    162         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    163                 libtrace_result_t result;
    164                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    165                 assert(libtrace_result_get_key(&result) == 0);
    166                 statistics_t * res = libtrace_result_get_value(&result);
     155        int i;
     156        if (result) {
     157                int j;
     158                /* Get the results from each core and sum 'em up */
     159                assert(libtrace_result_get_key(result) == 0);
     160                statistics_t * res = libtrace_result_get_value(result);
    167161                count += res[0].count;
    168162                bytes += res[0].bytes;
     
    172166                }
    173167                free(res);
    174         }
    175         // Done with these results - Free internally and externally
    176         libtrace_vector_destroy(&results);
    177 
    178     printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
    179         for(i=0;i<filter_count;++i) {
    180                 printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
    181                 filters[i].bytes=0;
    182                 filters[i].count=0;
    183         }
    184         packets=trace_get_received_packets(trace);
    185         if (packets!=UINT64_MAX)
    186                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    187                                 "Input packets", packets);
    188         packets=trace_get_filtered_packets(trace);
    189         if (packets!=UINT64_MAX)
    190                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    191                                 "Filtered packets", packets);
    192         packets=trace_get_dropped_packets(trace);
    193         if (packets!=UINT64_MAX)
    194                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    195                                 "Dropped packets",packets);
    196         packets=trace_get_accepted_packets(trace);
    197         if (packets!=UINT64_MAX)
    198                 fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
    199                                 "Accepted Packets",packets);
    200         printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
    201         totcount+=count;
    202         totbytes+=bytes;
    203        
    204         return 0;
     168        } else switch (mesg->code) {
     169                case MESSAGE_STOPPING:
     170                        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
     171                        for(i=0;i<filter_count;++i) {
     172                                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
     173                                filters[i].bytes=0;
     174                                filters[i].count=0;
     175                        }
     176                        packets=trace_get_received_packets(trace);
     177                        if (packets!=UINT64_MAX)
     178                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     179                                                "Input packets", packets);
     180                        packets=trace_get_filtered_packets(trace);
     181                        if (packets!=UINT64_MAX)
     182                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     183                                                "Filtered packets", packets);
     184                        packets=trace_get_dropped_packets(trace);
     185                        if (packets!=UINT64_MAX)
     186                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     187                                                "Dropped packets",packets);
     188                        packets=trace_get_accepted_packets(trace);
     189                        if (packets!=UINT64_MAX)
     190                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
     191                                                "Accepted Packets",packets);
     192                        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
     193                        totcount+=count;
     194                        totbytes+=bytes;
     195        }
    205196}
    206197
     
    212203        return 0;
    213204}
     205
     206struct user_configuration uc;
     207
    214208
    215209/* Process a trace, counting packets that match filter(s) */
     
    230224    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    231225        option = 2;
    232         trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     226        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     227        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
     228        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &uc);
     229
    233230        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
    234231
     
    242239
    243240
    244         if (trace_pstart(trace, (void *)&blob, &per_packet, NULL)==-1) {
     241        if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
    245242                trace_perror(trace,"Failed to start trace");
    246243                return;
     
    249246        // Wait for all threads to stop
    250247        trace_join(trace);
    251         reduce(trace, NULL);
    252248
    253249        //map_pair_iterator_t * results = NULL;
     
    273269        int i;
    274270        struct sigaction sigact;
    275 
     271        ZERO_USER_CONFIG(uc);
    276272        while(1) {
    277273                int option_index;
     
    279275                        { "filter",        1, 0, 'f' },
    280276                        { "libtrace-help", 0, 0, 'H' },
     277                        { "config",             1, 0, 'u' },
     278                        { "config-file",                1, 0, 'U' },
    281279                        { NULL,            0, 0, 0   },
    282280                };
    283281
    284                 int c=getopt_long(argc, argv, "f:H",
     282                int c=getopt_long(argc, argv, "f:Hu:U:",
    285283                                long_options, &option_index);
    286284
     
    301299                                exit(1);
    302300                                break;
     301                        case 'u':
     302                                  parse_user_config(&uc, optarg);
     303                                  break;
     304                        case 'U':;
     305                                FILE * f = fopen(optarg, "r");
     306                                if (f != NULL) {
     307                                        parse_user_config_file(&uc, f);
     308                                } else {
     309                                        perror("Failed to open configuration file\n");
     310                                        usage(argv[0]);
     311                                }
     312                                break;
    303313                        default:
    304314                                fprintf(stderr,"Unknown option: %c\n",c);
Note: See TracChangeset for help on using the changeset viewer.