Changeset 44f9892 for lib


Ignore:
Timestamp:
10/13/15 09:51:22 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
21c0d70
Parents:
92a2bf6
Message:

Fix issues with bad ordering in the ordered combiner

Mostly this was down to a false assumption that interval ticks
would be processed at the same time by each processing thread, i.e. all
packets after a tick in one thread would always follow the packets
preceding the same tick in another thread.

Instead, we now simply skip past ticks in the queues when considering
what is next in the queue, rather than trying to continue looping until
all queues have the same tick as their next message.

Of course, we still pass the first instance of any given tick on to the
reporter as soon as we see it -- now we just continue trying to process
the queue it came from if the next result has the lowest order value.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    ra31e166 r44f9892  
    3030}
    3131
    32 inline static uint64_t next_message(libtrace_queue_t *v) {
     32inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
     33                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
    3334
    3435        libtrace_result_t r;
    35         if (libtrace_deque_peek_front(v, (void *) &r) == 0)
    36                 return 0;
    37 
    38         if (r.type == RESULT_TICK_INTERVAL || r.type == RESULT_TICK_COUNT)
    39                 return 0;
    40 
    41         return r.key;
    42 }
    43 
    44 inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
    45                 libtrace_queue_t *v, uint64_t *key) {
    46 
    47         libtrace_result_t r;
    48         libtrace_deque_peek_front(v, (void *) &r);
     36        if (!peeked) {
     37                libtrace_deque_peek_front(v, (void *) &r);
     38                peeked = &r;
     39        }
    4940
    5041        /* Ticks are a bit tricky, because we can get TS
     
    5647         */
    5748
    58         if (r.type == RESULT_TICK_INTERVAL) {
    59                 if (r.key > c->last_ts_tick) {
    60                         c->last_ts_tick = r.key;
     49        if (peeked->type == RESULT_TICK_INTERVAL) {
     50                if (peeked->key > c->last_ts_tick) {
     51                        c->last_ts_tick = peeked->key;
    6152
    6253                        /* Tick doesn't match packet order */
    6354                        if (!trace_is_parallel(trace)) {
    6455                                /* Pass straight to reporter */
    65                                 libtrace_generic_t gt = {.res = &r};
    66                                 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     56                                libtrace_generic_t gt = {.res = peeked};
     57                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
    6758                                send_message(trace, &trace->reporter_thread,
    6859                                                MESSAGE_RESULT, gt,
     
    7162                        }
    7263                        /* Tick matches packet order */
    73                         *key = r.key;
     64                        *key = peeked->key;
    7465                        return 1;
    7566
    7667                } else {
    7768                        /* Duplicate -- pop it */
    78                         ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     69                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
    7970                        return 0;
    8071                }
    8172        }
    8273
    83         if (r.type == RESULT_TICK_COUNT) {
    84                 if (r.key > c->last_count_tick) {
    85                         c->last_count_tick = r.key;
     74        if (peeked->type == RESULT_TICK_COUNT) {
     75                if (peeked->key > c->last_count_tick) {
     76                        c->last_count_tick = peeked->key;
    8677
    8778                        /* Tick doesn't match packet order */
    8879                        if (trace_is_parallel(trace)) {
    8980                                /* Pass straight to reporter */
    90                                 libtrace_generic_t gt = {.res = &r};
    91                                 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     81                                libtrace_generic_t gt = {.res = peeked};
     82                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
    9283                                send_message(trace, &trace->reporter_thread,
    9384                                                MESSAGE_RESULT, gt,
     
    9687                        }
    9788                        /* Tick matches packet order */
    98                         *key = r.key;
     89                        *key = peeked->key;
    9990                        return 1;
    10091
     
    10293                } else {
    10394                        /* Duplicate -- pop it */
    104                         ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     95                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
    10596                        return 0;
    10697                }
    10798        }
    108        
    109         *key = r.key;
     99
     100        *key = peeked->key;
    110101        return 1;
    111102}
     103
     104inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
     105                libtrace_queue_t *v) {
     106
     107        libtrace_result_t r;
     108        uint64_t nextkey = 0;
     109
     110        do {
     111                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
     112                        return 0;
     113                }
     114        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
     115
     116        return nextkey;
     117}
     118
    112119
    113120inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
     
    126133        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
    127134                libtrace_queue_t *v = &queues[i];
    128                 if (libtrace_deque_get_size(v) != 0) {
    129                         if (peek_queue(trace, c, v, &peeked)) {
    130                                 live_count ++;
    131                                 live[i] = true;
    132                                 key[i] = peeked;
    133                                 if (i == 0 || min_key > peeked) {
    134                                         min_key = peeked;
    135                                         min_queue = i;
    136                                 }
    137                         } else {
    138                                 live[i] = false;
    139                                 key[i] = 0;
     135                if (libtrace_deque_get_size(v) != 0 &&
     136                                peek_queue(trace, c, v, &peeked, NULL)) {
     137                        live_count ++;
     138                        live[i] = true;
     139                        key[i] = peeked;
     140                        if (i == 0 || min_key > peeked) {
     141                                min_key = peeked;
     142                                min_queue = i;
    140143                        }
    141144                } else {
     
    149152         * joined we always flush what's left. Or the next smallest is the same
    150153         * value or less than the previous */
    151         while ((allactive && min_queue != -1) || (live_count && final)
    152                || (live_count && prev_min >= min_key)) {
     154        while (allactive || (live_count && final)) {
    153155                /* Get the minimum queue and then do stuff */
    154156                libtrace_result_t r;
     
    156158
    157159                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
     160
    158161                send_message(trace, &trace->reporter_thread,
    159162                                MESSAGE_RESULT, gt,
     
    161164
    162165                // Now update the one we just removed
    163                 peeked = next_message(&queues[min_queue]);
    164                 if (libtrace_deque_get_size(&queues[min_queue]) &&
    165                                 peeked != 0) {
     166                peeked = next_message(trace, c, &queues[min_queue]);
     167                if (peeked != 0) {
    166168
    167169                        key[min_queue] = peeked;
     
    182184                        allactive = false;
    183185                        live[min_queue] = false;
     186                        key[min_queue] = 0;
    184187                        live_count--;
    185188                        prev_min = min_key;
    186189                        min_key = UINT64_MAX; // Update our minimum
     190                        min_queue = -1;
    187191                        // Check all find the smallest again - all are alive
    188192                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
Note: See TracChangeset for help on using the changeset viewer.