Ignore:
Timestamp:
09/16/14 02:35:10 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
d994324
Parents:
50b1bee
Message:

Refactor the combining step to allow user defined functions here.

Remove the old trace_get_results, now instead simply provide a reporter function which gets called as soon as results are ready.
The combiner function used determines the order of these results and when they are released etc.
The combiner function can be selected from those built-in or a custom version can be defined results are provided when ready.
Quickly hacked the parallel tests to work with this update, these are still a bit messy.

Also some fixes some compile warnings.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracestats/tracestats_parallel.c

    rf051c1b r2498008  
    150150}
    151151
    152 static int reduce(libtrace_t* trace, void* global_blob)
    153 {
    154         int i,j;
    155         uint64_t count=0, bytes=0;
    156         libtrace_vector_t results;
    157         libtrace_vector_init(&results, sizeof(libtrace_result_t));
    158         trace_get_results(trace, &results);
     152static void report_result(libtrace_t *trace UNUSED, libtrace_result_t *result, libtrace_message_t *mesg) {
     153        static uint64_t count=0, bytes=0;
    159154        uint64_t packets;
    160        
    161         /* Get the results from each core and sum 'em up */
    162         for (i = 0 ; i < libtrace_vector_get_size(&results) ; i++) {
    163                 libtrace_result_t result;
    164                 assert(libtrace_vector_get(&results, i, (void *) &result) == 1);
    165                 assert(libtrace_result_get_key(&result) == 0);
    166                 statistics_t * res = libtrace_result_get_value(&result);
     155        int i;
     156        if (result) {
     157                int j;
     158                /* Get the results from each core and sum 'em up */
     159                assert(libtrace_result_get_key(result) == 0);
     160                statistics_t * res = libtrace_result_get_value(result);
    167161                count += res[0].count;
    168162                bytes += res[0].bytes;
     
    172166                }
    173167                free(res);
    174         }
    175         // Done with these results - Free internally and externally
    176         libtrace_vector_destroy(&results);
    177 
    178     printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
    179         for(i=0;i<filter_count;++i) {
    180                 printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
    181                 filters[i].bytes=0;
    182                 filters[i].count=0;
    183         }
    184         packets=trace_get_received_packets(trace);
    185         if (packets!=UINT64_MAX)
    186                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    187                                 "Input packets", packets);
    188         packets=trace_get_filtered_packets(trace);
    189         if (packets!=UINT64_MAX)
    190                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    191                                 "Filtered packets", packets);
    192         packets=trace_get_dropped_packets(trace);
    193         if (packets!=UINT64_MAX)
    194                 fprintf(stderr,"%30s:\t%12" PRIu64"\n",
    195                                 "Dropped packets",packets);
    196         packets=trace_get_accepted_packets(trace);
    197         if (packets!=UINT64_MAX)
    198                 fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
    199                                 "Accepted Packets",packets);
    200         printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
    201         totcount+=count;
    202         totbytes+=bytes;
    203        
    204         return 0;
     168        } else switch (mesg->code) {
     169                case MESSAGE_STOPPING:
     170                        printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
     171                        for(i=0;i<filter_count;++i) {
     172                                printf("%30s:\t%12"PRIu64"\t%12"PRIu64"\t%7.03f\n",filters[i].expr,filters[i].count,filters[i].bytes,filters[i].count*100.0/count);
     173                                filters[i].bytes=0;
     174                                filters[i].count=0;
     175                        }
     176                        packets=trace_get_received_packets(trace);
     177                        if (packets!=UINT64_MAX)
     178                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     179                                                "Input packets", packets);
     180                        packets=trace_get_filtered_packets(trace);
     181                        if (packets!=UINT64_MAX)
     182                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     183                                                "Filtered packets", packets);
     184                        packets=trace_get_dropped_packets(trace);
     185                        if (packets!=UINT64_MAX)
     186                                fprintf(stderr,"%30s:\t%12" PRIu64"\n",
     187                                                "Dropped packets",packets);
     188                        packets=trace_get_accepted_packets(trace);
     189                        if (packets!=UINT64_MAX)
     190                                fprintf(stderr,"%30s:\t%12" PRIu64 "\n",
     191                                                "Accepted Packets",packets);
     192                        printf("%30s:\t%12"PRIu64"\t%12" PRIu64 "\n","Total",count,bytes);
     193                        totcount+=count;
     194                        totbytes+=bytes;
     195        }
    205196}
    206197
     
    212203        return 0;
    213204}
     205
     206struct user_configuration uc;
     207
    214208
    215209/* Process a trace, counting packets that match filter(s) */
     
    230224    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    231225        option = 2;
    232         trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     226        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
     227        trace_parallel_config(trace, TRACE_OPTION_SET_CONFIG, &uc);
     228        trace_parallel_config(trace, TRACE_OPTION_ORDERED, &uc);
     229
    233230        //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
    234231
     
    242239
    243240
    244         if (trace_pstart(trace, (void *)&blob, &per_packet, NULL)==-1) {
     241        if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
    245242                trace_perror(trace,"Failed to start trace");
    246243                return;
     
    249246        // Wait for all threads to stop
    250247        trace_join(trace);
    251         reduce(trace, NULL);
    252248
    253249        //map_pair_iterator_t * results = NULL;
     
    273269        int i;
    274270        struct sigaction sigact;
    275 
     271        ZERO_USER_CONFIG(uc);
    276272        while(1) {
    277273                int option_index;
     
    279275                        { "filter",        1, 0, 'f' },
    280276                        { "libtrace-help", 0, 0, 'H' },
     277                        { "config",             1, 0, 'u' },
     278                        { "config-file",                1, 0, 'U' },
    281279                        { NULL,            0, 0, 0   },
    282280                };
    283281
    284                 int c=getopt_long(argc, argv, "f:H",
     282                int c=getopt_long(argc, argv, "f:Hu:U:",
    285283                                long_options, &option_index);
    286284
     
    301299                                exit(1);
    302300                                break;
     301                        case 'u':
     302                                  parse_user_config(&uc, optarg);
     303                                  break;
     304                        case 'U':;
     305                                FILE * f = fopen(optarg, "r");
     306                                if (f != NULL) {
     307                                        parse_user_config_file(&uc, f);
     308                                } else {
     309                                        perror("Failed to open configuration file\n");
     310                                        usage(argv[0]);
     311                                }
     312                                break;
    303313                        default:
    304314                                fprintf(stderr,"Unknown option: %c\n",c);
Note: See TracChangeset for help on using the changeset viewer.