source: lib/data-struct/buckets.c @ e63d80d

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since e63d80d was e63d80d, checked in by Shane Alcock <salcock@…>, 6 years ago

Add bucket data structure for keeping track of packet buffers

A bucket can contain anywhere from 1 to N packets and will point to a
buffer owned by libtrace. Libtrace can signal that it is finished with an
individual packet and when all packets associated with the buffer are
finished, the buffer will be freed automatically.

This should make buffer management easier for formats like bpf and RT where
multiple packets can be read at once and will allow us to retain zero-copy
behaviour (aside from the initial read) in a parallel libtrace program.

  • Property mode set to 100644
File size: 5.6 KB
Line 
1
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#include "buckets.h"
6
7#define MAX_OUTSTANDING (200000)
8
9static void clear_bucket_node(void *node) {
10
11        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
12        if (bnode->buffer)
13                free(bnode->buffer);
14        if (bnode->released)
15                free(bnode->released);
16}
17
18DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
19
20        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
21
22        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
23                        sizeof(libtrace_bucket_node_t *));
24
25        b->nextid = 199999;
26        b->node = NULL;
27        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
28
29        pthread_mutex_init(&b->lock, NULL);
30        pthread_cond_init(&b->cond, NULL);
31
32        return b;
33
34}
35
36DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
37
38        pthread_mutex_lock(&b->lock);
39        if (b->node) {
40                clear_bucket_node(b->node);
41                free(b->node);
42        }
43
44        libtrace_list_deinit(b->nodelist);
45        free(b->packets);
46        pthread_mutex_unlock(&b->lock);
47        pthread_mutex_destroy(&b->lock);
48        pthread_cond_destroy(&b->cond);
49        free(b);
50}
51
52DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
53
54        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
55                        sizeof(libtrace_bucket_node_t));
56
57        bnode->startindex = 0;
58        bnode->buffer = buffer;
59        bnode->activemembers = 0;
60        bnode->slots = 10;
61        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
62
63        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
64
65        pthread_mutex_lock(&b->lock);
66        b->node = bnode;
67
68        libtrace_list_push_back(b->nodelist, &bnode);
69        pthread_mutex_unlock(&b->lock);
70
71}
72
73DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
74
75        uint16_t s;
76        uint64_t ret;
77
78        pthread_mutex_lock(&b->lock);
79        if (b->node == NULL) {
80                pthread_mutex_unlock(&b->lock);
81                return 0;
82        }
83
84        if (b->nextid >= MAX_OUTSTANDING)
85                b->nextid = 1;
86        if (b->node->startindex == 0) {
87
88                while (b->packets[b->nextid] != NULL) {
89                        /* No more packet slots available! */
90                        pthread_cond_wait(&b->cond, &b->lock);
91                        pthread_mutex_unlock(&b->lock);
92
93                }
94                b->node->startindex = b->nextid;
95                b->node->activemembers = 1;
96                b->node->released[0] = 1;
97
98                b->packets[b->nextid] = b->node;
99                b->nextid ++;
100                ret = b->node->startindex;
101
102                pthread_mutex_unlock(&b->lock);
103                return ret;
104        }
105
106        if (b->nextid < b->node->startindex) {
107                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
108        } else {
109                s = b->nextid - b->node->startindex;
110        }
111
112        if (s >= b->node->slots) {
113                b->node->slots += 10;
114                b->node->released = (uint8_t *)realloc(b->node->released,
115                                b->node->slots * sizeof(uint8_t));
116
117                memset((b->node->released +
118                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
119                                (10 * sizeof(uint8_t)));
120        }
121
122        while (b->packets[b->nextid] != NULL) {
123                /* No more packet slots available! */
124                pthread_cond_wait(&b->cond, &b->lock);
125                pthread_mutex_unlock(&b->lock);
126
127        }
128        b->packets[b->nextid] = b->node;
129        b->node->activemembers ++;
130        b->node->released[s] = 1;
131        b->nextid ++;
132        ret = b->nextid - 1;
133        pthread_mutex_unlock(&b->lock);
134
135        return ret;
136
137}
138
139DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
140
141        uint16_t s, i;
142        libtrace_bucket_node_t *bnode, *front;
143        libtrace_list_node_t *lnode;
144        libtrace_bucket_node_t tmp;
145
146        assert(id != 0);
147
148        pthread_mutex_lock(&b->lock);
149        bnode = b->packets[id];
150        assert(bnode != NULL);
151
152        /* Find the right slot */
153        if (id < bnode->startindex) {
154                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
155        } else {
156                s = id - bnode->startindex;
157        }
158        assert(s < bnode->slots);
159        assert(bnode->released[s] != 0);
160
161        if (bnode->released[s] == 1) {
162                bnode->released[s] = 2;
163                bnode->activemembers -= 1;
164        }
165
166        while (libtrace_list_get_size(b->nodelist) > 0) {
167                lnode = libtrace_list_get_index(b->nodelist, 0);
168
169                front = *(libtrace_bucket_node_t **)lnode->data;
170
171                if (front->activemembers > 0)
172                        break;
173                if (front == b->node)
174                        break;
175
176                for (i = 0; i < front->slots; i++) {
177                        if (front->released[i] == 2) {
178                                int index = i + front->startindex;
179                                if (index >= MAX_OUTSTANDING) {
180                                        index -= (MAX_OUTSTANDING - 1);
181                                }
182                                b->packets[index] = NULL;
183                        }
184                }
185
186                clear_bucket_node(front);
187                libtrace_list_pop_front(b->nodelist, &tmp);
188                free(front);
189                pthread_cond_signal(&b->cond);
190
191        }
192        pthread_mutex_unlock(&b->lock);
193
194}
Note: See TracBrowser for help on using the repository browser.