source: lib/data-struct/buckets.c @ 21c0d70

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 21c0d70 was 21c0d70, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix memory leak with heavily filtered RT inputs

Bucket buffers that contained entirely filtered packets were never
being freed if they weren't at the front of our bucket list, which would
chew through memory very quickly as soon as we had a non-empty bucket at
the front of the list but not enough packets to result in a batch being
spread amongst the processing threads.

  • Property mode set to 100644
File size: 6.7 KB
Line 
1
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#include "buckets.h"
6
7#define MAX_OUTSTANDING (200000)
8
9static void clear_bucket_node(void *node) {
10
11        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
12        if (bnode->buffer)
13                free(bnode->buffer);
14        if (bnode->released)
15                free(bnode->released);
16}
17
18DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
19
20        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
21
22        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
23                        sizeof(libtrace_bucket_node_t *));
24
25        b->nextid = 199999;
26        b->node = NULL;
27        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
28
29        pthread_mutex_init(&b->lock, NULL);
30        pthread_cond_init(&b->cond, NULL);
31
32        return b;
33
34}
35
36DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
37
38        pthread_mutex_lock(&b->lock);
39        if (b->node) {
40                clear_bucket_node(b->node);
41                free(b->node);
42        }
43
44        libtrace_list_deinit(b->nodelist);
45        free(b->packets);
46        pthread_mutex_unlock(&b->lock);
47        pthread_mutex_destroy(&b->lock);
48        pthread_cond_destroy(&b->cond);
49        free(b);
50}
51
52DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
53
54        libtrace_bucket_node_t tmp;
55        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
56                        sizeof(libtrace_bucket_node_t));
57
58        /* If the last node was never used, i.e. all packets within that node
59         * buffer were filtered, we need to make sure we free the buffer
60         * before we lose track of it.
61         */
62        pthread_mutex_lock(&b->lock);
63        if (b->node && b->node->startindex == 0) {
64                clear_bucket_node(b->node);
65                libtrace_list_pop_back(b->nodelist, &tmp);
66                free(b->node);
67        }
68        pthread_mutex_unlock(&b->lock);
69
70
71        bnode->startindex = 0;
72        bnode->buffer = buffer;
73        bnode->activemembers = 0;
74        bnode->slots = 10;
75        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
76
77        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
78
79        pthread_mutex_lock(&b->lock);
80        b->node = bnode;
81
82        libtrace_list_push_back(b->nodelist, &bnode);
83        pthread_mutex_unlock(&b->lock);
84
85}
86
87DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
88
89        uint16_t s;
90        uint64_t ret;
91
92        pthread_mutex_lock(&b->lock);
93        if (b->node == NULL) {
94                pthread_mutex_unlock(&b->lock);
95                return 0;
96        }
97
98        if (b->nextid >= MAX_OUTSTANDING)
99                b->nextid = 1;
100        if (b->node->startindex == 0) {
101
102                while (b->packets[b->nextid] != NULL) {
103                        /* No more packet slots available! */
104                        pthread_cond_wait(&b->cond, &b->lock);
105                        pthread_mutex_unlock(&b->lock);
106
107                }
108                b->node->startindex = b->nextid;
109                b->node->activemembers = 1;
110                b->node->released[0] = 1;
111
112                b->packets[b->nextid] = b->node;
113                b->nextid ++;
114                ret = b->node->startindex;
115
116                pthread_mutex_unlock(&b->lock);
117                return ret;
118        }
119
120        if (b->nextid < b->node->startindex) {
121                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
122        } else {
123                s = b->nextid - b->node->startindex;
124        }
125
126        if (s >= b->node->slots) {
127                b->node->slots += 10;
128                b->node->released = (uint8_t *)realloc(b->node->released,
129                                b->node->slots * sizeof(uint8_t));
130
131                memset((b->node->released +
132                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
133                                (10 * sizeof(uint8_t)));
134        }
135
136        while (b->packets[b->nextid] != NULL) {
137                /* No more packet slots available! */
138                pthread_cond_wait(&b->cond, &b->lock);
139                pthread_mutex_unlock(&b->lock);
140
141        }
142        b->packets[b->nextid] = b->node;
143        b->node->activemembers ++;
144        b->node->released[s] = 1;
145        b->nextid ++;
146        ret = b->nextid - 1;
147        pthread_mutex_unlock(&b->lock);
148
149        return ret;
150
151}
152
153DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
154
155        uint16_t s, i;
156        libtrace_bucket_node_t *bnode, *front;
157        libtrace_list_node_t *lnode;
158        libtrace_bucket_node_t tmp;
159
160        assert(id != 0);
161
162        pthread_mutex_lock(&b->lock);
163        bnode = b->packets[id];
164        assert(bnode != NULL);
165
166
167        /* Find the right slot */
168        if (id < bnode->startindex) {
169                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
170        } else {
171                s = id - bnode->startindex;
172        }
173        assert(s < bnode->slots);
174        assert(bnode->released[s] != 0);
175
176
177        if (bnode->released[s] == 1) {
178                uint64_t previd = b->nextid - 1;
179                if (b->nextid == 1)
180                        previd = MAX_OUTSTANDING - 1;
181
182                if (bnode == b->node && id == previd) {
183                        b->packets[id] = NULL;
184                        b->nextid = previd;
185                        bnode->released[s] = 0;
186                        if (id == bnode->startindex)
187                                bnode->startindex = 0;
188                } else {
189                        bnode->released[s] = 2;
190                }
191                bnode->activemembers -= 1;
192        }
193
194        while (libtrace_list_get_size(b->nodelist) > 1) {
195                lnode = libtrace_list_get_index(b->nodelist, 0);
196
197                front = *(libtrace_bucket_node_t **)lnode->data;
198
199                if (front->activemembers > 0) {
200                        break;
201                }
202                if (front == b->node)
203                        break;
204
205                assert(lnode->next != NULL);
206                for (i = 0; i < front->slots; i++) {
207                        if (front->released[i] == 2) {
208                                int index = i + front->startindex;
209                                if (index >= MAX_OUTSTANDING) {
210                                        index -= (MAX_OUTSTANDING - 1);
211                                }
212                                b->packets[index] = NULL;
213                        }
214                }
215
216                clear_bucket_node(front);
217                libtrace_list_pop_front(b->nodelist, &tmp);
218                free(front);
219                pthread_cond_signal(&b->cond);
220
221        }
222        pthread_mutex_unlock(&b->lock);
223
224}
Note: See TracBrowser for help on using the repository browser.