source: lib/combiner_ordered.c @ ee6e802

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since ee6e802 was ee6e802, checked in by Shane Alcock <salcock@…>, 5 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 9.6 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27
28#include "libtrace.h"
29#include "libtrace_int.h"
30#include "data-struct/deque.h"
31#include <assert.h>
32#include <stdlib.h>
33
34/* TODO hook up configuration option for sequentual packets again */
35
36static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
37        int i = 0;
38        assert(trace_get_perpkt_threads(t) > 0);
39        libtrace_queue_t *queues;
40        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
41        queues = c->queues;
42        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
43                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
44        }
45        return 0;
46}
47
48static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
49        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
50        //while (libtrace_deque_get_size(&t->deque) >= 1000)
51        //      sched_yield();
52        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
53
54        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
55                trace_post_reporter(trace);
56        }
57}
58
59inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
60                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
61
62        libtrace_result_t r;
63        if (!peeked) {
64                libtrace_deque_peek_front(v, (void *) &r);
65                peeked = &r;
66        }
67
68        /* Ticks are a bit tricky, because we can get TS
69         * ticks in amongst packets indexed by their cardinal
70         * order and vice versa. Also, every thread will
71         * produce an equivalent tick and we should really
72         * combine those into a single tick for the reporter
73         * thread.
74         */
75
76        if (peeked->type == RESULT_TICK_INTERVAL) {
77                if (peeked->key > c->last_ts_tick) {
78                        c->last_ts_tick = peeked->key;
79
80                        /* Pass straight to reporter */
81                        libtrace_generic_t gt = {.res = peeked};
82                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
83                        send_message(trace, &trace->reporter_thread,
84                                        MESSAGE_RESULT, gt,
85                                        &trace->reporter_thread);
86                        return 0;
87
88                } else {
89                        /* Duplicate -- pop it */
90                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
91                        return 0;
92                }
93        }
94
95        if (peeked->type == RESULT_TICK_COUNT) {
96                if (peeked->key > c->last_count_tick) {
97                        c->last_count_tick = peeked->key;
98
99                        /* Tick doesn't match packet order */
100                        if (trace_is_parallel(trace)) {
101                                /* Pass straight to reporter */
102                                libtrace_generic_t gt = {.res = peeked};
103                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
104                                send_message(trace, &trace->reporter_thread,
105                                                MESSAGE_RESULT, gt,
106                                                &trace->reporter_thread);
107                                return 0;
108                        }
109                        /* Tick matches packet order */
110                        *key = peeked->key;
111                        return 1;
112
113                        /* Tick doesn't match packet order */
114                } else {
115                        /* Duplicate -- pop it */
116                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
117                        return 0;
118                }
119        }
120
121        *key = peeked->key;
122        return 1;
123}
124
125inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
126                libtrace_queue_t *v) {
127
128        libtrace_result_t r;
129        uint64_t nextkey = 0;
130
131        do {
132                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
133                        return 0;
134                }
135        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
136
137        return nextkey;
138}
139
140
141inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
142        int i;
143        int live_count = 0;
144        libtrace_queue_t *queues = c->queues;
145        bool allactive = true;
146        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
147        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
148        uint64_t min_key = UINT64_MAX;
149        uint64_t peeked = 0;
150        int min_queue = -1;
151
152        /* Loop through check all are alive (have data) and find the smallest */
153        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
154                libtrace_queue_t *v = &queues[i];
155                if (libtrace_deque_get_size(v) != 0 &&
156                                peek_queue(trace, c, v, &peeked, NULL)) {
157                        live_count ++;
158                        live[i] = true;
159                        key[i] = peeked;
160                        if (i == 0 || min_key > peeked) {
161                                min_key = peeked;
162                                min_queue = i;
163                        }
164                } else {
165                        allactive = false;
166                        live[i] = false;
167                        key[i] = 0;
168                }
169        }
170
171        /* Now remove the smallest and loop - special case if all threads have
172         * joined we always flush what's left. Or the next smallest is the same
173         * value or less than the previous */
174        while (allactive || (live_count && final)) {
175                /* Get the minimum queue and then do stuff */
176                libtrace_result_t r;
177                libtrace_generic_t gt = {.res = &r};
178
179                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
180
181                //printf("%lu %lu %lu %lu %d\n", key[0], key[1], key[2], key[3], min_queue);
182
183                send_message(trace, &trace->reporter_thread,
184                                MESSAGE_RESULT, gt,
185                                NULL);
186
187                // Now update the one we just removed
188                peeked = next_message(trace, c, &queues[min_queue]);
189                if (peeked != 0) {
190
191                        key[min_queue] = peeked;
192                        // We are still the smallest, might be out of order :(
193                        if (key[min_queue] <= min_key) {
194                                min_key = key[min_queue];
195                        } else {
196                                min_key = key[min_queue]; // Update our minimum
197                                // Check all find the smallest again - all are alive
198                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
199                                        if (live[i] && min_key >= key[i]) {
200                                                min_key = key[i];
201                                                min_queue = i;
202                                        }
203                                }
204                        }
205                } else {
206                        allactive = false;
207                        live[min_queue] = false;
208                        key[min_queue] = 0;
209                        live_count--;
210                        min_key = UINT64_MAX; // Update our minimum
211                        min_queue = -1;
212                        // Check all find the smallest again - all are alive
213                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
214                                if (live[i] && min_key >= key[i]) {
215                                        min_key = key[i];
216                                        min_queue = i;
217                                }
218                        }
219                }
220        }
221}
222
223static void read(libtrace_t *trace, libtrace_combine_t *c) {
224        read_internal(trace, c, false);
225}
226
227static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
228        int empty = 0, i;
229        libtrace_queue_t *q = c->queues;
230
231        do {
232                read_internal(trace, c, true);
233                empty = 0;
234                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
235                        if (libtrace_deque_get_size(&q[i]) == 0)
236                                empty ++;
237                }
238        }
239        while (empty < trace_get_perpkt_threads(trace));
240}
241
242static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
243        int i;
244        libtrace_queue_t *queues = c->queues;
245
246        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
247                assert(libtrace_deque_get_size(&queues[i]) == 0);
248        }
249        free(queues);
250        queues = NULL;
251}
252
253
254static void pause(libtrace_t *trace, libtrace_combine_t *c) {
255        libtrace_queue_t *queues = c->queues;
256        int i;
257        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
258                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
259        }
260}
261
262DLLEXPORT const libtrace_combine_t combiner_ordered = {
263        init_combiner,  /* initialise */
264        destroy,                /* destroy */
265        publish,                /* publish */
266        read,                   /* read */
267        read_final,             /* read_final */
268        pause,                  /* pause */
269        NULL,                   /* queues */
270        0,                      /* last_count_tick */
271        0,                      /* last_ts_tick */
272        {0}                             /* opts */
273};
Note: See TracBrowser for help on using the repository browser.