source: lib/combiner_ordered.c @ 44f9892

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 44f9892 was 44f9892, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix issues with bad ordering in the ordered combiner

Mostly this was down to a false assumption that interval ticks
would be processed at the same time by each processing thread, i.e. all
packets after a tick in one thread would always follow the packets
preceding the same tick in another thread.

Instead, we now simply skip past ticks in the queues when considering
what is next in the queue, rather than trying to continue looping until
all queues have the same tick as their next message.

Of course, we still pass the first instance of any given tick on to the
reporter as soon as we see it -- now we just continue trying to process
the queue it came from if the next result has the lowest order value.

  • Property mode set to 100644
File size: 8.9 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(trace_get_perpkt_threads(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
14        queues = c->queues;
15        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
33                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
34
35        libtrace_result_t r;
36        if (!peeked) {
37                libtrace_deque_peek_front(v, (void *) &r);
38                peeked = &r;
39        }
40
41        /* Ticks are a bit tricky, because we can get TS
42         * ticks in amongst packets indexed by their cardinal
43         * order and vice versa. Also, every thread will
44         * produce an equivalent tick and we should really
45         * combine those into a single tick for the reporter
46         * thread.
47         */
48
49        if (peeked->type == RESULT_TICK_INTERVAL) {
50                if (peeked->key > c->last_ts_tick) {
51                        c->last_ts_tick = peeked->key;
52
53                        /* Tick doesn't match packet order */
54                        if (!trace_is_parallel(trace)) {
55                                /* Pass straight to reporter */
56                                libtrace_generic_t gt = {.res = peeked};
57                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
58                                send_message(trace, &trace->reporter_thread,
59                                                MESSAGE_RESULT, gt,
60                                                &trace->reporter_thread);
61                                return 0;
62                        }
63                        /* Tick matches packet order */
64                        *key = peeked->key;
65                        return 1;
66
67                } else {
68                        /* Duplicate -- pop it */
69                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
70                        return 0;
71                }
72        }
73
74        if (peeked->type == RESULT_TICK_COUNT) {
75                if (peeked->key > c->last_count_tick) {
76                        c->last_count_tick = peeked->key;
77
78                        /* Tick doesn't match packet order */
79                        if (trace_is_parallel(trace)) {
80                                /* Pass straight to reporter */
81                                libtrace_generic_t gt = {.res = peeked};
82                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
83                                send_message(trace, &trace->reporter_thread,
84                                                MESSAGE_RESULT, gt,
85                                                &trace->reporter_thread);
86                                return 0;
87                        }
88                        /* Tick matches packet order */
89                        *key = peeked->key;
90                        return 1;
91
92                        /* Tick doesn't match packet order */
93                } else {
94                        /* Duplicate -- pop it */
95                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
96                        return 0;
97                }
98        }
99
100        *key = peeked->key;
101        return 1;
102}
103
104inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
105                libtrace_queue_t *v) {
106
107        libtrace_result_t r;
108        uint64_t nextkey = 0;
109
110        do {
111                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
112                        return 0;
113                }
114        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
115
116        return nextkey;
117}
118
119
120inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
121        int i;
122        int live_count = 0;
123        libtrace_queue_t *queues = c->queues;
124        bool allactive = true;
125        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
126        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
127        uint64_t min_key = UINT64_MAX;
128        uint64_t prev_min = 0;
129        uint64_t peeked = 0;
130        int min_queue = -1;
131
132        /* Loop through check all are alive (have data) and find the smallest */
133        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
134                libtrace_queue_t *v = &queues[i];
135                if (libtrace_deque_get_size(v) != 0 &&
136                                peek_queue(trace, c, v, &peeked, NULL)) {
137                        live_count ++;
138                        live[i] = true;
139                        key[i] = peeked;
140                        if (i == 0 || min_key > peeked) {
141                                min_key = peeked;
142                                min_queue = i;
143                        }
144                } else {
145                        allactive = false;
146                        live[i] = false;
147                        key[i] = 0;
148                }
149        }
150
151        /* Now remove the smallest and loop - special case if all threads have
152         * joined we always flush what's left. Or the next smallest is the same
153         * value or less than the previous */
154        while (allactive || (live_count && final)) {
155                /* Get the minimum queue and then do stuff */
156                libtrace_result_t r;
157                libtrace_generic_t gt = {.res = &r};
158
159                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
160
161                send_message(trace, &trace->reporter_thread,
162                                MESSAGE_RESULT, gt,
163                                NULL);
164
165                // Now update the one we just removed
166                peeked = next_message(trace, c, &queues[min_queue]);
167                if (peeked != 0) {
168
169                        key[min_queue] = peeked;
170                        // We are still the smallest, might be out of order :(
171                        if (key[min_queue] <= min_key) {
172                                min_key = key[min_queue];
173                        } else {
174                                min_key = key[min_queue]; // Update our minimum
175                                // Check all find the smallest again - all are alive
176                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
177                                        if (live[i] && min_key >= key[i]) {
178                                                min_key = key[i];
179                                                min_queue = i;
180                                        }
181                                }
182                        }
183                } else {
184                        allactive = false;
185                        live[min_queue] = false;
186                        key[min_queue] = 0;
187                        live_count--;
188                        prev_min = min_key;
189                        min_key = UINT64_MAX; // Update our minimum
190                        min_queue = -1;
191                        // Check all find the smallest again - all are alive
192                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
193                                if (live[i] && min_key >= key[i]) {
194                                        min_key = key[i];
195                                        min_queue = i;
196                                }
197                        }
198                }
199        }
200}
201
202static void read(libtrace_t *trace, libtrace_combine_t *c) {
203        read_internal(trace, c, false);
204}
205
206static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
207        int empty = 0, i;
208        libtrace_queue_t *q = c->queues;
209
210        do {
211                read_internal(trace, c, true);
212                empty = 0;
213                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
214                        if (libtrace_deque_get_size(&q[i]) == 0)
215                                empty ++;
216                }
217        }
218        while (empty < trace_get_perpkt_threads(trace));
219}
220
221static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
222        int i;
223        libtrace_queue_t *queues = c->queues;
224
225        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
226                assert(libtrace_deque_get_size(&queues[i]) == 0);
227        }
228        free(queues);
229        queues = NULL;
230}
231
232
233static void pause(libtrace_t *trace, libtrace_combine_t *c) {
234        libtrace_queue_t *queues = c->queues;
235        int i;
236        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
237                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
238        }
239}
240
241DLLEXPORT const libtrace_combine_t combiner_ordered = {
242        init_combiner,  /* initialise */
243        destroy,                /* destroy */
244        publish,                /* publish */
245        read,                   /* read */
246        read_final,             /* read_final */
247        pause,                  /* pause */
248        NULL,                   /* queues */
249        0,                      /* last_count_tick */
250        0,                      /* last_ts_tick */
251        {0}                             /* opts */
252};
Note: See TracBrowser for help on using the repository browser.