Changeset f051c1b for tools


Ignore:
Timestamp:
08/12/14 14:14:50 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
0862c20
Parents:
f9a70ca
Message:

Tidies up the state messages received, now all states are passed through created->resumed->paused->stopped this might simplify some code. Removed the extra paused state.
Hooks up the reporter method through trace_pstart, hopefully resulting in simpler code most of the time. Also renames this from reducer to reporter anywhere it was not already. Adds a test for this also.
Removes is_packet from a result in favour of a more generic type, with packet being one of these.
Moves configuration for tuning relelated settings into a single structure.

Location:
tools
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • tools/traceanon/traceanon_parallel.c

    r9594cf9 rf051c1b  
    2727        static int s = 0;
    2828        (void)signal;
    29         // trace_interrupt();
     29    //trace_interrupt();
    3030        // trace_pstop isn't really signal safe because its got lots of locks in it
    31         // trace_pstop(trace);
    32         if (s == 0) {
     31    trace_pstop(trace);
     32    /*if (s == 0) {
    3333                if (trace_ppause(trace) == -1)
    3434                        trace_perror(trace, "Pause failed");
     
    3737                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
    3838                        trace_perror(trace, "Start failed");
    39         }
     39    }*/
    4040        s = !s;
    4141}
     
    141141
    142142
    143 static uint64_t bad_hash(libtrace_packet_t * pkt)
     143UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
    144144{
    145145        return 0;
     
    147147
    148148
    149 static uint64_t rand_hash(libtrace_packet_t * pkt)
     149UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
    150150{
    151151        return rand();
     
    153153
    154154
    155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
    156 {
    157         int i;
     155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
     156{
    158157       
    159158        if (pkt) {
     
    191190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
    192191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
    193                 trace_publish_packet(trace, pkt);
     192                trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
    194193                //return ;
    195194        }
     
    197196                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    198197                switch (mesg->code) {
    199                         case MESSAGE_STARTED:
     198                        case MESSAGE_STARTING:
    200199                                enc_init(enc_type,key);
    201200                }
     
    204203}
    205204
     205struct libtrace_out_t *writer = 0;
     206
     207static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
     208        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
     209        if (result) {
     210                libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
     211                assert(libtrace_result_get_key(result) == packet_count++);
     212                if (trace_write_packet(writer,packet)==-1) {
     213                        trace_perror_output(writer,"writer");
     214                        trace_interrupt();
     215                }
     216                trace_free_result_packet(trace, packet);
     217        }
     218        return NULL;
     219}
     220
     221
    206222int main(int argc, char *argv[])
    207223{
    208224        //struct libtrace_t *trace = 0;
    209         struct libtrace_packet_t *packet/* = trace_create_packet()*/;
    210         struct libtrace_out_t *writer = 0;
    211225        struct sigaction sigact;
    212226        char *output = 0;
     
    380394        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
    381395        i = 2;
    382         trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
    383        
    384         if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
     396    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
     397       
     398        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
    385399                trace_perror(trace,"trace_start");
    386400                trace_destroy_output(writer);
     
    402416        sigaction(SIGINT, &sigact, NULL);
    403417        sigaction(SIGTERM, &sigact, NULL);
    404        
    405         // Read in the resulting packets and then free them when done
    406         libtrace_vector_t res;
    407         int res_size = 0;
    408         libtrace_vector_init(&res, sizeof(libtrace_result_t));
    409         uint64_t packet_count = 0;
    410         while (!trace_finished(trace)) {
    411                 // Read messages
    412                 libtrace_message_t message;
    413                
    414                 // We just release and do work currently, maybe if something
    415                 // interesting comes through we'd deal with that
    416                 libtrace_thread_get_message(trace, &message);
    417                
    418                 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    419                
    420                 if ((res_size = trace_get_results(trace, &res)) == 0)
    421                         ;/*sched_yield();*/
    422                
    423                 for (i = 0 ; i < res_size ; i++) {
    424                         libtrace_result_t result;
    425                         assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
    426                         packet = libtrace_result_get_value(&result);
    427                         assert(libtrace_result_get_key(&result) == packet_count);
    428                         packet_count++;
    429                         if (trace_write_packet(writer,packet)==-1) {
    430                                 trace_perror_output(writer,"writer");
    431                                 trace_interrupt();
    432                                 break;
    433                         }
    434                         //trace_destroy_packet(packet);
    435                         trace_free_result_packet(trace, packet);
    436                 }
    437         }
     418
     419        // Wait for the trace to finish
    438420        trace_join(trace);
    439        
    440         // Grab everything that's left here
    441         res_size = trace_get_results(trace, &res);
    442        
    443         for (i = 0 ; i < res_size ; i++) {
    444                 libtrace_result_t result;
    445                 assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
    446                 packet = libtrace_result_get_value(&result);
    447                 if (libtrace_result_get_key(&result) != packet_count)
    448                         printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
    449                 assert(libtrace_result_get_key(&result) == packet_count);
    450                
    451                 packet_count++;
    452                 if (trace_write_packet(writer,packet)==-1) {
    453                         trace_perror_output(writer,"writer");
    454                         trace_interrupt();
    455                         break;
    456                 }
    457                 trace_destroy_packet(packet);
    458         }
    459         libtrace_vector_destroy(&res);
    460421       
    461422        //trace_destroy_packet(packet);
  • tools/tracertstats/tracertstats_parallel.c

    rb13b939 rf051c1b  
    209209                        // Publish and make a new one new
    210210                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    211                         trace_publish_result(trace, (uint64_t) last_ts, results);
    212                         trace_post_reduce(trace);
     211                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     212                        trace_post_reporter(trace);
    213213                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    214214                        last_ts++;
     
    234234                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    235235                switch (mesg->code) {
    236                         case MESSAGE_STARTED:
     236                        case MESSAGE_STARTING:
    237237                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    238238                                break;
    239                         case MESSAGE_STOPPED:
     239                        case MESSAGE_STOPPING:
    240240                                // Should we always post this?
    241241                                if (results->total.count) {
    242                                         trace_publish_result(trace, (uint64_t) last_ts, results);
    243                                         trace_post_reduce(trace);
     242                                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     243                                        trace_post_reporter(trace);
    244244                                        results = NULL;
    245245                                }
     
    260260                                        if (next_update_time <= mesg->additional.uint64) {
    261261                                                //fprintf(stderr, "Got a tick and publishing early!!\n");
    262                                                 trace_publish_result(trace, (uint64_t) last_ts, results);
    263                                                 trace_post_reduce(trace);
     262                                                trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     263                                                trace_post_reporter(trace);
    264264                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    265265                                                last_ts++;
  • tools/tracestats/tracestats_parallel.c

    r9594cf9 rf051c1b  
    6767        // trace_interrupt();
    6868        // trace_pstop isn't really signal safe because its got lots of locks in it
    69         // trace_pstop(trace);
    70         if (s == 0) {
     69    trace_pstop(trace);
     70    /*if (s == 0) {
    7171                if (trace_ppause(trace) == -1)
    7272                        trace_perror(trace, "Pause failed");
     
    7575                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
    7676                        trace_perror(trace, "Start failed");
    77         }
     77    }*/
    7878        s = !s;
    7979}
     
    129129                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    130130                switch (mesg->code) {
    131                         case MESSAGE_STOPPED:
    132                                 trace_publish_result(trace, 0, results); // Only ever using a single key 0
     131                        case MESSAGE_STOPPING:
     132                                trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0
    133133                                fprintf(stderr, "Thread published resuslts WOWW\n");
    134134                                break;
    135                         case MESSAGE_STARTED:
     135                        case MESSAGE_STARTING:
    136136                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    137137                                break;
     
    142142                                fprintf(stderr, "Thread is pausing\n");
    143143                                break;
    144                         case MESSAGE_PAUSED:
     144                        case MESSAGE_RESUMING:
    145145                                fprintf(stderr, "Thread has paused\n");
    146146                                break;
     
    228228        int option = 2;
    229229        //option = 10000;
    230         trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
     230    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    231231        option = 2;
    232232        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
Note: See TracChangeset for help on using the changeset viewer.