Changeset f01c479


Ignore:
Timestamp:
03/25/15 14:28:31 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5e43b8b
Parents:
a330384
Message:

Move reporter threashold check to after adding the next item, so it makes more sense
in the case the threashold is set to 1.

Add logic to the order combiner allowing multiple results with the same key
to be sent even if all queues are not full.

Location:
lib
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    r6a6e6a8 rf01c479  
    2121static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
    2222        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
     23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
     24        //      sched_yield();
     25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
     26
    2327        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
    2428                trace_post_reporter(trace);
    2529        }
    26         //while (libtrace_deque_get_size(&t->deque) >= 1000)
    27         //      sched_yield();
    28         libtrace_deque_push_back(queue, res); // Automatically locking for us :)
    2930}
    3031
     
    3536        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
    3637        uint64_t min_key = UINT64_MAX;
     38        uint64_t prev_min = 0;
    3739        int min_queue = -1;
    3840
     
    5557        }
    5658
    57         /* Now remove the smallest and loop - special case if all threads have joined we always flush what's left */
    58         while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)) {
    59                 // || (live_count && ((flags & REDUCE_SEQUENTIAL && min_key == trace->expected_key)))
     59        /* Now remove the smallest and loop - special case if all threads have
     60         * joined we always flush what's left. Or the next smallest is the same
     61         * value or less than the previous */
     62        while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)
     63               || (live_count && prev_min >= min_key)) {
    6064                /* Get the minimum queue and then do stuff */
    6165                libtrace_result_t r;
     
    6468                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
    6569                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
    66 
    67                 // We expect the key we read +1 now , todo put expected in our storage area
    68                 //trace->expected_key = key[min_queue] + 1;
    6970
    7071                // Now update the one we just removed
     
    8990                        live[min_queue] = false;
    9091                        live_count--;
     92                        prev_min = min_key;
    9193                        min_key = UINT64_MAX; // Update our minimum
    9294                        // Check all find the smallest again - all are alive
    9395                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
    94                                 // Still not 100% TODO (what if order is wrong or not increasing)
    9596                                if (live[i] && min_key >= key[i]) {
    9697                                        min_key = key[i];
  • lib/combiner_unordered.c

    r62b3c4e rf01c479  
    1919static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
    2020        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
     21        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
     22
    2123        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
    2224                trace_post_reporter(trace);
    2325        }
    24         libtrace_deque_push_back(queue, res); // Automatically locking for us :)
    2526}
    2627
Note: See TracChangeset for help on using the changeset viewer.