source: lib/data-struct/buckets.c @ d391ce0

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since d391ce0 was d391ce0, checked in by Shane Alcock <salcock@…>, 6 years ago

Updated format_rt to use new bucket structure

This should save us from having to memcpy every packet from the read
buffer into the packet buffer.

Added an internalid and srcbucket reference to the libtrace_packet_t
structure, so that trace_fin_packet() can release packets from any buckets
that they are owned by.

Fix race condition on trace->last_packet.

  • Property mode set to 100644
File size: 5.7 KB
Line 
1
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#include "buckets.h"
6
7#define MAX_OUTSTANDING (200000)
8
9static void clear_bucket_node(void *node) {
10
11        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
12        if (bnode->buffer)
13                free(bnode->buffer);
14        if (bnode->released)
15                free(bnode->released);
16}
17
18DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
19
20        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
21
22        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
23                        sizeof(libtrace_bucket_node_t *));
24
25        b->nextid = 199999;
26        b->node = NULL;
27        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
28
29        pthread_mutex_init(&b->lock, NULL);
30        pthread_cond_init(&b->cond, NULL);
31
32        return b;
33
34}
35
36DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
37
38        pthread_mutex_lock(&b->lock);
39        if (b->node) {
40                clear_bucket_node(b->node);
41                free(b->node);
42        }
43
44        libtrace_list_deinit(b->nodelist);
45        free(b->packets);
46        pthread_mutex_unlock(&b->lock);
47        pthread_mutex_destroy(&b->lock);
48        pthread_cond_destroy(&b->cond);
49        free(b);
50}
51
52DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
53
54        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
55                        sizeof(libtrace_bucket_node_t));
56
57        bnode->startindex = 0;
58        bnode->buffer = buffer;
59        bnode->activemembers = 0;
60        bnode->slots = 10;
61        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
62
63        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
64
65        pthread_mutex_lock(&b->lock);
66        b->node = bnode;
67
68        libtrace_list_push_back(b->nodelist, &bnode);
69        pthread_mutex_unlock(&b->lock);
70
71}
72
73DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
74
75        uint16_t s;
76        uint64_t ret;
77
78        pthread_mutex_lock(&b->lock);
79        if (b->node == NULL) {
80                pthread_mutex_unlock(&b->lock);
81                return 0;
82        }
83
84        if (b->nextid >= MAX_OUTSTANDING)
85                b->nextid = 1;
86        if (b->node->startindex == 0) {
87
88                while (b->packets[b->nextid] != NULL) {
89                        /* No more packet slots available! */
90                        pthread_cond_wait(&b->cond, &b->lock);
91                        pthread_mutex_unlock(&b->lock);
92
93                }
94                b->node->startindex = b->nextid;
95                b->node->activemembers = 1;
96                b->node->released[0] = 1;
97
98                b->packets[b->nextid] = b->node;
99                b->nextid ++;
100                ret = b->node->startindex;
101
102                pthread_mutex_unlock(&b->lock);
103                return ret;
104        }
105
106        if (b->nextid < b->node->startindex) {
107                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
108        } else {
109                s = b->nextid - b->node->startindex;
110        }
111
112        if (s >= b->node->slots) {
113                b->node->slots += 10;
114                b->node->released = (uint8_t *)realloc(b->node->released,
115                                b->node->slots * sizeof(uint8_t));
116
117                memset((b->node->released +
118                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
119                                (10 * sizeof(uint8_t)));
120        }
121
122        while (b->packets[b->nextid] != NULL) {
123                /* No more packet slots available! */
124                pthread_cond_wait(&b->cond, &b->lock);
125                pthread_mutex_unlock(&b->lock);
126
127        }
128        b->packets[b->nextid] = b->node;
129        b->node->activemembers ++;
130        b->node->released[s] = 1;
131        b->nextid ++;
132        ret = b->nextid - 1;
133        pthread_mutex_unlock(&b->lock);
134
135        return ret;
136
137}
138
139DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
140
141        uint16_t s, i;
142        libtrace_bucket_node_t *bnode, *front;
143        libtrace_list_node_t *lnode;
144        libtrace_bucket_node_t tmp;
145
146        assert(id != 0);
147
148        pthread_mutex_lock(&b->lock);
149        bnode = b->packets[id];
150        assert(bnode != NULL);
151
152        /* Find the right slot */
153        if (id < bnode->startindex) {
154                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
155        } else {
156                s = id - bnode->startindex;
157        }
158        assert(s < bnode->slots);
159        assert(bnode->released[s] != 0);
160
161        if (bnode->released[s] == 1) {
162                bnode->released[s] = 2;
163                bnode->activemembers -= 1;
164        }
165
166
167        while (libtrace_list_get_size(b->nodelist) > 1) {
168                lnode = libtrace_list_get_index(b->nodelist, 0);
169
170                front = *(libtrace_bucket_node_t **)lnode->data;
171
172                if (front->activemembers > 0)
173                        break;
174                if (front == b->node)
175                        break;
176
177                assert(lnode->next != NULL);
178                for (i = 0; i < front->slots; i++) {
179                        if (front->released[i] == 2) {
180                                int index = i + front->startindex;
181                                if (index >= MAX_OUTSTANDING) {
182                                        index -= (MAX_OUTSTANDING - 1);
183                                }
184                                b->packets[index] = NULL;
185                        }
186                }
187
188                clear_bucket_node(front);
189                libtrace_list_pop_front(b->nodelist, &tmp);
190                free(front);
191                pthread_cond_signal(&b->cond);
192
193        }
194        pthread_mutex_unlock(&b->lock);
195
196}
Note: See TracBrowser for help on using the repository browser.