source: lib/data-struct/buckets.c @ 2193905

develop
Last change on this file since 2193905 was 2193905, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

Apply changes required for pull request #81

  • Property mode set to 100644
File size: 8.0 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#include <stdlib.h>
28#include <string.h>
29#include "buckets.h"
30
31#define MAX_OUTSTANDING (200000)
32
33static void clear_bucket_node(void *node) {
34
35        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
36        if (bnode->buffer)
37                free(bnode->buffer);
38        if (bnode->released)
39                free(bnode->released);
40}
41
42DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
43
44        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
45
46        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
47                        sizeof(libtrace_bucket_node_t *));
48
49        b->nextid = 199999;
50        b->node = NULL;
51        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
52
53        pthread_mutex_init(&b->lock, NULL);
54        pthread_cond_init(&b->cond, NULL);
55
56        return b;
57
58}
59
60DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
61
62        pthread_mutex_lock(&b->lock);
63        if (b->node) {
64                clear_bucket_node(b->node);
65                free(b->node);
66        }
67
68        libtrace_list_deinit(b->nodelist);
69        free(b->packets);
70        pthread_mutex_unlock(&b->lock);
71        pthread_mutex_destroy(&b->lock);
72        pthread_cond_destroy(&b->cond);
73        free(b);
74}
75
76DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
77
78        libtrace_bucket_node_t tmp;
79        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
80                        sizeof(libtrace_bucket_node_t));
81
82        /* If the last node was never used, i.e. all packets within that node
83         * buffer were filtered, we need to make sure we free the buffer
84         * before we lose track of it.
85         */
86        pthread_mutex_lock(&b->lock);
87        if (b->node && b->node->startindex == 0) {
88                clear_bucket_node(b->node);
89                libtrace_list_pop_back(b->nodelist, &tmp);
90                free(b->node);
91        }
92        pthread_mutex_unlock(&b->lock);
93
94
95        bnode->startindex = 0;
96        bnode->buffer = buffer;
97        bnode->activemembers = 0;
98        bnode->slots = 10;
99        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
100
101        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
102
103        pthread_mutex_lock(&b->lock);
104        b->node = bnode;
105
106        libtrace_list_push_back(b->nodelist, &bnode);
107        pthread_mutex_unlock(&b->lock);
108
109}
110
111DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
112
113        uint16_t s;
114        uint64_t ret;
115
116        pthread_mutex_lock(&b->lock);
117        if (b->node == NULL) {
118                pthread_mutex_unlock(&b->lock);
119                return 0;
120        }
121
122        if (b->nextid >= MAX_OUTSTANDING)
123                b->nextid = 1;
124        if (b->node->startindex == 0) {
125
126                while (b->packets[b->nextid] != NULL) {
127                        /* No more packet slots available! */
128                        pthread_cond_wait(&b->cond, &b->lock);
129                        pthread_mutex_unlock(&b->lock);
130
131                }
132                b->node->startindex = b->nextid;
133                b->node->activemembers = 1;
134                b->node->released[0] = 1;
135
136                b->packets[b->nextid] = b->node;
137                b->nextid ++;
138                ret = b->node->startindex;
139
140                pthread_mutex_unlock(&b->lock);
141                return ret;
142        }
143
144        if (b->nextid < b->node->startindex) {
145                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
146        } else {
147                s = b->nextid - b->node->startindex;
148        }
149
150        if (s >= b->node->slots) {
151                b->node->slots += 10;
152                b->node->released = (uint8_t *)realloc(b->node->released,
153                                b->node->slots * sizeof(uint8_t));
154
155                memset((b->node->released +
156                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
157                                (10 * sizeof(uint8_t)));
158        }
159
160        while (b->packets[b->nextid] != NULL) {
161                /* No more packet slots available! */
162                pthread_cond_wait(&b->cond, &b->lock);
163                pthread_mutex_unlock(&b->lock);
164
165        }
166        b->packets[b->nextid] = b->node;
167        b->node->activemembers ++;
168        b->node->released[s] = 1;
169        b->nextid ++;
170        ret = b->nextid - 1;
171        pthread_mutex_unlock(&b->lock);
172
173        return ret;
174
175}
176
177DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
178
179        uint16_t s, i;
180        libtrace_bucket_node_t *bnode, *front;
181        libtrace_list_node_t *lnode;
182        libtrace_bucket_node_t tmp;
183
184        if (id == 0) {
185                fprintf(stderr, "bucket ID cannot be 0 in libtrace_release_bucket_id()\n");
186                return;
187        }
188
189        pthread_mutex_lock(&b->lock);
190        bnode = b->packets[id];
191        if (!bnode) {
192                fprintf(stderr, "bucket ID %lu is NULL in libtrace_release_bucket_id()\n", id);
193                return;
194        }
195
196
197        /* Find the right slot */
198        if (id < bnode->startindex) {
199                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
200        } else {
201                s = id - bnode->startindex;
202        }
203        if (s >= bnode->slots) {
204                fprintf(stderr, "Error in libtrace_release_bucket_id()\n");
205                return;
206        }
207        if (bnode->released[s] == 0) {
208                fprintf(stderr, "Error in libtrace_release_bucket_id()\n");
209                return;
210        }
211
212
213        if (bnode->released[s] == 1) {
214                uint64_t previd = b->nextid - 1;
215                if (b->nextid == 1)
216                        previd = MAX_OUTSTANDING - 1;
217
218                if (bnode == b->node && id == previd) {
219                        b->packets[id] = NULL;
220                        b->nextid = previd;
221                        bnode->released[s] = 0;
222                        if (id == bnode->startindex)
223                                bnode->startindex = 0;
224                } else {
225                        bnode->released[s] = 2;
226                }
227                bnode->activemembers -= 1;
228        }
229
230        while (libtrace_list_get_size(b->nodelist) > 1) {
231                lnode = libtrace_list_get_index(b->nodelist, 0);
232
233                front = *(libtrace_bucket_node_t **)lnode->data;
234
235                if (front->activemembers > 0) {
236                        break;
237                }
238                if (front == b->node)
239                        break;
240
241                if (!lnode->next) {
242                        fprintf(stderr, "Error in libtrace_release_bucket_id()\n");
243                        return;
244                }
245                for (i = 0; i < front->slots; i++) {
246                        if (front->released[i] == 2) {
247                                int index = i + front->startindex;
248                                if (index >= MAX_OUTSTANDING) {
249                                        index -= (MAX_OUTSTANDING - 1);
250                                }
251                                b->packets[index] = NULL;
252                        }
253                }
254
255                clear_bucket_node(front);
256                libtrace_list_pop_front(b->nodelist, &tmp);
257                free(front);
258                pthread_cond_signal(&b->cond);
259
260        }
261        pthread_mutex_unlock(&b->lock);
262
263}
Note: See TracBrowser for help on using the repository browser.