Changeset f051c1b


Ignore:
Timestamp:
08/12/14 14:14:50 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
0862c20
Parents:
f9a70ca
Message:

Tidies up the state messages received, now all states are passed through created->resumed->paused->stopped this might simplify some code. Removed the extra paused state.
Hooks up the reporter method through trace_pstart, hopefully resulting in simpler code most of the time. Also renames this from reducer to reporter anywhere it was not already. Adds a test for this also.
Removes is_packet from a result in favour of a more generic type, with packet being one of these.
Moves configuration for tuning relelated settings into a single structure.

Files:
1 added
14 edited

Legend:

Unmodified
Added
Removed
  • lib/libtrace.h.in

    ra49a9eb rf051c1b  
    244244        uint64_t key;
    245245        void * value;
    246         int is_packet;
     246        int type;
    247247} libtrace_result_t;
     248#define RESULT_NORMAL 0
     249#define RESULT_PACKET 1
     250#define RESULT_TICK   2
    248251
    249252typedef struct libtrace_thread_t libtrace_thread_t;
     
    31963199
    31973200typedef void* (*fn_per_pkt)(libtrace_t* trace, libtrace_packet_t *p, libtrace_message_t *m, libtrace_thread_t *thread);
    3198 typedef void* (*fn_reducer)(libtrace_t* trace, void* global_blob);
     3201typedef void* (*fn_reporter)(libtrace_t* trace, libtrace_result_t *r, libtrace_message_t *m);
    31993202typedef uint64_t (*fn_hasher)(const libtrace_packet_t* packet, void *data);
    32003203
    3201 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
     3204DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter);
    32023205DLLEXPORT int trace_ppause(libtrace_t *libtrace);
    32033206DLLEXPORT int trace_pstop(libtrace_t *libtrace);
     
    32193222
    32203223
    3221 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value);
    3222 DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet);
     3224DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type);
    32233225typedef struct libtrace_vector libtrace_vector_t;
    32243226DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results);
    32253227
    3226 DLLEXPORT int trace_post_reduce(libtrace_t *libtrace);
     3228DLLEXPORT int trace_post_reporter(libtrace_t *libtrace);
    32273229DLLEXPORT int libtrace_thread_get_message_count(libtrace_t * libtrace);
    32283230DLLEXPORT int libtrace_thread_get_message(libtrace_t * libtrace, libtrace_message_t * message);
    32293231DLLEXPORT int libtrace_thread_try_get_message(libtrace_t * libtrace, libtrace_message_t * message);
    3230 DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message);
     3232DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message);
    32313233DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message);
    32323234DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message);
     
    32463248         */
    32473249        TRACE_OPTION_SET_HASHER,
    3248        
    3249         /**
    3250          * See diagrams, this sets the maximum size of freelist used to
    3251          * maintain packets and there memory buffers.
    3252          * NOTE setting this to less than recommend could cause deadlock a
    3253          * trace that manages its own packets.
    3254          * A unblockable error message will be printed.
    3255          */
    3256         TRACE_OPTION_SET_PACKET_FREELIST_SIZE,
    3257        
    3258         /**
    3259          * See diagrams, this sets the maximum size of buffers used between
    3260          * the single hasher thread and the buffer.
    3261          * NOTE setting this to less than recommend could cause deadlock a
    3262          * trace that manages its own packets.
    3263          * A unblockable warning message will be printed to stderr in this case.
    3264          */
    3265         TRACE_OPTION_SET_PERPKT_BUFFER_SIZE,
    32663250       
    32673251        /**
     
    32823266          * Libtrace ordered results, results in each queue are ordered by key
    32833267          * however my not be sequential, a typically case is packet timestamps
    3284           * the reducer will receive packets in order - note threasholds
     3268          * the reporter will receive packets in order - note threasholds
    32853269          * will be used such that a empty queue wont break things
    32863270          */
     
    33083292
    33093293enum libtrace_messages {
    3310         MESSAGE_STARTED,
     3294        MESSAGE_STARTING,
     3295        MESSAGE_RESUMING,
     3296        MESSAGE_STOPPING,
     3297        MESSAGE_PAUSING,
    33113298        MESSAGE_DO_PAUSE,
    3312         MESSAGE_PAUSING,
    3313         MESSAGE_PAUSED,
    33143299        MESSAGE_DO_STOP,
    3315         MESSAGE_STOPPED,
    33163300        MESSAGE_FIRST_PACKET,
    33173301        MESSAGE_PERPKT_ENDED,
     
    33193303        MESSAGE_PERPKT_PAUSED,
    33203304        MESSAGE_PERPKT_EOF,
    3321         MESSAGE_POST_REDUCE,
     3305        MESSAGE_POST_REPORTER,
    33223306        MESSAGE_POST_RANGE,
    33233307        MESSAGE_TICK,
     
    33863370DLLEXPORT libtrace_info_t *trace_get_information(libtrace_t * libtrace);
    33873371
     3372/**
     3373 * Tuning the parallel sizes
     3374 */
     3375struct user_configuration {
     3376        // Packet memory cache settings (ocache_init) total
     3377        /**
     3378         * See diagrams, this sets the maximum size of freelist used to
     3379         * maintain packets and their memory buffers.
     3380         * NOTE setting this to less than recommend could cause deadlock a
     3381         * trace that manages its own packets.
     3382         * A unblockable error message will be printed.
     3383         */
     3384        size_t packet_global_cache_size;
     3385        // Per thread
     3386        size_t packet_thread_cache_size;
     3387        // Packet count limited
     3388        bool fixed_packet_count;
     3389        // Bursts/Batches of packets this size are combined, used in single thread mode
     3390        size_t burst_size;
     3391        // Each perpkt thread has a queue leading into the reporter
     3392        //size_t reporter_queue_size;
     3393        /** The tick interval - in milliseconds (0) */
     3394        size_t tick_interval;
     3395        // The tick interval for file based traces, in number of packets TODO implement this
     3396        size_t tick_count;
     3397
     3398        // The number of per packet threads requested
     3399        size_t perpkt_threads;
     3400
     3401        /**
     3402         * See diagrams, this sets the maximum size of buffers used between
     3403         * the single hasher thread and the buffer.
     3404         * NOTE setting this to less than recommend could cause deadlock a
     3405         * trace that manages its own packets.
     3406         * A unblockable warning message will be printed to stderr in this case.
     3407         */
     3408        /** The number of packets that can queue per thread from hasher thread */
     3409        size_t hasher_queue_size;
     3410
     3411        // Reporter threashold before results are sent
     3412        size_t reporter_thold;
     3413};
     3414
     3415#define ZERO_USER_CONFIG(config) memset(&config, 0, sizeof(struct user_configuration));
     3416
    33883417#ifdef __cplusplus
    33893418} /* extern "C" */
  • lib/libtrace_int.h

    rbe3f75b rf051c1b  
    177177        THREAD_HASHER,
    178178        THREAD_PERPKT,
    179         THREAD_REDUCER,
     179        THREAD_REPORTER,
    180180        THREAD_KEEPALIVE
    181181};
     
    307307        /** For the sliding window hasher implementation */
    308308        pthread_rwlock_t window_lock;
    309         /** Set once trace_join has been called */
    310         bool joined;
    311309        /** Set to indicate a perpkt's queue is full as such the writing perpkt cannot proceed */
    312310        bool perpkt_queue_full;
    313311        /** Global storage for this trace, shared among all the threads  */
    314312        void* global_blob;
    315         /** Requested size of the pkt buffer (currently only used if using dedicated hasher thread) */
    316         int packet_freelist_size;
    317313        /** The actual freelist */
    318314        libtrace_ocache_t packet_freelist;
    319         /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
    320         int perpkt_buffer_size;
    321         /** The reducer flags */
    322         int reducer_flags;
    323         /** The tick interval - in milliseconds (0 or -ve==disabled) */
    324         int tick_interval;
     315        /** The reporter flags */
     316        int reporter_flags;
    325317        /** Used to track the next expected key */
    326318        uint64_t expected_key;
    327319        /** User defined per_pkt function called when a pkt is ready */
    328320        fn_per_pkt per_pkt;
    329         /** User defined reducer function entry point XXX not hooked up */
    330         fn_reducer reducer;
     321        /** User defined reporter function entry point XXX not hooked up */
     322        fn_reporter reporter;
    331323        /** The hasher function */
    332324        enum hasher_types hasher_type;
     
    336328       
    337329        libtrace_thread_t hasher_thread;
    338         libtrace_thread_t reducer_thread;
     330        libtrace_thread_t reporter_thread;
    339331        libtrace_thread_t keepalive_thread;
    340332        int perpkt_thread_count;
     
    351343         */
    352344        uint64_t dropped_packets;
    353         uint64_t received_packets;
     345        uint64_t received_packets;
     346        struct user_configuration config;
    354347};
    355348
  • lib/trace.c

    rf9a70ca rf051c1b  
    265265        libtrace->state = STATE_NEW;
    266266        libtrace->perpkt_queue_full = false;
    267         libtrace->reducer_flags = 0;
     267        libtrace->reporter_flags = 0;
    268268        libtrace->global_blob = NULL;
    269269        libtrace->per_pkt = NULL;
    270         libtrace->reducer = NULL;
     270        libtrace->reporter = NULL;
    271271        libtrace->hasher = NULL;
    272         libtrace->packet_freelist_size = 0;
    273         libtrace->perpkt_buffer_size = 0;
    274272        libtrace->expected_key = 0;
    275273        libtrace_zero_ocache(&libtrace->packet_freelist);
    276274        libtrace_zero_thread(&libtrace->hasher_thread);
    277         libtrace_zero_thread(&libtrace->reducer_thread);
     275        libtrace_zero_thread(&libtrace->reporter_thread);
    278276        libtrace_zero_thread(&libtrace->keepalive_thread);
    279277        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    280         libtrace->reducer_thread.type = THREAD_EMPTY;
     278        libtrace->reporter_thread.type = THREAD_EMPTY;
    281279        libtrace->perpkt_thread_count = 0;
    282280        libtrace->perpkt_threads = NULL;
    283281        libtrace->tracetime = 0;
    284         libtrace->tick_interval = 0;
    285282        libtrace->first_packets.first = 0;
    286283        libtrace->first_packets.count = 0;
    287284        libtrace->first_packets.packets = NULL;
    288285        libtrace->dropped_packets = UINT64_MAX;
     286        ZERO_USER_CONFIG(libtrace->config);
    289287
    290288        /* Parse the URI to determine what sort of trace we are dealing with */
     
    388386        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
    389387        libtrace->perpkt_queue_full = false;
    390         libtrace->reducer_flags = 0;
     388        libtrace->reporter_flags = 0;
    391389        libtrace->global_blob = NULL;
    392390        libtrace->per_pkt = NULL;
    393         libtrace->reducer = NULL;
     391        libtrace->reporter = NULL;
    394392        libtrace->hasher = NULL;
    395393        libtrace->expected_key = 0;
    396         libtrace->packet_freelist_size = 0;
    397         libtrace->perpkt_buffer_size = 0;
    398394        libtrace_zero_ocache(&libtrace->packet_freelist);
    399395        libtrace_zero_thread(&libtrace->hasher_thread);
    400         libtrace_zero_thread(&libtrace->reducer_thread);
     396        libtrace_zero_thread(&libtrace->reporter_thread);
    401397        libtrace_zero_thread(&libtrace->keepalive_thread);
    402398        libtrace_zero_slidingwindow(&libtrace->sliding_window);
    403         libtrace->reducer_thread.type = THREAD_EMPTY;
     399        libtrace->reporter_thread.type = THREAD_EMPTY;
    404400        libtrace->perpkt_thread_count = 0;
    405401        libtrace->perpkt_threads = NULL;
    406402        libtrace->tracetime = 0;
    407         libtrace->tick_interval = 0;
     403        ZERO_USER_CONFIG(libtrace->config);
    408404       
    409405        for(tmp=formats_list;tmp;tmp=tmp->next) {
  • lib/trace_parallel.c

    rbe3f75b rf051c1b  
    101101
    102102
    103 #define VERBOSE_DEBBUGING 0
     103#define VERBOSE_DEBBUGING 1
    104104
    105105
     
    181181
    182182/**
     183 * True if the trace has dedicated hasher thread otherwise false,
     184 * to be used after the trace is running
     185 */
     186static inline int trace_has_dedicated_reporter(libtrace_t * libtrace)
     187{
     188        assert(libtrace->state != STATE_NEW);
     189        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     190}
     191
     192/**
    183193 * Changes a thread's state and broadcasts the condition variable. This
    184194 * should always be done when the lock is held.
     
    206216
    207217#if VERBOSE_DEBBUGING
    208         fprintf(stderr, "Thread %d State changed from %d to %d\n", t->tid,
    209                 t->state, prev_state);
     218        fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
     219                prev_state, t->state);
    210220#endif
    211221        if (need_lock)
     
    232242#if VERBOSE_DEBBUGING
    233243        fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
    234                 trace->uridata, get_trace_state_name(trace->state),
    235                 get_trace_state_name(prev_state));
     244                trace->uridata, get_trace_state_name(prev_state),
     245                get_trace_state_name(trace->state));
    236246#endif
    237247        if (need_lock)
     
    309319        if (!(ret = get_thread_table(libtrace))) {
    310320                pthread_t tid = pthread_self();
    311                 // Check if we are reducer or something else
    312                 if (pthread_equal(tid, libtrace->reducer_thread.tid))
    313                         ret = &libtrace->reducer_thread;
     321                // Check if we are reporter or something else
     322                if (pthread_equal(tid, libtrace->reporter_thread.tid))
     323                        ret = &libtrace->reporter_thread;
    314324                else if (pthread_equal(tid, libtrace->hasher_thread.tid))
    315325                        ret = &libtrace->hasher_thread;
     
    324334{
    325335        libtrace_result_t *res = (libtrace_result_t *)data;
    326         if (res->is_packet) {
     336        if (res->type == RESULT_PACKET) {
    327337                // Duplicate the packet in standard malloc'd memory and free the
    328                 // original
     338                // original, This is a 1:1 exchange so is ocache count remains unchanged.
    329339                libtrace_packet_t *oldpkt, *dup;
    330340                oldpkt = (libtrace_packet_t *) res->value;
     
    342352static void trace_make_results_packets_safe(libtrace_t *trace) {
    343353        libtrace_thread_t *t = get_thread_descriptor(trace);
    344         if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
     354        if (trace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED))
    345355                libtrace_deque_apply_function(&t->deque, &do_copy_result_packet);
    346356        else
     
    353363 */
    354364static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    355         trace_make_results_packets_safe(trace);
     365        if (t->type == THREAD_PERPKT)
     366                trace_make_results_packets_safe(trace);
    356367        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    357368        thread_change_state(trace, t, THREAD_PAUSED, false);
     
    363374}
    364375
    365 #define PACKETQUEUES 10
    366376
    367377/**
     
    372382        libtrace_thread_t * t;
    373383        libtrace_message_t message = {0};
    374         libtrace_packet_t *packets[PACKETQUEUES] = {NULL};
     384        libtrace_packet_t *packets[trace->config.burst_size];
    375385        size_t nb_packets;
    376386        size_t i;
    377387
     388        memset(&packets, 0, sizeof(void*) * trace->config.burst_size);
    378389        // Force this thread to wait until trace_pstart has been completed
    379390        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
     
    389400        // Send a message to say we've started
    390401
    391         message.code = MESSAGE_STARTED;
     402        // Let the per_packet function know we have started
     403        message.code = MESSAGE_STARTING;
    392404        message.sender = t;
    393 
    394         // Let the per_packet function know we have started
     405        (*trace->per_pkt)(trace, NULL, &message, t);
     406        message.code = MESSAGE_RESUMING;
    395407        (*trace->per_pkt)(trace, NULL, &message, t);
    396408
     
    403415                                        // Send message to say we are pausing, TODO consider sender
    404416                                        message.code = MESSAGE_PAUSING;
     417                                        message.sender = t;
    405418                                        (*trace->per_pkt)(trace, NULL, &message, t);
    406419                                        // If a hasher thread is running empty input queues so we don't loose data
     
    409422                                                // The hasher has stopped by this point, so the queue shouldn't be filling
    410423                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    411                                                         nb_packets = trace_pread_packet(trace, t, packets, 1);
    412                                                         if (nb_packets == 1) {
    413                                                                 if (packets[0]->error > 0)
    414                                                                         packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
    415                                                         } else {
    416                                                                 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", packets[0]->error, libtrace_ringbuffer_is_empty(&t->rbuffer));
     424                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
     425                                                        if (packets[0]->error > 0)
     426                                                                packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
     427                                                        else if (packets[0]->error != -2) {
     428                                                                // EOF or error, either way we'll stop
     429                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     430                                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
     431                                                                        // No packets after this should have any data in them
     432                                                                        assert(packets[0]->error <= 0);
     433                                                                }
     434                                                                goto stop;
    417435                                                        }
    418436                                                }
    419437                                        }
    420                                         // Send a paused message as a final chance to memory copy any packets
    421                                         message.code = MESSAGE_PAUSED;
    422                                         (*trace->per_pkt)(trace, NULL, &message, t);
    423438                                        // Now we do the actual pause, this returns when we are done
    424439                                        trace_thread_pause(trace, t);
     440                                        message.code = MESSAGE_RESUMING;
     441                                        (*trace->per_pkt)(trace, NULL, &message, t);
    425442                                        // Check for new messages as soon as we return
    426443                                        continue;
     
    440457                        nb_packets = 1;
    441458                } else {
    442                         nb_packets = trace_pread_packet(trace, t, packets, PACKETQUEUES);
     459                        nb_packets = trace_pread_packet(trace, t, packets, trace->config.burst_size);
    443460                }
    444461                // Loop through the packets we just read
     
    450467                                // An error this should be the last packet we read
    451468                                size_t z;
    452                                 for (z = i ; z < nb_packets; ++z)
    453                                         fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[i]->error);
    454                                 assert (i == nb_packets-1);
     469                                // We could have an eof or error and a message such as pause
     470                                for (z = i ; z < nb_packets; ++z) {
     471                                        fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
     472                                        assert (packets[z]->error < 1);
     473                                }
    455474                                goto stop;
    456475                        }
     
    463482stop:
    464483        /* ~~~~~~~~~~~~~~ Trace is finished do tear down ~~~~~~~~~~~~~~~~~~~~~ */
     484
    465485        // Let the per_packet function know we have stopped
    466         message.code = MESSAGE_STOPPED;
    467         message.sender = NULL;
     486        message.code = MESSAGE_PAUSING;
     487        message.sender = t;
     488        (*trace->per_pkt)(trace, NULL, &message, t);
     489        message.code = MESSAGE_STOPPING;
    468490        message.additional.uint64 = 0;
    469491        (*trace->per_pkt)(trace, NULL, &message, t);
    470492
    471493        // Free any remaining packets
    472         for (i = 0; i < PACKETQUEUES; i++) {
     494        for (i = 0; i < trace->config.burst_size; i++) {
    473495                if (packets[i]) {
    474496                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
     
    483505        message.code = MESSAGE_PERPKT_ENDED;
    484506        message.additional.uint64 = 0;
    485         trace_send_message_to_reducer(trace, &message);
     507        trace_send_message_to_reporter(trace, &message);
    486508
    487509        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
     
    566588                        pkt_skipped = 0;
    567589                } else {
     590                        assert(!"Dropping a packet!!");
    568591                        pkt_skipped = 1; // Reuse that packet no one read it
    569592                }
     
    573596        for (i = 0; i < trace->perpkt_thread_count; i++) {
    574597                libtrace_packet_t * bcast;
    575                 printf("Broadcasting error/EOF now the trace is over\n");
     598                fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");
    576599                if (i == trace->perpkt_thread_count - 1) {
    577600                        bcast = packet;
     
    582605                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    583606                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
    584                         ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    585607                        // Unlock early otherwise we could deadlock
    586608                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
     609                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    587610                } else {
     611                        fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);
    588612                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    589613                }
     
    596620        message.code = MESSAGE_PERPKT_ENDED;
    597621        message.additional.uint64 = 0;
    598         trace_send_message_to_reducer(trace, &message);
     622        trace_send_message_to_reporter(trace, &message);
    599623        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    600624        if (trace->format->punregister_thread) {
     
    10701094                libtrace_message_t mesg = {0};
    10711095                mesg.code = MESSAGE_FIRST_PACKET;
    1072                 trace_send_message_to_reducer(libtrace, &mesg);
     1096                trace_send_message_to_reporter(libtrace, &mesg);
    10731097                t->recorded_first = true;
    10741098        }
     
    11241148
    11251149/** Similar to delay_tracetime but send messages to all threads periodically */
     1150static void* reporter_entry(void *data) {
     1151        libtrace_message_t message = {0};
     1152        libtrace_t *trace = (libtrace_t *)data;
     1153        libtrace_thread_t *t = &trace->reporter_thread;
     1154        size_t res_size;
     1155        libtrace_vector_t results;
     1156        libtrace_vector_init(&results, sizeof(libtrace_result_t));
     1157        fprintf(stderr, "Reporter thread starting\n");
     1158        libtrace_result_t result;
     1159        size_t i;
     1160
     1161        message.code = MESSAGE_STARTING;
     1162        message.sender = t;
     1163        (*trace->reporter)(trace, NULL, &message);
     1164        message.code = MESSAGE_RESUMING;
     1165        (*trace->reporter)(trace, NULL, &message);
     1166
     1167        while (!trace_finished(trace)) {
     1168
     1169                //while ( != LIBTRACE_MQ_FAILED) { }
     1170                libtrace_message_queue_get(&t->messages, &message);
     1171
     1172                switch (message.code) {
     1173                        // Check for results
     1174                        case MESSAGE_POST_REPORTER:
     1175                                res_size = trace_get_results(trace, &results);
     1176                                for (i = 0; i < res_size; i++) {
     1177                                        ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
     1178                                        (*trace->reporter)(trace, &result, NULL);
     1179                                }
     1180                                break;
     1181                        case MESSAGE_DO_PAUSE:
     1182                                message.code = MESSAGE_PAUSING;
     1183                                message.sender = t;
     1184                                (*trace->reporter)(trace, NULL, &message);
     1185                                trace_thread_pause(trace, t);
     1186                                message.code = MESSAGE_RESUMING;
     1187                                (*trace->reporter)(trace, NULL, &message);
     1188                                break;
     1189                        default:
     1190                                (*trace->reporter)(trace, NULL, &message);
     1191                }
     1192        }
     1193
     1194        // Flush out whats left now all our threads have finished
     1195        res_size = trace_get_results(trace, &results);
     1196        for (i = 0; i < res_size; i++) {
     1197                ASSERT_RET(libtrace_vector_get(&results, i, (void *) &result), == 1);
     1198                (*trace->reporter)(trace, &result, NULL);
     1199        }
     1200
     1201        // GOODBYE
     1202        message.code = MESSAGE_PAUSING;
     1203        message.sender = t;
     1204        (*trace->reporter)(trace, NULL, &message);
     1205        message.code = MESSAGE_STOPPING;
     1206        (*trace->reporter)(trace, NULL, &message);
     1207
     1208        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
     1209        print_memory_stats();
     1210        return NULL;
     1211}
     1212
     1213/** Similar to delay_tracetime but send messages to all threads periodically */
    11261214static void* keepalive_entry(void *data) {
    11271215        struct timeval prev, next;
     
    11351223        while (trace->state != STATE_FINSHED) {
    11361224                fd_set rfds;
    1137                 next_release = tv_to_usec(&prev) + (trace->tick_interval * 1000);
     1225                next_release = tv_to_usec(&prev) + (trace->config.tick_interval * 1000);
    11381226                gettimeofday(&next, NULL);
    11391227                if (next_release > tv_to_usec(&next)) {
     
    13291417 * @returns 0 on success
    13301418 */
    1331 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer)
     1419DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reporter reporter)
    13321420{
    13331421        int i;
     
    13531441                if (per_pkt)
    13541442                        libtrace->per_pkt = per_pkt;
     1443
     1444                if (reporter)
     1445                        libtrace->reporter = reporter;
    13551446
    13561447                assert(libtrace_parallel);
     
    13821473        libtrace->global_blob = global_blob;
    13831474        libtrace->per_pkt = per_pkt;
    1384         libtrace->reducer = reducer;
     1475        libtrace->reporter = reporter;
    13851476
    13861477        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     
    13911482
    13921483        // Set default buffer sizes
    1393         if (libtrace->perpkt_buffer_size <= 0)
    1394                 libtrace->perpkt_buffer_size = 1000;
     1484        if (libtrace->config.hasher_queue_size <= 0)
     1485                libtrace->config.hasher_queue_size = 1000;
    13951486
    13961487        if (libtrace->perpkt_thread_count <= 0) {
     
    14021493        }
    14031494
    1404         if(libtrace->packet_freelist_size <= 0)
    1405                 libtrace->packet_freelist_size = (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count;
    1406 
    1407         if(libtrace->packet_freelist_size <
    1408                 (libtrace->perpkt_buffer_size + 1) * libtrace->perpkt_thread_count)
     1495        if (libtrace->config.reporter_thold <= 0)
     1496                libtrace->config.reporter_thold = 100;
     1497        if (libtrace->config.burst_size <= 0)
     1498                libtrace->config.burst_size = 10;
     1499        if (libtrace->config.packet_thread_cache_size <= 0)
     1500                libtrace->config.packet_thread_cache_size = 20;
     1501        if (libtrace->config.packet_global_cache_size <= 0)
     1502                libtrace->config.packet_global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
     1503
     1504        if (libtrace->config.packet_global_cache_size <
     1505                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
    14091506                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
    14101507
     
    14321529                libtrace->hasher_thread.type = THREAD_EMPTY;
    14331530        }
    1434         //libtrace_ocache_init(&libtrace->packet_freelist, trace_create_packet, trace_destroy_packet, 64, libtrace->packet_freelist_size * 4, true);
     1531
    14351532        libtrace_ocache_init(&libtrace->packet_freelist,
    14361533                                                 (void* (*)()) trace_create_packet,
    14371534                                                 (void (*)(void *))trace_destroy_packet,
    1438                                                  64,
    1439                                                  libtrace->packet_freelist_size * 4,
    1440                                                  true);
     1535                                                 libtrace->config.packet_thread_cache_size,
     1536                                                 libtrace->config.packet_global_cache_size * 4,
     1537                                                 libtrace->config.fixed_packet_count);
     1538        // Unused slidingwindow code
    14411539        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
    1442         ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
     1540        //ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
     1541
    14431542        // This will be applied to every new thread that starts, i.e. they will block all signals
    14441543        // Lets start a fixed number of reading threads
    1445 
    1446         // For now we never have a dedicated thread for the reducer
    1447         // i.e. This main thread is used as the reducer
    1448         libtrace->reducer_thread.tid = pthread_self();
    1449         libtrace->reducer_thread.type = THREAD_REDUCER;
    1450         libtrace->reducer_thread.state = THREAD_RUNNING;
    1451         libtrace_message_queue_init(&libtrace->reducer_thread.messages, sizeof(libtrace_message_t));
    14521544
    14531545        /* Ready some storages */
     
    14701562                t->perpkt_num = i;
    14711563                if (libtrace->hasher)
    1472                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
     1564                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING);
    14731565                // Depending on the mode vector or deque might be chosen
    14741566                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    14921584                threads_started = trace_start_perpkt_threads(libtrace);
    14931585
    1494         if (libtrace->tick_interval > 0) {
     1586        libtrace->reporter_thread.type = THREAD_REPORTER;
     1587        libtrace->reporter_thread.state = THREAD_RUNNING;
     1588        libtrace_message_queue_init(&libtrace->reporter_thread.messages, sizeof(libtrace_message_t));
     1589        if (reporter) {
     1590                // Got a real reporter
     1591                ASSERT_RET(pthread_create(&libtrace->reporter_thread.tid, NULL, reporter_entry, (void *) libtrace), == 0);
     1592        } else {
     1593                // Main thread is reporter
     1594                libtrace->reporter_thread.tid = pthread_self();
     1595        }
     1596
     1597        if (libtrace->config.tick_interval > 0) {
    14951598                libtrace->keepalive_thread.type = THREAD_KEEPALIVE;
    14961599                libtrace->keepalive_thread.state = THREAD_RUNNING;
     
    15801683                        fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n");
    15811684                }
     1685        }
     1686
     1687        // Deal with the reporter
     1688        if (trace_has_dedicated_reporter(libtrace)) {
     1689                fprintf(stderr, "Reporter thread running we deal with this special!\n");
     1690                libtrace_message_t message = {0};
     1691                message.code = MESSAGE_DO_PAUSE;
     1692                trace_send_message_to_thread(libtrace, &libtrace->reporter_thread, &message);
     1693                // Wait for it to pause
     1694                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     1695                while (libtrace->reporter_thread.state == THREAD_RUNNING) {
     1696                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1697                }
     1698                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    15821699        }
    15831700
     
    17601877        // buffers so clean them up
    17611878        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    1762                 // Its possible 1 packet got added by the reducer (or 1 per any other thread) since we cleaned up
     1879                // Its possible 1 packet got added by the reporter (or 1 per any other thread) since we cleaned up
    17631880                // if they lost timeslice before-during a write
    17641881                libtrace_packet_t * packet;
     
    17731890        // TODO consider perpkt threads marking trace as finished before join is called
    17741891        libtrace_change_state(libtrace, STATE_FINSHED, true);
     1892
     1893        if (trace_has_dedicated_reporter(libtrace)) {
     1894                fprintf(stderr, "Waiting to join with the reporter\n");
     1895                pthread_join(libtrace->reporter_thread.tid, NULL);
     1896                fprintf(stderr, "Joined with the reporter\n");
     1897                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
     1898        }
    17751899       
    17761900        // Wait for the tick (keepalive) thread if it has been started
     
    18121936 * Return backlog indicator
    18131937 */
    1814 DLLEXPORT int trace_post_reduce(libtrace_t *libtrace)
     1938DLLEXPORT int trace_post_reporter(libtrace_t *libtrace)
    18151939{
    18161940        libtrace_message_t message = {0};
    1817         message.code = MESSAGE_POST_REDUCE;
     1941        message.code = MESSAGE_POST_REPORTER;
    18181942        message.sender = get_thread_descriptor(libtrace);
    1819         return libtrace_message_queue_put(&libtrace->reducer_thread.messages, (void *) &message);
     1943        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, (void *) &message);
    18201944}
    18211945
     
    18231947 * Return backlog indicator
    18241948 */
    1825 DLLEXPORT int trace_send_message_to_reducer(libtrace_t * libtrace, libtrace_message_t * message)
    1826 {
    1827         //printf("Sending message code=%d to reducer\n", message->code);
     1949DLLEXPORT int trace_send_message_to_reporter(libtrace_t * libtrace, libtrace_message_t * message)
     1950{
     1951        //printf("Sending message code=%d to reporter\n", message->code);
    18281952        message->sender = get_thread_descriptor(libtrace);
    1829         return libtrace_message_queue_put(&libtrace->reducer_thread.messages, message);
     1953        return libtrace_message_queue_put(&libtrace->reporter_thread.messages, message);
    18301954}
    18311955
     
    18351959DLLEXPORT int trace_send_message_to_thread(libtrace_t * libtrace, libtrace_thread_t *t, libtrace_message_t * message)
    18361960{
    1837         //printf("Sending message code=%d to reducer\n", message->code);
     1961        //printf("Sending message code=%d to reporter\n", message->code);
    18381962        message->sender = get_thread_descriptor(libtrace);
    18391963        return libtrace_message_queue_put(&t->messages, message);
     
    18471971                libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message);
    18481972        }
    1849         //printf("Sending message code=%d to reducer\n", message->code);
     1973        //printf("Sending message code=%d to reporter\n", message->code);
    18501974        return 0;
    18511975}
     
    19092033/**
    19102034 * Publish to the reduce queue, return
    1911  */
    1912 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) {
     2035 * Should only be called by a perpkt thread, i.e. from a perpkt handler
     2036 */
     2037DLLEXPORT void trace_publish_result(libtrace_t *libtrace, libtrace_thread_t *t, uint64_t key, void * value, int type) {
    19132038        libtrace_result_t res;
    1914         res.is_packet = 0;
    1915         // Who am I???
    1916         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1917         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1918         // Now put it into my table
    19192039        UNUSED static __thread int count = 0;
    1920 
     2040        res.type = type;
    19212041
    19222042        libtrace_result_set_key_value(&res, key, value);
     
    19302050                printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
    19312051        count = (count+1)%1000;*/
    1932         if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    1933                 if (libtrace_deque_get_size(&t->deque) >= 800) {
    1934                         trace_post_reduce(libtrace);
     2052        if (libtrace->reporter_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
     2053                if (libtrace_deque_get_size(&t->deque) >= libtrace->config.reporter_thold) {
     2054                        trace_post_reporter(libtrace);
    19352055                }
    19362056                //while (libtrace_deque_get_size(&t->deque) >= 1000)
     
    19412061                //      sched_yield();
    19422062
    1943                 if (libtrace_vector_get_size(&t->vector) >= 800) {
    1944                         trace_post_reduce(libtrace);
     2063                if (libtrace_vector_get_size(&t->vector) >= libtrace->config.reporter_thold) {
     2064                        trace_post_reporter(libtrace);
    19452065                }
    19462066                libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    19472067        }
    19482068}
    1949 
    1950 DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1951         libtrace_result_t res;
    1952         // Who am I???
    1953         int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ?
    1954         libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread];
    1955         // Now put it into my table
    1956         UNUSED static __thread int count = 0;
    1957 
    1958         res.is_packet = 1;
    1959         libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet);
    1960         /*
    1961         if (count == 1)
    1962                 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector));
    1963         count = (count+1) %1000;
    1964         libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    1965         */
    1966         /*if (count == 1)
    1967                 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque));
    1968         count = (count+1)%1000;*/
    1969         if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) {
    1970                 if (libtrace_deque_get_size(&t->deque) >= 800) {
    1971                         trace_post_reduce(libtrace);
    1972                 }
    1973                 //while (libtrace_deque_get_size(&t->deque) >= 1000)
    1974                 //      sched_yield();
    1975                 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :)
    1976         } else {
    1977                 //while (libtrace_vector_get_size(&t->vector) >= 1000)
    1978                 //      sched_yield();
    1979 
    1980                 if (libtrace_vector_get_size(&t->vector) >= 800) {
    1981                         trace_post_reduce(libtrace);
    1982                 }
    1983                 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :)
    1984         }
    1985 }
    1986 
    19872069
    19882070static int compareres(const void* p1, const void* p2)
     
    19982080DLLEXPORT int trace_get_results(libtrace_t *libtrace, libtrace_vector_t * results) {
    19992081        int i;
    2000         int flags = libtrace->reducer_flags; // Hint these aren't a changing
     2082        int flags = libtrace->reporter_flags; // Hint these aren't a changing
    20012083
    20022084        libtrace_vector_empty(results);
     
    21052187DLLEXPORT int trace_finished(libtrace_t * libtrace) {
    21062188        // TODO I don't like using this so much, we could use state!!!
    2107         return !(libtrace->perpkt_thread_states[THREAD_RUNNING] || libtrace->perpkt_thread_states[THREAD_FINISHING]);
     2189        return libtrace->perpkt_thread_states[THREAD_FINISHED] == libtrace->perpkt_thread_count;
    21082190}
    21092191
     
    21132195        switch (option) {
    21142196                case TRACE_OPTION_TICK_INTERVAL:
    2115                         libtrace->tick_interval = *((int *) value);
     2197                        libtrace->config.tick_interval = *((int *) value);
    21162198                        return 1;
    21172199                case TRACE_OPTION_SET_HASHER:
    21182200                        return trace_set_hasher(libtrace, (enum hasher_types) *((int *) value), NULL, NULL);
    2119                 case TRACE_OPTION_SET_PERPKT_BUFFER_SIZE:
    2120                         libtrace->perpkt_buffer_size = *((int *) value);
    2121                         return 1;
    2122                 case TRACE_OPTION_SET_PACKET_FREELIST_SIZE:
    2123                         libtrace->packet_freelist_size = *((int *) value);
    2124                         return 1;
    21252201                case TRACE_OPTION_SET_PERPKT_THREAD_COUNT:
    21262202                        libtrace->perpkt_thread_count = *((int *) value);
     
    21282204                case TRACE_DROP_OUT_OF_ORDER:
    21292205                        if (*((int *) value))
    2130                                 libtrace->reducer_flags |= REDUCE_DROP_OOO;
     2206                                libtrace->reporter_flags |= REDUCE_DROP_OOO;
    21312207                        else
    2132                                 libtrace->reducer_flags &= ~REDUCE_DROP_OOO;
     2208                                libtrace->reporter_flags &= ~REDUCE_DROP_OOO;
    21332209                        return 1;
    21342210                case TRACE_OPTION_SEQUENTIAL:
    21352211                        if (*((int *) value))
    2136                                 libtrace->reducer_flags |= REDUCE_SEQUENTIAL;
     2212                                libtrace->reporter_flags |= REDUCE_SEQUENTIAL;
    21372213                        else
    2138                                 libtrace->reducer_flags &= ~REDUCE_SEQUENTIAL;
     2214                                libtrace->reporter_flags &= ~REDUCE_SEQUENTIAL;
    21392215                        return 1;
    21402216                case TRACE_OPTION_ORDERED:
    21412217                        if (*((int *) value))
    2142                                 libtrace->reducer_flags |= REDUCE_ORDERED;
     2218                                libtrace->reporter_flags |= REDUCE_ORDERED;
    21432219                        else
    2144                                 libtrace->reducer_flags &= ~REDUCE_ORDERED;
     2220                                libtrace->reporter_flags &= ~REDUCE_ORDERED;
    21452221                        return 1;
    21462222                case TRACE_OPTION_TRACETIME:
  • test/Makefile

    r17c5749 rf051c1b  
    1515BINS_PARALLEL = test-format-parallel test-format-parallel-hasher \
    1616        test-format-parallel-singlethreaded test-format-parallel-stressthreads \
    17         test-format-parallel-singlethreaded-hasher
     17        test-format-parallel-singlethreaded-hasher test-format-parallel-reporter
    1818
    1919BINS = test-pcap-bpf test-event test-time test-dir test-wireless test-errors \
  • test/do-tests-parallel.sh

    r54834a1 rf051c1b  
    5757do_test ./test-format-parallel-stressthreads erf
    5858
     59echo \* Read stress testing with 100 threads
     60do_test ./test-format-parallel-reporter erf
     61
    5962echo
    6063echo "Tests passed: $OK"
  • test/test-format-parallel-hasher.c

    r59ef093 rf051c1b  
    9595        bool seen_start_message;
    9696        bool seen_stop_message;
    97         bool seen_paused_message;
     97        bool seen_resumed_message;
    9898        bool seen_pausing_message;
    9999        int count;
     
    125125        }
    126126        else switch (mesg->code) {
    127                 case MESSAGE_STARTED:
     127                case MESSAGE_STARTING:
    128128                        assert(tls == NULL);
    129129                        tls = calloc(sizeof(struct TLS), 1);
     
    132132                        tls->seen_start_message = true;
    133133                        break;
    134                 case MESSAGE_STOPPED:
     134                case MESSAGE_STOPPING:
    135135                        assert(tls->seen_start_message);
    136136                        assert(tls != NULL);
     
    139139
    140140                        // All threads publish to verify the thread count
    141                         trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);
    142                         trace_post_reduce(trace);
     141                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     142                        trace_post_reporter(trace);
    143143                        free(tls);
    144144                        break;
     
    152152                        tls->seen_pausing_message = true;
    153153                        break;
    154                 case MESSAGE_PAUSED:
    155                         assert(tls->seen_pausing_message);
    156                         tls->seen_paused_message = true;
     154                case MESSAGE_RESUMING:
     155                        assert(tls->seen_pausing_message  || tls->seen_start_message);
     156                        tls->seen_resumed_message = true;
    157157                        break;
    158158        }
     
    197197        trace_pstart(trace, NULL, per_packet, NULL);
    198198        iferr(trace,tracename);
    199 
    200199        /* Make sure traces survive a pause and restart */
    201200        trace_ppause(trace);
  • test/test-format-parallel-singlethreaded-hasher.c

    r59ef093 rf051c1b  
    9595        bool seen_start_message;
    9696        bool seen_stop_message;
    97         bool seen_paused_message;
     97        bool seen_resuming_message;
    9898        bool seen_pausing_message;
    9999        int count;
     
    125125        }
    126126        else switch (mesg->code) {
    127                 case MESSAGE_STARTED:
     127                case MESSAGE_STARTING:
    128128                        assert(tls == NULL);
    129129                        tls = calloc(sizeof(struct TLS), 1);
     
    132132                        tls->seen_start_message = true;
    133133                        break;
    134                 case MESSAGE_STOPPED:
     134                case MESSAGE_STOPPING:
    135135                        assert(tls->seen_start_message);
    136136                        assert(tls != NULL);
     
    139139
    140140                        // All threads publish to verify the thread count
    141                         trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);
    142                         trace_post_reduce(trace);
     141                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     142                        trace_post_reporter(trace);
    143143                        free(tls);
    144144                        break;
     
    152152                        tls->seen_pausing_message = true;
    153153                        break;
    154                 case MESSAGE_PAUSED:
    155                         assert(tls->seen_pausing_message);
    156                         tls->seen_paused_message = true;
     154                case MESSAGE_RESUMING:
     155                        assert(tls->seen_pausing_message || tls->seen_start_message);
     156                        tls->seen_resuming_message = true;
    157157                        break;
    158158        }
  • test/test-format-parallel-singlethreaded.c

    r59ef093 rf051c1b  
    9595        bool seen_start_message;
    9696        bool seen_stop_message;
    97         bool seen_paused_message;
     97        bool seen_resuming_message;
    9898        bool seen_pausing_message;
    9999        int count;
     
    125125        }
    126126        else switch (mesg->code) {
    127                 case MESSAGE_STARTED:
     127                case MESSAGE_STARTING:
    128128                        assert(tls == NULL);
    129129                        tls = calloc(sizeof(struct TLS), 1);
     
    132132                        tls->seen_start_message = true;
    133133                        break;
    134                 case MESSAGE_STOPPED:
     134                case MESSAGE_STOPPING:
    135135                        assert(tls->seen_start_message);
    136136                        assert(tls != NULL);
     
    139139
    140140                        // All threads publish to verify the thread count
    141                         trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);
    142                         trace_post_reduce(trace);
     141                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     142                        trace_post_reporter(trace);
    143143                        free(tls);
    144144                        break;
     
    152152                        tls->seen_pausing_message = true;
    153153                        break;
    154                 case MESSAGE_PAUSED:
    155                         assert(tls->seen_pausing_message);
    156                         tls->seen_paused_message = true;
     154                case MESSAGE_RESUMING:
     155                        assert(tls->seen_pausing_message || tls->seen_start_message);
     156                        tls->seen_resuming_message = true;
    157157                        break;
    158158        }
  • test/test-format-parallel-stressthreads.c

    r59ef093 rf051c1b  
    9595        bool seen_start_message;
    9696        bool seen_stop_message;
    97         bool seen_paused_message;
     97        bool seen_resuming_message;
    9898        bool seen_pausing_message;
    9999        int count;
     
    125125        }
    126126        else switch (mesg->code) {
    127                 case MESSAGE_STARTED:
     127                case MESSAGE_STARTING:
    128128                        assert(tls == NULL);
    129129                        tls = calloc(sizeof(struct TLS), 1);
     
    132132                        tls->seen_start_message = true;
    133133                        break;
    134                 case MESSAGE_STOPPED:
     134                case MESSAGE_STOPPING:
    135135                        assert(tls->seen_start_message);
    136136                        assert(tls != NULL);
     
    139139
    140140                        // All threads publish to verify the thread count
    141                         trace_publish_result(trace, (uint64_t) 0, (void *) tls->count);
    142                         trace_post_reduce(trace);
     141                        trace_publish_result(trace, t, (uint64_t) 0, (void *) tls->count, RESULT_NORMAL);
     142                        trace_post_reporter(trace);
    143143                        free(tls);
    144144                        break;
     
    152152                        tls->seen_pausing_message = true;
    153153                        break;
    154                 case MESSAGE_PAUSED:
    155                         assert(tls->seen_pausing_message);
    156                         tls->seen_paused_message = true;
     154                case MESSAGE_RESUMING:
     155                        assert(tls->seen_pausing_message || tls->seen_start_message);
     156                        tls->seen_resuming_message = true;
    157157                        break;
    158158        }
  • test/test-format-parallel.c

    r59ef093 rf051c1b  
    9595        bool seen_start_message;
    9696        bool seen_stop_message;
    97         bool seen_paused_message;
     97        bool seen_resuming_message;
    9898        bool seen_pausing_message;
    9999        int count;
     
    132132        }
    133133        else switch (mesg->code) {
    134                 case MESSAGE_STARTED:
     134                case MESSAGE_STARTING:
    135135                        assert(!seen_start_message || seen_paused_message);
    136136                        assert(tls == NULL);
     
    141141                        tls->seen_start_message = true;
    142142                        break;
    143                 case MESSAGE_STOPPED:
     143                case MESSAGE_STOPPING:
    144144                        assert(seen_start_message);
    145145                        assert(tls != NULL);
     
    152152
    153153                        // All threads publish to verify the thread count
    154                         trace_publish_result(trace, (uint64_t) 0, (void *) count);
    155                         trace_post_reduce(trace);
     154                        trace_publish_result(trace, t, (uint64_t) 0, (void *) count, RESULT_NORMAL);
     155                        trace_post_reporter(trace);
    156156                        break;
    157157                case MESSAGE_TICK:
     
    165165                        tls->seen_pausing_message = true;
    166166                        break;
    167                 case MESSAGE_PAUSED:
    168                         assert(seen_pausing_message);
     167                case MESSAGE_RESUMING:
     168                        assert(tls->seen_pausing_message  || tls->seen_start_message );
    169169                        seen_paused_message = true;
    170                         tls->seen_paused_message = true;
     170                        tls->seen_resuming_message = true;
    171171                        break;
    172172        }
  • tools/traceanon/traceanon_parallel.c

    r9594cf9 rf051c1b  
    2727        static int s = 0;
    2828        (void)signal;
    29         // trace_interrupt();
     29    //trace_interrupt();
    3030        // trace_pstop isn't really signal safe because its got lots of locks in it
    31         // trace_pstop(trace);
    32         if (s == 0) {
     31    trace_pstop(trace);
     32    /*if (s == 0) {
    3333                if (trace_ppause(trace) == -1)
    3434                        trace_perror(trace, "Pause failed");
     
    3737                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
    3838                        trace_perror(trace, "Start failed");
    39         }
     39    }*/
    4040        s = !s;
    4141}
     
    141141
    142142
    143 static uint64_t bad_hash(libtrace_packet_t * pkt)
     143UNUSED static uint64_t bad_hash(UNUSED libtrace_packet_t * pkt)
    144144{
    145145        return 0;
     
    147147
    148148
    149 static uint64_t rand_hash(libtrace_packet_t * pkt)
     149UNUSED static uint64_t rand_hash(UNUSED libtrace_packet_t * pkt)
    150150{
    151151        return rand();
     
    153153
    154154
    155 static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, libtrace_thread_t *t)
    156 {
    157         int i;
     155static void* per_packet(libtrace_t *trace, libtrace_packet_t *pkt, libtrace_message_t *mesg, UNUSED libtrace_thread_t *t)
     156{
    158157       
    159158        if (pkt) {
     
    191190                //libtrace_packet_t * packet_copy = trace_result_packet(trace, pkt);
    192191                //trace_publish_result(trace, trace_packet_get_order(pkt), pkt);
    193                 trace_publish_packet(trace, pkt);
     192                trace_publish_result(trace, t, trace_packet_get_order(pkt), pkt, RESULT_PACKET);
    194193                //return ;
    195194        }
     
    197196                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    198197                switch (mesg->code) {
    199                         case MESSAGE_STARTED:
     198                        case MESSAGE_STARTING:
    200199                                enc_init(enc_type,key);
    201200                }
     
    204203}
    205204
     205struct libtrace_out_t *writer = 0;
     206
     207static void* write_out(libtrace_t *trace, libtrace_result_t *result, UNUSED libtrace_message_t *mesg) {
     208        static uint64_t packet_count = 0; // TESTING PURPOSES, this is not going to work with a live format
     209        if (result) {
     210                libtrace_packet_t *packet = (libtrace_packet_t*) libtrace_result_get_value(result);
     211                assert(libtrace_result_get_key(result) == packet_count++);
     212                if (trace_write_packet(writer,packet)==-1) {
     213                        trace_perror_output(writer,"writer");
     214                        trace_interrupt();
     215                }
     216                trace_free_result_packet(trace, packet);
     217        }
     218        return NULL;
     219}
     220
     221
    206222int main(int argc, char *argv[])
    207223{
    208224        //struct libtrace_t *trace = 0;
    209         struct libtrace_packet_t *packet/* = trace_create_packet()*/;
    210         struct libtrace_out_t *writer = 0;
    211225        struct sigaction sigact;
    212226        char *output = 0;
     
    380394        //trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_BUFFER_SIZE, &i);
    381395        i = 2;
    382         trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
    383        
    384         if (trace_pstart(trace, NULL, &per_packet, NULL)==-1) {
     396    trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &i);
     397       
     398        if (trace_pstart(trace, NULL, &per_packet, &write_out)==-1) {
    385399                trace_perror(trace,"trace_start");
    386400                trace_destroy_output(writer);
     
    402416        sigaction(SIGINT, &sigact, NULL);
    403417        sigaction(SIGTERM, &sigact, NULL);
    404        
    405         // Read in the resulting packets and then free them when done
    406         libtrace_vector_t res;
    407         int res_size = 0;
    408         libtrace_vector_init(&res, sizeof(libtrace_result_t));
    409         uint64_t packet_count = 0;
    410         while (!trace_finished(trace)) {
    411                 // Read messages
    412                 libtrace_message_t message;
    413                
    414                 // We just release and do work currently, maybe if something
    415                 // interesting comes through we'd deal with that
    416                 libtrace_thread_get_message(trace, &message);
    417                
    418                 while (libtrace_thread_try_get_message(trace, &message) != LIBTRACE_MQ_FAILED) { }
    419                
    420                 if ((res_size = trace_get_results(trace, &res)) == 0)
    421                         ;/*sched_yield();*/
    422                
    423                 for (i = 0 ; i < res_size ; i++) {
    424                         libtrace_result_t result;
    425                         assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
    426                         packet = libtrace_result_get_value(&result);
    427                         assert(libtrace_result_get_key(&result) == packet_count);
    428                         packet_count++;
    429                         if (trace_write_packet(writer,packet)==-1) {
    430                                 trace_perror_output(writer,"writer");
    431                                 trace_interrupt();
    432                                 break;
    433                         }
    434                         //trace_destroy_packet(packet);
    435                         trace_free_result_packet(trace, packet);
    436                 }
    437         }
     418
     419        // Wait for the trace to finish
    438420        trace_join(trace);
    439        
    440         // Grab everything that's left here
    441         res_size = trace_get_results(trace, &res);
    442        
    443         for (i = 0 ; i < res_size ; i++) {
    444                 libtrace_result_t result;
    445                 assert(libtrace_vector_get(&res, i, (void *) &result) == 1);
    446                 packet = libtrace_result_get_value(&result);
    447                 if (libtrace_result_get_key(&result) != packet_count)
    448                         printf ("Got a %"PRIu64" but expected a %"PRIu64" %d\n", libtrace_result_get_key(&result), packet_count, res_size);
    449                 assert(libtrace_result_get_key(&result) == packet_count);
    450                
    451                 packet_count++;
    452                 if (trace_write_packet(writer,packet)==-1) {
    453                         trace_perror_output(writer,"writer");
    454                         trace_interrupt();
    455                         break;
    456                 }
    457                 trace_destroy_packet(packet);
    458         }
    459         libtrace_vector_destroy(&res);
    460421       
    461422        //trace_destroy_packet(packet);
  • tools/tracertstats/tracertstats_parallel.c

    rb13b939 rf051c1b  
    209209                        // Publish and make a new one new
    210210                        //fprintf(stderr, "Publishing result %"PRIu64"\n", last_ts);
    211                         trace_publish_result(trace, (uint64_t) last_ts, results);
    212                         trace_post_reduce(trace);
     211                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     212                        trace_post_reporter(trace);
    213213                        results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    214214                        last_ts++;
     
    234234                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    235235                switch (mesg->code) {
    236                         case MESSAGE_STARTED:
     236                        case MESSAGE_STARTING:
    237237                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    238238                                break;
    239                         case MESSAGE_STOPPED:
     239                        case MESSAGE_STOPPING:
    240240                                // Should we always post this?
    241241                                if (results->total.count) {
    242                                         trace_publish_result(trace, (uint64_t) last_ts, results);
    243                                         trace_post_reduce(trace);
     242                                        trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     243                                        trace_post_reporter(trace);
    244244                                        results = NULL;
    245245                                }
     
    260260                                        if (next_update_time <= mesg->additional.uint64) {
    261261                                                //fprintf(stderr, "Got a tick and publishing early!!\n");
    262                                                 trace_publish_result(trace, (uint64_t) last_ts, results);
    263                                                 trace_post_reduce(trace);
     262                                                trace_publish_result(trace, t, (uint64_t) last_ts, results, RESULT_NORMAL);
     263                                                trace_post_reporter(trace);
    264264                                                results = calloc(1, sizeof(result_t) + sizeof(statistic_t) * filter_count);
    265265                                                last_ts++;
  • tools/tracestats/tracestats_parallel.c

    r9594cf9 rf051c1b  
    6767        // trace_interrupt();
    6868        // trace_pstop isn't really signal safe because its got lots of locks in it
    69         // trace_pstop(trace);
    70         if (s == 0) {
     69    trace_pstop(trace);
     70    /*if (s == 0) {
    7171                if (trace_ppause(trace) == -1)
    7272                        trace_perror(trace, "Pause failed");
     
    7575                if (trace_pstart(trace, NULL, NULL, NULL) == -1)
    7676                        trace_perror(trace, "Start failed");
    77         }
     77    }*/
    7878        s = !s;
    7979}
     
    129129                // printf ("%d.%06d READ #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));
    130130                switch (mesg->code) {
    131                         case MESSAGE_STOPPED:
    132                                 trace_publish_result(trace, 0, results); // Only ever using a single key 0
     131                        case MESSAGE_STOPPING:
     132                                trace_publish_result(trace, t, 0, results, RESULT_NORMAL); // Only ever using a single key 0
    133133                                fprintf(stderr, "Thread published resuslts WOWW\n");
    134134                                break;
    135                         case MESSAGE_STARTED:
     135                        case MESSAGE_STARTING:
    136136                                results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    137137                                break;
     
    142142                                fprintf(stderr, "Thread is pausing\n");
    143143                                break;
    144                         case MESSAGE_PAUSED:
     144                        case MESSAGE_RESUMING:
    145145                                fprintf(stderr, "Thread has paused\n");
    146146                                break;
     
    228228        int option = 2;
    229229        //option = 10000;
    230         trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
     230    //trace_set_hasher(trace, HASHER_CUSTOM, &rand_hash, NULL);
    231231        option = 2;
    232232        trace_parallel_config(trace, TRACE_OPTION_SET_PERPKT_THREAD_COUNT, &option);
Note: See TracChangeset for help on using the changeset viewer.