source: lib/data-struct/buckets.c @ ee6e802

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since ee6e802 was ee6e802, checked in by Shane Alcock <salcock@…>, 4 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 7.6 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#include <stdlib.h>
28#include <assert.h>
29#include <string.h>
30#include "buckets.h"
31
32#define MAX_OUTSTANDING (200000)
33
34static void clear_bucket_node(void *node) {
35
36        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
37        if (bnode->buffer)
38                free(bnode->buffer);
39        if (bnode->released)
40                free(bnode->released);
41}
42
43DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
44
45        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
46
47        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
48                        sizeof(libtrace_bucket_node_t *));
49
50        b->nextid = 199999;
51        b->node = NULL;
52        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
53
54        pthread_mutex_init(&b->lock, NULL);
55        pthread_cond_init(&b->cond, NULL);
56
57        return b;
58
59}
60
61DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
62
63        pthread_mutex_lock(&b->lock);
64        if (b->node) {
65                clear_bucket_node(b->node);
66                free(b->node);
67        }
68
69        libtrace_list_deinit(b->nodelist);
70        free(b->packets);
71        pthread_mutex_unlock(&b->lock);
72        pthread_mutex_destroy(&b->lock);
73        pthread_cond_destroy(&b->cond);
74        free(b);
75}
76
77DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
78
79        libtrace_bucket_node_t tmp;
80        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
81                        sizeof(libtrace_bucket_node_t));
82
83        /* If the last node was never used, i.e. all packets within that node
84         * buffer were filtered, we need to make sure we free the buffer
85         * before we lose track of it.
86         */
87        pthread_mutex_lock(&b->lock);
88        if (b->node && b->node->startindex == 0) {
89                clear_bucket_node(b->node);
90                libtrace_list_pop_back(b->nodelist, &tmp);
91                free(b->node);
92        }
93        pthread_mutex_unlock(&b->lock);
94
95
96        bnode->startindex = 0;
97        bnode->buffer = buffer;
98        bnode->activemembers = 0;
99        bnode->slots = 10;
100        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
101
102        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
103
104        pthread_mutex_lock(&b->lock);
105        b->node = bnode;
106
107        libtrace_list_push_back(b->nodelist, &bnode);
108        pthread_mutex_unlock(&b->lock);
109
110}
111
112DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
113
114        uint16_t s;
115        uint64_t ret;
116
117        pthread_mutex_lock(&b->lock);
118        if (b->node == NULL) {
119                pthread_mutex_unlock(&b->lock);
120                return 0;
121        }
122
123        if (b->nextid >= MAX_OUTSTANDING)
124                b->nextid = 1;
125        if (b->node->startindex == 0) {
126
127                while (b->packets[b->nextid] != NULL) {
128                        /* No more packet slots available! */
129                        pthread_cond_wait(&b->cond, &b->lock);
130                        pthread_mutex_unlock(&b->lock);
131
132                }
133                b->node->startindex = b->nextid;
134                b->node->activemembers = 1;
135                b->node->released[0] = 1;
136
137                b->packets[b->nextid] = b->node;
138                b->nextid ++;
139                ret = b->node->startindex;
140
141                pthread_mutex_unlock(&b->lock);
142                return ret;
143        }
144
145        if (b->nextid < b->node->startindex) {
146                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
147        } else {
148                s = b->nextid - b->node->startindex;
149        }
150
151        if (s >= b->node->slots) {
152                b->node->slots += 10;
153                b->node->released = (uint8_t *)realloc(b->node->released,
154                                b->node->slots * sizeof(uint8_t));
155
156                memset((b->node->released +
157                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
158                                (10 * sizeof(uint8_t)));
159        }
160
161        while (b->packets[b->nextid] != NULL) {
162                /* No more packet slots available! */
163                pthread_cond_wait(&b->cond, &b->lock);
164                pthread_mutex_unlock(&b->lock);
165
166        }
167        b->packets[b->nextid] = b->node;
168        b->node->activemembers ++;
169        b->node->released[s] = 1;
170        b->nextid ++;
171        ret = b->nextid - 1;
172        pthread_mutex_unlock(&b->lock);
173
174        return ret;
175
176}
177
178DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
179
180        uint16_t s, i;
181        libtrace_bucket_node_t *bnode, *front;
182        libtrace_list_node_t *lnode;
183        libtrace_bucket_node_t tmp;
184
185        assert(id != 0);
186
187        pthread_mutex_lock(&b->lock);
188        bnode = b->packets[id];
189        assert(bnode != NULL);
190
191
192        /* Find the right slot */
193        if (id < bnode->startindex) {
194                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
195        } else {
196                s = id - bnode->startindex;
197        }
198        assert(s < bnode->slots);
199        assert(bnode->released[s] != 0);
200
201
202        if (bnode->released[s] == 1) {
203                uint64_t previd = b->nextid - 1;
204                if (b->nextid == 1)
205                        previd = MAX_OUTSTANDING - 1;
206
207                if (bnode == b->node && id == previd) {
208                        b->packets[id] = NULL;
209                        b->nextid = previd;
210                        bnode->released[s] = 0;
211                        if (id == bnode->startindex)
212                                bnode->startindex = 0;
213                } else {
214                        bnode->released[s] = 2;
215                }
216                bnode->activemembers -= 1;
217        }
218
219        while (libtrace_list_get_size(b->nodelist) > 1) {
220                lnode = libtrace_list_get_index(b->nodelist, 0);
221
222                front = *(libtrace_bucket_node_t **)lnode->data;
223
224                if (front->activemembers > 0) {
225                        break;
226                }
227                if (front == b->node)
228                        break;
229
230                assert(lnode->next != NULL);
231                for (i = 0; i < front->slots; i++) {
232                        if (front->released[i] == 2) {
233                                int index = i + front->startindex;
234                                if (index >= MAX_OUTSTANDING) {
235                                        index -= (MAX_OUTSTANDING - 1);
236                                }
237                                b->packets[index] = NULL;
238                        }
239                }
240
241                clear_bucket_node(front);
242                libtrace_list_pop_front(b->nodelist, &tmp);
243                free(front);
244                pthread_cond_signal(&b->cond);
245
246        }
247        pthread_mutex_unlock(&b->lock);
248
249}
Note: See TracBrowser for help on using the repository browser.