Changeset 3dd5acc


Ignore:
Timestamp:
09/09/15 14:04:10 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
a2dcdad
Parents:
0f6bc3f
Message:

Fix problems with combiners and ticks.

  • Each tick should only be passed to the reporter once at most. In the case of a sorted combiner, all ticks are discarded.
  • In the case of an ordered combiner, ticks that do not use the same ordering as packets (e.g. ts ticks vs count ticks) are not used for any ordering comparisons. A read operation will read from each queue until it encounters a tick in that queue -- if it is a new tick, that will be reported, otherwise the tick is discarded and the queue is marked as not "live".

In terms of testing: works OK when mixing timestamp ticks with non-parallel
input. Still needs to be tested the other way around.

Location:
lib
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • lib/combiner_ordered.c

    rf01c479 r3dd5acc  
    3030}
    3131
    32 inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){
     32inline static uint64_t next_message(libtrace_queue_t *v) {
     33
     34        libtrace_result_t r;
     35        if (libtrace_deque_peek_front(v, (void *) &r) == 0)
     36                return 0;
     37
     38        if (r.type == RESULT_TICK_INTERVAL || r.type == RESULT_TICK_COUNT)
     39                return 0;
     40
     41        return r.key;
     42}
     43
     44inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
     45                libtrace_queue_t *v, uint64_t *key) {
     46
     47        libtrace_result_t r;
     48        libtrace_deque_peek_front(v, (void *) &r);
     49
     50        /* Ticks are a bit tricky, because we can get TS
     51         * ticks in amongst packets indexed by their cardinal
     52         * order and vice versa. Also, every thread will
     53         * produce an equivalent tick and we should really
     54         * combine those into a single tick for the reporter
     55         * thread.
     56         */
     57
     58        if (r.type == RESULT_TICK_INTERVAL) {
     59                if (r.key > c->last_ts_tick) {
     60                        c->last_ts_tick = r.key;
     61
     62                        /* Tick doesn't match packet order */
     63                        if (!trace_is_parallel(trace)) {
     64                                /* Pass straight to reporter */
     65                                libtrace_generic_t gt = {.res = &r};
     66                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     67                                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     68                                return 0;
     69                        }
     70                        /* Tick matches packet order */
     71                        *key = r.key;
     72                        return 1;
     73
     74                } else {
     75                        /* Duplicate -- pop it */
     76                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     77                        return 0;
     78                }
     79        }
     80
     81        if (r.type == RESULT_TICK_COUNT) {
     82                if (r.key > c->last_count_tick) {
     83                        c->last_count_tick = r.key;
     84
     85                        /* Tick doesn't match packet order */
     86                        if (trace_is_parallel(trace)) {
     87                                /* Pass straight to reporter */
     88                                libtrace_generic_t gt = {.res = &r};
     89                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     90                                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     91                                return 0;
     92                        }
     93                        /* Tick matches packet order */
     94                        *key = r.key;
     95                        return 1;
     96
     97                        /* Tick doesn't match packet order */
     98                } else {
     99                        /* Duplicate -- pop it */
     100                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     101                        return 0;
     102                }
     103        }
     104       
     105        *key = r.key;
     106        return 1;
     107}
     108
     109inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
    33110        int i;
    34111        int live_count = 0;
    35         bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
     112        libtrace_queue_t *queues = c->queues;
     113        bool allactive = true;
     114        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
    36115        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
    37116        uint64_t min_key = UINT64_MAX;
    38117        uint64_t prev_min = 0;
     118        uint64_t peeked = 0;
    39119        int min_queue = -1;
    40120
    41121        /* Loop through check all are alive (have data) and find the smallest */
    42         for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
     122        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
    43123                libtrace_queue_t *v = &queues[i];
    44124                if (libtrace_deque_get_size(v) != 0) {
    45                         libtrace_result_t r;
    46                         libtrace_deque_peek_front(v, (void *) &r);
    47                         live_count++;
    48                         live[i] = true;
    49                         key[i] = r.key;
    50                         if (i==0 || min_key > key[i]) {
    51                                 min_key = key[i];
    52                                 min_queue = i;
    53                         }
    54                 } else {
    55                         live[i] = false;
    56                 }
     125                        if (peek_queue(trace, c, v, &peeked)) {
     126                                live_count ++;
     127                                live[i] = true;
     128                                key[i] = peeked;
     129                                if (i == 0 || min_key > peeked) {
     130                                        min_key = peeked;
     131                                        min_queue = i;
     132                                }
     133                        } else {
     134                                live[i] = false;
     135                                key[i] = 0;
     136                        }
     137                } else {
     138                        allactive = false;
     139                        live[i] = false;
     140                        key[i] = 0;
     141                }
    57142        }
    58143
     
    60145         * joined we always flush what's left. Or the next smallest is the same
    61146         * value or less than the previous */
    62         while ((live_count == libtrace_get_perpkt_count(trace)) || (live_count && final)
     147        while ((allactive && min_queue != -1) || (live_count && final)
    63148               || (live_count && prev_min >= min_key)) {
    64149                /* Get the minimum queue and then do stuff */
     
    70155
    71156                // Now update the one we just removed
    72                 if (libtrace_deque_get_size(&queues[min_queue]) )
    73                 {
    74                         libtrace_deque_peek_front(&queues[min_queue], (void *) &r);
    75                         key[min_queue] = r.key;
    76                         if (key[min_queue] <= min_key) {
    77                                 // We are still the smallest, might be out of order though :(
    78                                 min_key = key[min_queue];
    79                         } else {
    80                                 min_key = key[min_queue]; // Update our minimum
    81                                 // Check all find the smallest again - all are alive
    82                                 for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
    83                                         if (live[i] && min_key > key[i]) {
    84                                                 min_key = key[i];
    85                                                 min_queue = i;
    86                                         }
    87                                 }
    88                         }
     157                peeked = next_message(&queues[min_queue]);
     158                if (libtrace_deque_get_size(&queues[min_queue]) &&
     159                                peeked != 0) {
     160
     161                        key[min_queue] = peeked;
     162                        // We are still the smallest, might be out of order :(
     163                        if (key[min_queue] <= min_key) {
     164                                min_key = key[min_queue];
     165                        } else {
     166                                min_key = key[min_queue]; // Update our minimum
     167                                // Check all find the smallest again - all are alive
     168                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
     169                                        if (live[i] && min_key >= key[i]) {
     170                                                min_key = key[i];
     171                                                min_queue = i;
     172                                        }
     173                                }
     174                        }
    89175                } else {
    90                         live[min_queue] = false;
     176                        allactive = false;
     177                        live[min_queue] = false;
    91178                        live_count--;
    92179                        prev_min = min_key;
     
    104191
    105192static void read(libtrace_t *trace, libtrace_combine_t *c) {
    106         read_internal(trace, c->queues, false);
     193        read_internal(trace, c, false);
    107194}
    108195
    109196static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
    110         read_internal(trace, c->queues, true);
     197        int empty = 0, i;
     198        libtrace_queue_t *q = c->queues;
     199
     200        do {
     201                read_internal(trace, c, true);
     202                empty = 0;
     203                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
     204                        if (libtrace_deque_get_size(&q[i]) == 0)
     205                                empty ++;
     206                }
     207        }
     208        while (empty < libtrace_get_perpkt_count(trace));
    111209}
    112210
     
    139237        pause,                  /* pause */
    140238        NULL,                   /* queues */
     239        0,                      /* last_count_tick */
     240        0,                      /* last_ts_tick */
    141241        {0}                             /* opts */
    142242};
  • lib/combiner_sorted.c

    r6a6e6a8 r3dd5acc  
    6262                libtrace_generic_t gt = {.res = &r};
    6363                ASSERT_RET (libtrace_vector_get(&queues[0], a, (void *) &r), == 1);
    64                 trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
     64                if (r.type == RESULT_TICK_INTERVAL ||
     65                                r.type == RESULT_TICK_COUNT) {
     66                        /* Ticks are essentially useless for this combiner? */
     67                        continue;
     68                }
     69                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
    6570        }
    6671        libtrace_vector_empty(&queues[0]);
     
    8792    pause,                      /* pause */
    8893    NULL,                       /* queues */
     94    0,                          /* last_count_tick */
     95    0,                          /* last_ts_tick */
    8996    {0}                         /* opts */
    9097};
  • lib/combiner_unordered.c

    rf01c479 r3dd5acc  
    3535                while (libtrace_deque_get_size(v) != 0) {
    3636                        libtrace_result_t r;
    37                         libtrace_generic_t gt = {.res = &r};
     37                        libtrace_generic_t gt = {.res = &r};
    3838                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
     39                        /* Ignore any ticks that we've already seen */
     40                        if (r.type == RESULT_TICK_INTERVAL) {
     41                                if (r.key <= c->last_ts_tick)
     42                                        continue;
     43                                c->last_ts_tick = r.key;
     44                        }
     45
     46                        if (r.type == RESULT_TICK_COUNT) {
     47                                if (r.key <= c->last_count_tick)
     48                                        continue;
     49                                c->last_count_tick = r.key;
     50                        }
    3951                        trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
    4052                }
     
    6173    read,                       /* pause */
    6274    NULL,                       /* queues */
     75    0,                          /* last_count_tick */
     76    0,                          /* last_ts_tick */
    6377    {0}                         /* opts */
    6478};
  • lib/libtrace_parallel.h

    rb3ff33b r3dd5acc  
    392392         */
    393393        void *queues;
     394
     395        uint64_t last_count_tick;
     396        uint64_t last_ts_tick;
    394397
    395398        /**
Note: See TracChangeset for help on using the changeset viewer.