source: lib/data-struct/buckets.c @ 348396b

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 348396b was 348396b, checked in by Shane Alcock <salcock@…>, 6 years ago

Fix deadlock when reading parallel RT with a filter

The problem was that filtered packets were still consuming slots in the
buckets, but matching packets would only get released once parallel libtrace had
read a full batch (10 packets, by default). If the filter didn't match many
packets, the 200,000 packet slots would wrap around before a batch was
complete -- leaving us with nowhere to store new packets.

The fix was to allow the bucket structure to "reclaim" a slot if it is
released as soon as it is allocated, i.e. if the packet was filtered. This
means that only matching packets will consume slots and we can't run out
before we read a full batch.

  • Property mode set to 100644
File size: 6.1 KB
Line 
1
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#include "buckets.h"
6
7#define MAX_OUTSTANDING (200000)
8
9static void clear_bucket_node(void *node) {
10
11        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)node;
12        if (bnode->buffer)
13                free(bnode->buffer);
14        if (bnode->released)
15                free(bnode->released);
16}
17
18DLLEXPORT libtrace_bucket_t *libtrace_bucket_init() {
19
20        libtrace_bucket_t *b = (libtrace_bucket_t *) malloc(sizeof(libtrace_bucket_t));
21
22        b->packets = (libtrace_bucket_node_t **)calloc(MAX_OUTSTANDING + 1,
23                        sizeof(libtrace_bucket_node_t *));
24
25        b->nextid = 199999;
26        b->node = NULL;
27        b->nodelist = libtrace_list_init(sizeof(libtrace_bucket_node_t));
28
29        pthread_mutex_init(&b->lock, NULL);
30        pthread_cond_init(&b->cond, NULL);
31
32        return b;
33
34}
35
36DLLEXPORT void libtrace_bucket_destroy(libtrace_bucket_t *b) {
37
38        pthread_mutex_lock(&b->lock);
39        if (b->node) {
40                clear_bucket_node(b->node);
41                free(b->node);
42        }
43
44        libtrace_list_deinit(b->nodelist);
45        free(b->packets);
46        pthread_mutex_unlock(&b->lock);
47        pthread_mutex_destroy(&b->lock);
48        pthread_cond_destroy(&b->cond);
49        free(b);
50}
51
52DLLEXPORT void libtrace_create_new_bucket(libtrace_bucket_t *b, void *buffer) {
53
54        libtrace_bucket_node_t *bnode = (libtrace_bucket_node_t *)malloc(
55                        sizeof(libtrace_bucket_node_t));
56
57        bnode->startindex = 0;
58        bnode->buffer = buffer;
59        bnode->activemembers = 0;
60        bnode->slots = 10;
61        bnode->released = (uint8_t *)malloc(bnode->slots * sizeof(uint8_t));
62
63        memset(bnode->released, 0, bnode->slots * sizeof(uint8_t));
64
65        pthread_mutex_lock(&b->lock);
66        b->node = bnode;
67
68        libtrace_list_push_back(b->nodelist, &bnode);
69        pthread_mutex_unlock(&b->lock);
70
71}
72
73DLLEXPORT uint64_t libtrace_push_into_bucket(libtrace_bucket_t *b) {
74
75        uint16_t s;
76        uint64_t ret;
77
78        pthread_mutex_lock(&b->lock);
79        if (b->node == NULL) {
80                pthread_mutex_unlock(&b->lock);
81                return 0;
82        }
83
84        if (b->nextid >= MAX_OUTSTANDING)
85                b->nextid = 1;
86        if (b->node->startindex == 0) {
87
88                while (b->packets[b->nextid] != NULL) {
89                        /* No more packet slots available! */
90                        pthread_cond_wait(&b->cond, &b->lock);
91                        pthread_mutex_unlock(&b->lock);
92
93                }
94                b->node->startindex = b->nextid;
95                b->node->activemembers = 1;
96                b->node->released[0] = 1;
97
98                b->packets[b->nextid] = b->node;
99                b->nextid ++;
100                ret = b->node->startindex;
101
102                pthread_mutex_unlock(&b->lock);
103                return ret;
104        }
105
106        if (b->nextid < b->node->startindex) {
107                s = (MAX_OUTSTANDING - b->node->startindex) + b->nextid - 1;
108        } else {
109                s = b->nextid - b->node->startindex;
110        }
111
112        if (s >= b->node->slots) {
113                b->node->slots += 10;
114                b->node->released = (uint8_t *)realloc(b->node->released,
115                                b->node->slots * sizeof(uint8_t));
116
117                memset((b->node->released +
118                                (b->node->slots - 10) * sizeof(uint8_t)), 0,
119                                (10 * sizeof(uint8_t)));
120        }
121
122        while (b->packets[b->nextid] != NULL) {
123                /* No more packet slots available! */
124                pthread_cond_wait(&b->cond, &b->lock);
125                pthread_mutex_unlock(&b->lock);
126
127        }
128        b->packets[b->nextid] = b->node;
129        b->node->activemembers ++;
130        b->node->released[s] = 1;
131        b->nextid ++;
132        ret = b->nextid - 1;
133        pthread_mutex_unlock(&b->lock);
134
135        return ret;
136
137}
138
139DLLEXPORT void libtrace_release_bucket_id(libtrace_bucket_t *b, uint64_t id) {
140
141        uint16_t s, i;
142        libtrace_bucket_node_t *bnode, *front;
143        libtrace_list_node_t *lnode;
144        libtrace_bucket_node_t tmp;
145
146        assert(id != 0);
147
148        pthread_mutex_lock(&b->lock);
149        bnode = b->packets[id];
150        assert(bnode != NULL);
151
152
153        /* Find the right slot */
154        if (id < bnode->startindex) {
155                s = (MAX_OUTSTANDING - bnode->startindex) + id - 1;
156        } else {
157                s = id - bnode->startindex;
158        }
159        assert(s < bnode->slots);
160        assert(bnode->released[s] != 0);
161       
162
163        if (bnode->released[s] == 1) {
164                uint64_t previd = b->nextid - 1;
165                if (b->nextid == 1)
166                        previd = MAX_OUTSTANDING - 1;
167
168                if (bnode == b->node && id == previd) {
169                        b->packets[id] = NULL;
170                        b->nextid = previd;
171                        bnode->released[s] = 0;
172                } else {
173                        bnode->released[s] = 2;
174                }
175                bnode->activemembers -= 1;
176        }
177
178        while (libtrace_list_get_size(b->nodelist) > 1) {
179                lnode = libtrace_list_get_index(b->nodelist, 0);
180
181                front = *(libtrace_bucket_node_t **)lnode->data;
182
183                if (front->activemembers > 0) {
184                        break;
185                }
186                if (front == b->node)
187                        break;
188
189                assert(lnode->next != NULL);
190                for (i = 0; i < front->slots; i++) {
191                        if (front->released[i] == 2) {
192                                int index = i + front->startindex;
193                                if (index >= MAX_OUTSTANDING) {
194                                        index -= (MAX_OUTSTANDING - 1);
195                                }
196                                b->packets[index] = NULL;
197                        }
198                }
199
200                clear_bucket_node(front);
201                libtrace_list_pop_front(b->nodelist, &tmp);
202                free(front);
203                pthread_cond_signal(&b->cond);
204
205        }
206        pthread_mutex_unlock(&b->lock);
207
208}
Note: See TracBrowser for help on using the repository browser.