source: lib/combiner_ordered.c @ 733c8b4

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 733c8b4 was 733c8b4, checked in by Shane Alcock <salcock@…>, 6 years ago

Always automatically push TS ticks to the reporter

This now happens even if the packet ordering is based on timestamps,
as the ticks are not necessarily in the right order relative to other
packets in the queue.

  • Property mode set to 100644
File size: 8.7 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(trace_get_perpkt_threads(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
14        queues = c->queues;
15        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
33                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
34
35        libtrace_result_t r;
36        if (!peeked) {
37                libtrace_deque_peek_front(v, (void *) &r);
38                peeked = &r;
39        }
40
41        /* Ticks are a bit tricky, because we can get TS
42         * ticks in amongst packets indexed by their cardinal
43         * order and vice versa. Also, every thread will
44         * produce an equivalent tick and we should really
45         * combine those into a single tick for the reporter
46         * thread.
47         */
48
49        if (peeked->type == RESULT_TICK_INTERVAL) {
50                if (peeked->key > c->last_ts_tick) {
51                        c->last_ts_tick = peeked->key;
52
53                        /* Pass straight to reporter */
54                        libtrace_generic_t gt = {.res = peeked};
55                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
56                        send_message(trace, &trace->reporter_thread,
57                                        MESSAGE_RESULT, gt,
58                                        &trace->reporter_thread);
59                        return 0;
60
61                } else {
62                        /* Duplicate -- pop it */
63                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
64                        return 0;
65                }
66        }
67
68        if (peeked->type == RESULT_TICK_COUNT) {
69                if (peeked->key > c->last_count_tick) {
70                        c->last_count_tick = peeked->key;
71
72                        /* Tick doesn't match packet order */
73                        if (trace_is_parallel(trace)) {
74                                /* Pass straight to reporter */
75                                libtrace_generic_t gt = {.res = peeked};
76                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
77                                send_message(trace, &trace->reporter_thread,
78                                                MESSAGE_RESULT, gt,
79                                                &trace->reporter_thread);
80                                return 0;
81                        }
82                        /* Tick matches packet order */
83                        *key = peeked->key;
84                        return 1;
85
86                        /* Tick doesn't match packet order */
87                } else {
88                        /* Duplicate -- pop it */
89                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
90                        return 0;
91                }
92        }
93
94        *key = peeked->key;
95        return 1;
96}
97
98inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
99                libtrace_queue_t *v) {
100
101        libtrace_result_t r;
102        uint64_t nextkey = 0;
103
104        do {
105                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
106                        return 0;
107                }
108        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
109
110        return nextkey;
111}
112
113
114inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
115        int i;
116        int live_count = 0;
117        libtrace_queue_t *queues = c->queues;
118        bool allactive = true;
119        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
120        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
121        uint64_t min_key = UINT64_MAX;
122        uint64_t peeked = 0;
123        int min_queue = -1;
124
125        /* Loop through check all are alive (have data) and find the smallest */
126        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
127                libtrace_queue_t *v = &queues[i];
128                if (libtrace_deque_get_size(v) != 0 &&
129                                peek_queue(trace, c, v, &peeked, NULL)) {
130                        live_count ++;
131                        live[i] = true;
132                        key[i] = peeked;
133                        if (i == 0 || min_key > peeked) {
134                                min_key = peeked;
135                                min_queue = i;
136                        }
137                } else {
138                        allactive = false;
139                        live[i] = false;
140                        key[i] = 0;
141                }
142        }
143
144        /* Now remove the smallest and loop - special case if all threads have
145         * joined we always flush what's left. Or the next smallest is the same
146         * value or less than the previous */
147        while (allactive || (live_count && final)) {
148                /* Get the minimum queue and then do stuff */
149                libtrace_result_t r;
150                libtrace_generic_t gt = {.res = &r};
151
152                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
153
154                //printf("%lu %lu %lu %lu %d\n", key[0], key[1], key[2], key[3], min_queue);
155
156                send_message(trace, &trace->reporter_thread,
157                                MESSAGE_RESULT, gt,
158                                NULL);
159
160                // Now update the one we just removed
161                peeked = next_message(trace, c, &queues[min_queue]);
162                if (peeked != 0) {
163
164                        key[min_queue] = peeked;
165                        // We are still the smallest, might be out of order :(
166                        if (key[min_queue] <= min_key) {
167                                min_key = key[min_queue];
168                        } else {
169                                min_key = key[min_queue]; // Update our minimum
170                                // Check all find the smallest again - all are alive
171                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
172                                        if (live[i] && min_key >= key[i]) {
173                                                min_key = key[i];
174                                                min_queue = i;
175                                        }
176                                }
177                        }
178                } else {
179                        allactive = false;
180                        live[min_queue] = false;
181                        key[min_queue] = 0;
182                        live_count--;
183                        min_key = UINT64_MAX; // Update our minimum
184                        min_queue = -1;
185                        // Check all find the smallest again - all are alive
186                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
187                                if (live[i] && min_key >= key[i]) {
188                                        min_key = key[i];
189                                        min_queue = i;
190                                }
191                        }
192                }
193        }
194}
195
196static void read(libtrace_t *trace, libtrace_combine_t *c) {
197        read_internal(trace, c, false);
198}
199
200static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
201        int empty = 0, i;
202        libtrace_queue_t *q = c->queues;
203
204        do {
205                read_internal(trace, c, true);
206                empty = 0;
207                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
208                        if (libtrace_deque_get_size(&q[i]) == 0)
209                                empty ++;
210                }
211        }
212        while (empty < trace_get_perpkt_threads(trace));
213}
214
215static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
216        int i;
217        libtrace_queue_t *queues = c->queues;
218
219        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
220                assert(libtrace_deque_get_size(&queues[i]) == 0);
221        }
222        free(queues);
223        queues = NULL;
224}
225
226
227static void pause(libtrace_t *trace, libtrace_combine_t *c) {
228        libtrace_queue_t *queues = c->queues;
229        int i;
230        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
231                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
232        }
233}
234
235DLLEXPORT const libtrace_combine_t combiner_ordered = {
236        init_combiner,  /* initialise */
237        destroy,                /* destroy */
238        publish,                /* publish */
239        read,                   /* read */
240        read_final,             /* read_final */
241        pause,                  /* pause */
242        NULL,                   /* queues */
243        0,                      /* last_count_tick */
244        0,                      /* last_ts_tick */
245        {0}                             /* opts */
246};
Note: See TracBrowser for help on using the repository browser.