source: lib/combiner_ordered.c

develop
Last change on this file was 2193905, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Apply changes required for pull request #81

  • Property mode set to 100644
File size: 9.8 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/deque.h"
31#include <assert.h>
32#include <stdlib.h>
33
34/* TODO hook up configuration option for sequentual packets again */
35
36static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
37        int i = 0;
38        if (trace_get_perpkt_threads(t) <= 0) {
39                trace_set_err(t, TRACE_ERR_INIT_FAILED, "You must have atleast 1 processing thread");
40                return -1;
41        }
42        libtrace_queue_t *queues;
43        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
44        queues = c->queues;
45        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
46                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
47        }
48        return 0;
49}
50
51static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
52        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
53        //while (libtrace_deque_get_size(&t->deque) >= 1000)
54        //      sched_yield();
55        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
56
57        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
58                trace_post_reporter(trace);
59        }
60}
61
62inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
63                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
64
65        libtrace_result_t r;
66        if (!peeked) {
67                libtrace_deque_peek_front(v, (void *) &r);
68                peeked = &r;
69        }
70
71        /* Ticks are a bit tricky, because we can get TS
72         * ticks in amongst packets indexed by their cardinal
73         * order and vice versa. Also, every thread will
74         * produce an equivalent tick and we should really
75         * combine those into a single tick for the reporter
76         * thread.
77         */
78
79        if (peeked->type == RESULT_TICK_INTERVAL) {
80                if (peeked->key > c->last_ts_tick) {
81                        c->last_ts_tick = peeked->key;
82
83                        /* Pass straight to reporter */
84                        libtrace_generic_t gt = {.res = peeked};
85                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
86                        send_message(trace, &trace->reporter_thread,
87                                        MESSAGE_RESULT, gt,
88                                        &trace->reporter_thread);
89                        return 0;
90
91                } else {
92                        /* Duplicate -- pop it */
93                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
94                        return 0;
95                }
96        }
97
98        if (peeked->type == RESULT_TICK_COUNT) {
99                if (peeked->key > c->last_count_tick) {
100                        c->last_count_tick = peeked->key;
101
102                        /* Tick doesn't match packet order */
103                        if (trace_is_parallel(trace)) {
104                                /* Pass straight to reporter */
105                                libtrace_generic_t gt = {.res = peeked};
106                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
107                                send_message(trace, &trace->reporter_thread,
108                                                MESSAGE_RESULT, gt,
109                                                &trace->reporter_thread);
110                                return 0;
111                        }
112                        /* Tick matches packet order */
113                        *key = peeked->key;
114                        return 1;
115
116                        /* Tick doesn't match packet order */
117                } else {
118                        /* Duplicate -- pop it */
119                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
120                        return 0;
121                }
122        }
123
124        *key = peeked->key;
125        return 1;
126}
127
128inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
129                libtrace_queue_t *v) {
130
131        libtrace_result_t r;
132        uint64_t nextkey = 0;
133
134        do {
135                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
136                        return 0;
137                }
138        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
139
140        return nextkey;
141}
142
143
144inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
145        int i;
146        int live_count = 0;
147        libtrace_queue_t *queues = c->queues;
148        bool allactive = true;
149        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
150        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
151        uint64_t min_key = UINT64_MAX;
152        uint64_t peeked = 0;
153        int min_queue = -1;
154
155        /* Loop through check all are alive (have data) and find the smallest */
156        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
157                libtrace_queue_t *v = &queues[i];
158                if (libtrace_deque_get_size(v) != 0 &&
159                                peek_queue(trace, c, v, &peeked, NULL)) {
160                        live_count ++;
161                        live[i] = true;
162                        key[i] = peeked;
163                        if (i == 0 || min_key > peeked) {
164                                min_key = peeked;
165                                min_queue = i;
166                        }
167                } else {
168                        allactive = false;
169                        live[i] = false;
170                        key[i] = 0;
171                }
172        }
173
174        /* Now remove the smallest and loop - special case if all threads have
175         * joined we always flush what's left. Or the next smallest is the same
176         * value or less than the previous */
177        while (allactive || (live_count && final)) {
178                /* Get the minimum queue and then do stuff */
179                libtrace_result_t r;
180                libtrace_generic_t gt = {.res = &r};
181
182                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
183
184                //printf("%lu %lu %lu %lu %d\n", key[0], key[1], key[2], key[3], min_queue);
185
186                send_message(trace, &trace->reporter_thread,
187                                MESSAGE_RESULT, gt,
188                                NULL);
189
190                // Now update the one we just removed
191                peeked = next_message(trace, c, &queues[min_queue]);
192                if (peeked != 0) {
193
194                        key[min_queue] = peeked;
195                        // We are still the smallest, might be out of order :(
196                        if (key[min_queue] <= min_key) {
197                                min_key = key[min_queue];
198                        } else {
199                                min_key = key[min_queue]; // Update our minimum
200                                // Check all find the smallest again - all are alive
201                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
202                                        if (live[i] && min_key >= key[i]) {
203                                                min_key = key[i];
204                                                min_queue = i;
205                                        }
206                                }
207                        }
208                } else {
209                        allactive = false;
210                        live[min_queue] = false;
211                        key[min_queue] = 0;
212                        live_count--;
213                        min_key = UINT64_MAX; // Update our minimum
214                        min_queue = -1;
215                        // Check all find the smallest again - all are alive
216                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
217                                if (live[i] && min_key >= key[i]) {
218                                        min_key = key[i];
219                                        min_queue = i;
220                                }
221                        }
222                }
223        }
224}
225
226static void read(libtrace_t *trace, libtrace_combine_t *c) {
227        read_internal(trace, c, false);
228}
229
230static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
231        int empty = 0, i;
232        libtrace_queue_t *q = c->queues;
233
234        do {
235                read_internal(trace, c, true);
236                empty = 0;
237                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
238                        if (libtrace_deque_get_size(&q[i]) == 0)
239                                empty ++;
240                }
241        }
242        while (empty < trace_get_perpkt_threads(trace));
243}
244
245static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
246        int i;
247        libtrace_queue_t *queues = c->queues;
248
249        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
250                if (libtrace_deque_get_size(&queues[i]) != 0) {
251                        trace_set_err(trace, TRACE_ERR_COMBINER,
252                                "Failed to destroy queues, A thread still has data in destroy()");
253                        return;
254                }
255        }
256        free(queues);
257        queues = NULL;
258}
259
260
261static void pause(libtrace_t *trace, libtrace_combine_t *c) {
262        libtrace_queue_t *queues = c->queues;
263        int i;
264        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
265                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
266        }
267}
268
269DLLEXPORT const libtrace_combine_t combiner_ordered = {
270        init_combiner,  /* initialise */
271        destroy,                /* destroy */
272        publish,                /* publish */
273        read,                   /* read */
274        read_final,             /* read_final */
275        pause,                  /* pause */
276        NULL,                   /* queues */
277        0,                      /* last_count_tick */
278        0,                      /* last_ts_tick */
279        {0}                             /* opts */
280};
Note: See TracBrowser for help on using the repository browser.