source: lib/combiner_ordered.c @ 21c0d70

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 21c0d70 was 21c0d70, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix memory leak with heavily filtered RT inputs

Bucket buffers that contained entirely filtered packets were never
being freed if they weren't at the front of our bucket list, which would
chew through memory very quickly as soon as we had a non-empty bucket at
the front of the list but not enough packets to result in a batch being
spread amongst the processing threads.

  • Property mode set to 100644
File size: 8.9 KB
Line 
1#include "libtrace.h"
2#include "libtrace_int.h"
3#include "data-struct/deque.h"
4#include <assert.h>
5#include <stdlib.h>
6
7/* TODO hook up configuration option for sequentual packets again */
8
9static int init_combiner(libtrace_t *t, libtrace_combine_t *c) {
10        int i = 0;
11        assert(trace_get_perpkt_threads(t) > 0);
12        libtrace_queue_t *queues;
13        c->queues = calloc(sizeof(libtrace_queue_t), trace_get_perpkt_threads(t));
14        queues = c->queues;
15        for (i = 0; i < trace_get_perpkt_threads(t); ++i) {
16                libtrace_deque_init(&queues[i], sizeof(libtrace_result_t));
17        }
18        return 0;
19}
20
21static void publish(libtrace_t *trace, int t_id, libtrace_combine_t *c, libtrace_result_t *res) {
22        libtrace_queue_t *queue = &((libtrace_queue_t*)c->queues)[t_id];
23        //while (libtrace_deque_get_size(&t->deque) >= 1000)
24        //      sched_yield();
25        libtrace_deque_push_back(queue, res); // Automatically locking for us :)
26
27        if (libtrace_deque_get_size(queue) >= trace->config.reporter_thold) {
28                trace_post_reporter(trace);
29        }
30}
31
32inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c,
33                libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) {
34
35        libtrace_result_t r;
36        if (!peeked) {
37                libtrace_deque_peek_front(v, (void *) &r);
38                peeked = &r;
39        }
40
41        /* Ticks are a bit tricky, because we can get TS
42         * ticks in amongst packets indexed by their cardinal
43         * order and vice versa. Also, every thread will
44         * produce an equivalent tick and we should really
45         * combine those into a single tick for the reporter
46         * thread.
47         */
48
49        if (peeked->type == RESULT_TICK_INTERVAL) {
50                if (peeked->key > c->last_ts_tick) {
51                        c->last_ts_tick = peeked->key;
52
53                        /* Tick doesn't match packet order */
54                        if (!trace_is_parallel(trace)) {
55                                /* Pass straight to reporter */
56                                libtrace_generic_t gt = {.res = peeked};
57                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
58                                send_message(trace, &trace->reporter_thread,
59                                                MESSAGE_RESULT, gt,
60                                                &trace->reporter_thread);
61                                return 0;
62                        }
63                        /* Tick matches packet order */
64                        *key = peeked->key;
65                        return 1;
66
67                } else {
68                        /* Duplicate -- pop it */
69                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
70                        return 0;
71                }
72        }
73
74        if (peeked->type == RESULT_TICK_COUNT) {
75                if (peeked->key > c->last_count_tick) {
76                        c->last_count_tick = peeked->key;
77
78                        /* Tick doesn't match packet order */
79                        if (trace_is_parallel(trace)) {
80                                /* Pass straight to reporter */
81                                libtrace_generic_t gt = {.res = peeked};
82                                ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
83                                send_message(trace, &trace->reporter_thread,
84                                                MESSAGE_RESULT, gt,
85                                                &trace->reporter_thread);
86                                return 0;
87                        }
88                        /* Tick matches packet order */
89                        *key = peeked->key;
90                        return 1;
91
92                        /* Tick doesn't match packet order */
93                } else {
94                        /* Duplicate -- pop it */
95                        ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1);
96                        return 0;
97                }
98        }
99
100        *key = peeked->key;
101        return 1;
102}
103
104inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c,
105                libtrace_queue_t *v) {
106
107        libtrace_result_t r;
108        uint64_t nextkey = 0;
109
110        do {
111                if (libtrace_deque_peek_front(v, (void *) &r) == 0) {
112                        return 0;
113                }
114        } while (peek_queue(trace, c, v, &nextkey, &r) == 0);
115
116        return nextkey;
117}
118
119
120inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){
121        int i;
122        int live_count = 0;
123        libtrace_queue_t *queues = c->queues;
124        bool allactive = true;
125        bool live[trace_get_perpkt_threads(trace)]; // Set if a trace is alive
126        uint64_t key[trace_get_perpkt_threads(trace)]; // Cached keys
127        uint64_t min_key = UINT64_MAX;
128        uint64_t peeked = 0;
129        int min_queue = -1;
130
131        /* Loop through check all are alive (have data) and find the smallest */
132        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
133                libtrace_queue_t *v = &queues[i];
134                if (libtrace_deque_get_size(v) != 0 &&
135                                peek_queue(trace, c, v, &peeked, NULL)) {
136                        live_count ++;
137                        live[i] = true;
138                        key[i] = peeked;
139                        if (i == 0 || min_key > peeked) {
140                                min_key = peeked;
141                                min_queue = i;
142                        }
143                } else {
144                        allactive = false;
145                        live[i] = false;
146                        key[i] = 0;
147                }
148        }
149
150        /* Now remove the smallest and loop - special case if all threads have
151         * joined we always flush what's left. Or the next smallest is the same
152         * value or less than the previous */
153        while (allactive || (live_count && final)) {
154                /* Get the minimum queue and then do stuff */
155                libtrace_result_t r;
156                libtrace_generic_t gt = {.res = &r};
157
158                ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1);
159
160                send_message(trace, &trace->reporter_thread,
161                                MESSAGE_RESULT, gt,
162                                NULL);
163
164                // Now update the one we just removed
165                peeked = next_message(trace, c, &queues[min_queue]);
166                if (peeked != 0) {
167
168                        key[min_queue] = peeked;
169                        // We are still the smallest, might be out of order :(
170                        if (key[min_queue] <= min_key) {
171                                min_key = key[min_queue];
172                        } else {
173                                min_key = key[min_queue]; // Update our minimum
174                                // Check all find the smallest again - all are alive
175                                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
176                                        if (live[i] && min_key >= key[i]) {
177                                                min_key = key[i];
178                                                min_queue = i;
179                                        }
180                                }
181                        }
182                } else {
183                        allactive = false;
184                        live[min_queue] = false;
185                        key[min_queue] = 0;
186                        live_count--;
187                        min_key = UINT64_MAX; // Update our minimum
188                        min_queue = -1;
189                        // Check all find the smallest again - all are alive
190                        for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
191                                if (live[i] && min_key >= key[i]) {
192                                        min_key = key[i];
193                                        min_queue = i;
194                                }
195                        }
196                }
197        }
198}
199
200static void read(libtrace_t *trace, libtrace_combine_t *c) {
201        read_internal(trace, c, false);
202}
203
204static void read_final(libtrace_t *trace, libtrace_combine_t *c) {
205        int empty = 0, i;
206        libtrace_queue_t *q = c->queues;
207
208        do {
209                read_internal(trace, c, true);
210                empty = 0;
211                for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
212                        if (libtrace_deque_get_size(&q[i]) == 0)
213                                empty ++;
214                }
215        }
216        while (empty < trace_get_perpkt_threads(trace));
217}
218
219static void destroy(libtrace_t *trace, libtrace_combine_t *c) {
220        int i;
221        libtrace_queue_t *queues = c->queues;
222
223        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
224                assert(libtrace_deque_get_size(&queues[i]) == 0);
225        }
226        free(queues);
227        queues = NULL;
228}
229
230
231static void pause(libtrace_t *trace, libtrace_combine_t *c) {
232        libtrace_queue_t *queues = c->queues;
233        int i;
234        for (i = 0; i < trace_get_perpkt_threads(trace); i++) {
235                libtrace_deque_apply_function(&queues[i], (deque_data_fn) libtrace_make_result_safe);
236        }
237}
238
239DLLEXPORT const libtrace_combine_t combiner_ordered = {
240        init_combiner,  /* initialise */
241        destroy,                /* destroy */
242        publish,                /* publish */
243        read,                   /* read */
244        read_final,             /* read_final */
245        pause,                  /* pause */
246        NULL,                   /* queues */
247        0,                      /* last_count_tick */
248        0,                      /* last_ts_tick */
249        {0}                             /* opts */
250};
Note: See TracBrowser for help on using the repository browser.