source: lib/combiner_ordered.c @ 3dd5acc

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 3dd5acc was 3dd5acc, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix problems with combiners and ticks.

  • Each tick should only be passed to the reporter once at most. In the case of a sorted combiner, all ticks are discarded.
  • In the case of an ordered combiner, ticks that do not use the same ordering as packets (e.g. ts ticks vs count ticks) are not used for any ordering comparisons. A read operation will read from each queue until it encounters a tick in that queue -- if it is a new tick, that will be reported, otherwise the tick is discarded and the queue is marked as not "live".

In terms of testing: works OK when mixing timestamp ticks with non-parallel
input. Still needs to be tested the other way around.

  • Property mode set to 100644
File size: 8.6 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(libtrace_get_perpkt_count(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), libtrace_get_perpkt_count(t));
14        queues = c->queues;
15        for (i = 0; i < libtrace_get_perpkt_count(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static uint64_t next_message(libtrace_queue_t *v) {
33
34        libtrace_result_t r;
35        if (libtrace_deque_peek_front(v, (void *) &r) == 0)
36                return 0;
37
38        if (r.type == RESULT_TICK_INTERVAL || r.type == RESULT_TICK_COUNT)
39                return 0;
40
41        return r.key;
42}
43
44inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
45                libtrace_queue_t *v, uint64_t *key) {
46
47        libtrace_result_t r;
48        libtrace_deque_peek_front(v, (void *) &r);
49
50        /* Ticks are a bit tricky, because we can get TS
51         * ticks in amongst packets indexed by their cardinal
52         * order and vice versa. Also, every thread will
53         * produce an equivalent tick and we should really
54         * combine those into a single tick for the reporter
55         * thread.
56         */
57
58        if (r.type == RESULT_TICK_INTERVAL) {
59                if (r.key > c->last_ts_tick) {
60                        c->last_ts_tick = r.key;
61
62                        /* Tick doesn't match packet order */
63                        if (!trace_is_parallel(trace)) {
64                                /* Pass straight to reporter */
65                                libtrace_generic_t gt = {.res = &r};
66                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
67                                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
68                                return 0;
69                        }
70                        /* Tick matches packet order */
71                        *key = r.key;
72                        return 1;
73
74                } else {
75                        /* Duplicate -- pop it */
76                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
77                        return 0;
78                }
79        }
80
81        if (r.type == RESULT_TICK_COUNT) {
82                if (r.key > c->last_count_tick) {
83                        c->last_count_tick = r.key;
84
85                        /* Tick doesn't match packet order */
86                        if (trace_is_parallel(trace)) {
87                                /* Pass straight to reporter */
88                                libtrace_generic_t gt = {.res = &r};
89                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
90                                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
91                                return 0;
92                        }
93                        /* Tick matches packet order */
94                        *key = r.key;
95                        return 1;
96
97                        /* Tick doesn't match packet order */
98                } else {
99                        /* Duplicate -- pop it */
100                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);
101                        return 0;
102                }
103        }
104       
105        *key = r.key;
106        return 1;
107}
108
109inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
110        int i;
111        int live_count = 0;
112        libtrace_queue_t *queues = c->queues;
113        bool allactive = true;
114        bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive
115        uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys
116        uint64_t min_key = UINT64_MAX;
117        uint64_t prev_min = 0;
118        uint64_t peeked = 0;
119        int min_queue = -1;
120
121        /* Loop through check all are alive (have data) and find the smallest */
122        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
123                libtrace_queue_t *v = &queues[i];
124                if (libtrace_deque_get_size(v) != 0) {
125                        if (peek_queue(trace, c, v, &peeked)) {
126                                live_count ++;
127                                live[i] = true;
128                                key[i] = peeked;
129                                if (i == 0 || min_key > peeked) {
130                                        min_key = peeked;
131                                        min_queue = i;
132                                }
133                        } else {
134                                live[i] = false;
135                                key[i] = 0;
136                        }
137                } else {
138                        allactive = false;
139                        live[i] = false;
140                        key[i] = 0;
141                }
142        }
143
144        /* Now remove the smallest and loop - special case if all threads have
145         * joined we always flush what's left. Or the next smallest is the same
146         * value or less than the previous */
147        while ((allactive && min_queue != -1) || (live_count && final)
148               || (live_count && prev_min >= min_key)) {
149                /* Get the minimum queue and then do stuff */
150                libtrace_result_t r;
151                libtrace_generic_t gt = {.res = &r};
152
153                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
154                trace->reporter(trace, MESSAGE_RESULT, gt, &trace->reporter_thread);
155
156                // Now update the one we just removed
157                peeked = next_message(&queues[min_queue]);
158                if (libtrace_deque_get_size(&queues[min_queue]) &&
159                                peeked != 0) {
160
161                        key[min_queue] = peeked;
162                        // We are still the smallest, might be out of order :(
163                        if (key[min_queue] <= min_key) {
164                                min_key = key[min_queue];
165                        } else {
166                                min_key = key[min_queue]; // Update our minimum
167                                // Check all find the smallest again - all are alive
168                                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
169                                        if (live[i] && min_key >= key[i]) {
170                                                min_key = key[i];
171                                                min_queue = i;
172                                        }
173                                }
174                        }
175                } else {
176                        allactive = false;
177                        live[min_queue] = false;
178                        live_count--;
179                        prev_min = min_key;
180                        min_key = UINT64_MAX; // Update our minimum
181                        // Check all find the smallest again - all are alive
182                        for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
183                                if (live[i] && min_key >= key[i]) {
184                                        min_key = key[i];
185                                        min_queue = i;
186                                }
187                        }
188                }
189        }
190}
191
192static void read(libtrace_t *trace, libtrace_combine_t *c) {
193        read_internal(trace, c, false);
194}
195
196static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
197        int empty = 0, i;
198        libtrace_queue_t *q = c->queues;
199
200        do {
201                read_internal(trace, c, true);
202                empty = 0;
203                for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) {
204                        if (libtrace_deque_get_size(&q[i]) == 0)
205                                empty ++;
206                }
207        }
208        while (empty < libtrace_get_perpkt_count(trace));
209}
210
211static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
212        int i;
213        libtrace_queue_t *queues = c->queues;
214
215        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
216                assert(libtrace_deque_get_size(&queues[i]) == 0);
217        }
218        free(queues);
219        queues = NULL;
220}
221
222
223static void pause(libtrace_t *trace, libtrace_combine_t *c) {
224        libtrace_queue_t *queues = c->queues;
225        int i;
226        for (i = 0; i < libtrace_get_perpkt_count(trace); i++) {
227                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
228        }
229}
230
231DLLEXPORT const libtrace_combine_t combiner_ordered = {
232        init_combiner,  /* initialise */
233        destroy,                /* destroy */
234        publish,                /* publish */
235        read,                   /* read */
236        read_final,             /* read_final */
237        pause,                  /* pause */
238        NULL,                   /* queues */
239        0,                      /* last_count_tick */
240        0,                      /* last_ts_tick */
241        {0}                             /* opts */
242};
Note: See TracBrowser for help on using the repository browser.