Changeset dfbdda7a for tools


Ignore:
Timestamp:
03/30/15 14:22:09 (5 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
8370482
Parents:
773a2a3
Message:

Cleanup tracestats_parallel

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/tracestats/tracestats_parallel.c

    r7c95027 rdfbdda7a  
    5959struct libtrace_t *trace = NULL;
    6060
    61 static void cleanup_signal(int signal)
     61static void cleanup_signal(int signal UNUSED)
    6262{
    63         static int s = 0;
    64         (void)signal;
    65         // trace_interrupt();
    66         // trace_pstop isn't really signal safe because its got lots of locks in it
    67     trace_pstop(trace);
    68     /*if (s == 0) {
    69                 if (trace_ppause(trace) == -1)
    70                         trace_perror(trace, "Pause failed");
    71         }
    72         else {
    73                 if (trace_pstart(trace, NULL, NULL, NULL) == -1)
    74                         trace_perror(trace, "Start failed");
    75     }*/
    76         s = !s;
     63        if (trace)
     64                trace_pstop(trace);
    7765}
    7866
     
    9987
    10088
    101 //libtrace_message_t mesg
    10289static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    10390                        int mesg, libtrace_generic_t data,
    10491                        libtrace_thread_t *sender UNUSED)
    10592{
    106         // Using first entry as total and those after for filter counts
     93        /* Using first entry as total and those after for filter counts */
    10794        static __thread statistics_t * results = NULL;
    10895        int i, wlen;
    10996        libtrace_stat_t *stats;
    110 
    111 
    112         // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
     97        libtrace_generic_t gen;
     98
    11399        switch (mesg) {
    114100        case MESSAGE_PACKET:
     101                /* Apply filters to every packet note the result */
    115102                wlen = trace_get_wire_length(data.pkt);
    116103                for(i=0;i<filter_count;++i) {
     
    124111                                trace_perror(trace, "trace_apply_filter");
    125112                                fprintf(stderr, "Removing filter from filterlist\n");
    126                                 // XXX might be a problem across threads below
     113                                /* This is a race, but will be atomic */
    127114                                filters[i].filter = NULL;
    128115                        }
     
    131118                results[0].bytes +=wlen;
    132119                return data.pkt;
    133         case MESSAGE_STOPPING:
    134                 stats = trace_create_statistics();
    135                 trace_get_thread_statistics(trace, t, stats);
    136                 trace_print_statistics(stats, stderr, NULL);
    137                 free(stats);
    138                 trace_publish_result(trace, t, 0, (libtrace_generic_t){.ptr = results}, RESULT_USER); // Only ever using a single key 0
    139                 //fprintf(stderr, "tracestats_parallel:\t Stopping thread - publishing results\n");
    140                 break;
    141120        case MESSAGE_STARTING:
     121                /* Allocate space to hold a total count and one for each filter */
    142122                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    143123                break;
    144         case MESSAGE_DO_PAUSE:
    145                 assert(!"GOT Asked to pause!!!\n");
     124        case MESSAGE_STOPPING:
     125                /* We only output one result per thread with the key 0 when the
     126                 * trace is over. */
     127                gen.ptr = results;
     128                trace_publish_result(trace, t, 0, gen, RESULT_USER);
    146129                break;
    147         case MESSAGE_PAUSING:
    148                 //fprintf(stderr, "tracestats_parallel:\t pausing thread\n");
     130        default:
    149131                break;
    150         case MESSAGE_RESUMING:
    151                 //fprintf(stderr, "tracestats_parallel:\t resuming thread\n");
    152                 break;
    153132        }
    154133        return NULL;
    155134}
    156135
    157 static void report_result(libtrace_t *trace UNUSED, int mesg,
     136static void report_result(libtrace_t *trace, int mesg,
    158137                          libtrace_generic_t data,
    159138                          libtrace_thread_t *sender UNUSED) {
     
    176155                break;
    177156        case MESSAGE_STOPPING:
     157                /* We are done, print out results */
    178158                stats = trace_get_statistics(trace, NULL);
    179159                printf("%-30s\t%12s\t%12s\t%7s\n","filter","count","bytes","%");
     
    204184}
    205185
    206 static uint64_t rand_hash(libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    207         return rand();
    208 }
    209 
    210 static uint64_t bad_hash(libtrace_packet_t * pkt UNUSED, void *data UNUSED) {
    211         return 0;
    212 }
    213 
    214186/* Process a trace, counting packets that match filter(s) */
    215187static void run_trace(char *uri, char *config, char *config_file)
     
    224196                return;
    225197        }
    226 
    227         //libtrace_filter_t *f = trace_create_filter("udp");
    228         //trace_config(trace, TRACE_OPTION_FILTER, f);
    229 
    230         //trace_config(trace, TRACE_OPTION_META_FREQ, &option);
    231         //option = 10000;
    232         trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    233         //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
    234198
    235199        /* Apply config */
     
    249213        }
    250214
    251         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    252 
    253         //trace_parallel_config(trace, TRACE_OPTION_SET_MAPPER_BUFFER_SIZE, &option);
    254 
    255         /* OPTIONALLY SETUP CORES HERE BUT WE DON'T CARE ABOUT THAT YET XXX */
    256 
    257         /*if (trace_start(trace)==-1) {
    258         trace_perror(trace,"Failed to start trace");
    259         return;
    260         }*/
    261         global_blob_t blob;
    262 
    263 
    264         if (trace_pstart(trace, (void *)&blob, &per_packet, report_result)==-1) {
     215        /* Start the trace as a parallel trace */
     216        if (trace_pstart(trace, NULL, &per_packet, report_result)==-1) {
    265217                trace_perror(trace,"Failed to start trace");
    266218                return;
    267219        }
    268220
    269         // Wait for all threads to stop
     221        /* Wait for all threads to stop */
    270222        trace_join(trace);
    271223
    272         //map_pair_iterator_t * results = NULL;
    273         //trace_get_results(trace, &results);
    274 
    275         //if (results != NULL) {
    276         //      reduce(trace, global_blob, results);
    277         //}
    278224        if (trace_is_err(trace))
    279225                trace_perror(trace,"%s",uri);
Note: See TracChangeset for help on using the changeset viewer.