source: lib/data-struct/ring_buffer.c @ ead9478

Branches/tags containing this revision: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since ead9478 was ead9478, checked in by Richard Sanger <rsangerarj@…>, 8 years ago

Use size_t rather than int for the data structures. Export vector, deque and ring_buffer for testing and use in programs.
Remove some inlines for now since these result in lots of compile warnings when also exported, the compiler is probably smart enough anyway.

  • Property mode set to 100644
File size: 10.5 KB
Line 
1/**
2 * A ring or circular buffer, very useful
3 */
4
5#include "ring_buffer.h"
6
7#include <stdlib.h>
8#include <assert.h>
9
#define LOCK_TYPE_MUTEX 0 // Default if not defined
#define LOCK_TYPE_SPIN 1
#define LOCK_TYPE_SEMAPHORE 2
#define LOCK_TYPE_NONE 3

// No major difference noticed here between mutex and spin, both have their
// downsides.

#define USE_MODULUS 1
#define USE_CHECK_EARLY 1

/*
 * Evaluates a locking call exactly once and asserts that it returned 0.
 *
 * The call is deliberately kept OUTSIDE of assert(): assert() expands to
 * nothing when NDEBUG is defined, so a lock/unlock placed inside it would
 * silently disappear from release builds.
 */
#define RB_ASSERT_OK(call) do { \
		int rb_assert_rc_ = (call); \
		assert(rb_assert_rc_ == 0); \
		(void)rb_assert_rc_; \
	} while (0)

/*
 * LOCK(dir) / UNLOCK(dir) / TRY_LOCK(dir, action) operate on the read lock
 * (dir = r) or the write lock (dir = w) of a variable named `rb` that must be
 * in scope at the point of use. TRY_LOCK runs `action` when the lock could
 * not be obtained.
 *
 * NOTE(review): in the SPIN and MUTEX configurations TRY_LOCK is built on the
 * blocking lock call, so it waits for the lock instead of giving up; only the
 * SEMAPHORE configuration uses a real try operation (sem_trywait). Left as is
 * because callers may depend on it - confirm the intent.
 */
#define USE_LOCK_TYPE LOCK_TYPE_SPIN
#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
#       define LOCK(dir) RB_ASSERT_OK(pthread_spin_lock(&rb->s ## dir ## lock))
#       define UNLOCK(dir) RB_ASSERT_OK(pthread_spin_unlock(&rb->s ## dir ## lock))
#       define TRY_LOCK(dir, action) if(pthread_spin_lock(&rb->s ## dir ## lock) != 0) { \
								action }
#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
#       define LOCK(dir) RB_ASSERT_OK(sem_wait(&rb->sem ## dir ## lock))
#       define UNLOCK(dir) RB_ASSERT_OK(sem_post(&rb->sem ## dir ## lock))
#       define TRY_LOCK(dir, action) if(sem_trywait(&rb->sem ## dir ## lock) != 0) { \
								action }
#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
#       define LOCK(dir)
#       define UNLOCK(dir)
#       define TRY_LOCK(dir, action)
#else // Mutex
#       define LOCK(dir) RB_ASSERT_OK(pthread_mutex_lock(&rb-> dir ## lock))
#       define UNLOCK(dir) RB_ASSERT_OK(pthread_mutex_unlock(&rb-> dir ## lock))
#       define TRY_LOCK(dir, action) if(pthread_mutex_lock(&rb-> dir ## lock) != 0) {\
								action }
#endif
42
43
44/**
45 * Implements a FIFO queue via a ring buffer, this is a fixed size
46 * and all methods are no clobber i.e. will not overwrite old items
47 * with new ones.
48 *
49 * @param rb A pointer to a ringbuffer structure.
50 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
51 * @param mode The mode allows selection to use semaphores to signal when data
52 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
53 *                              NOTE: this mainly applies to the blocking functions
54 */
55DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
56        size = size + 1;
57        assert (size > 1);
58        rb->size = size; // Only this -1 actually usable :)
59        rb->start = 0;
60        rb->end = 0;
61        rb->elements = calloc(rb->size, sizeof(void*));
62        assert(rb->elements);
63        rb->mode = mode;
64        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
65                /* The signaling part - i.e. release when data's ready to read */
66                assert(sem_init(&rb->fulls, 0, 0) == 0);
67                assert(sem_init(&rb->emptys, 0, size - 1) == 0); // REMEMBER the -1 here :) very important
68        }
69        /* The mutual exclusion part */
70#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
71#warning "using spinners"
72        assert(pthread_spin_init(&rb->swlock, 0) == 0);
73        assert(pthread_spin_init(&rb->srlock, 0) == 0);
74#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
75#warning "using semaphore"
76        assert(sem_init(&rb->semrlock, 0, 1) != -1);
77        assert(sem_init(&rb->semwlock, 0, 1) != -1);
78#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
79#warning "No locking used"
80#else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
81        assert(pthread_mutex_init(&rb->wlock, NULL) == 0);
82        assert(pthread_mutex_init(&rb->rlock, NULL) == 0);
83#endif
84}
85
86/**
87 * Destroys the ring buffer along with any memory allocated to it
88 * @param rb The ringbuffer to destroy
89 */
90DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
91#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
92        assert(pthread_spin_destroy(&rb->swlock) == 0);
93        assert(pthread_spin_destroy(&rb->srlock) == 0);
94#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
95        assert(sem_destroy(&rb->semrlock) != -1);
96        assert(sem_destroy(&rb->semwlock) != -1);
97#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
98#else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
99        assert(pthread_mutex_destroy(&rb->wlock) == 0);
100        assert(pthread_mutex_destroy(&rb->rlock) == 0);
101#endif
102        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
103                assert(sem_destroy(&rb->fulls) == 0);
104                assert(sem_destroy(&rb->emptys) == 0);
105        }
106        rb->size = 0;
107        rb->start = 0;
108        rb->end = 0;
109        free((void *)rb->elements);
110        rb->elements = NULL;
111}
112
113/**
114 * Tests to see if ringbuffer is empty, when using multiple threads
115 * this doesn't guarantee that the next operation wont block. Use
116 * write/read try instead.
117 */
118DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
119        return rb->start == rb->end;
120}
121
122/**
123 * Tests to see if ringbuffer is empty, when using multiple threads
124 * this doesn't guarantee that the next operation wont block. Use
125 * write/read try instead.
126 */
127DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
128#if USE_MODULUS
129        return rb->start == ((rb->end + 1) % rb->size);
130#else
131        return rb->start == ((rb->end + 1 < rb->size) ? rb->end + 1 : 0);
132#endif
133}
134
135/**
136 * Performs a blocking write to the buffer, upon return the value will be
137 * stored. This will not clobber old values.
138 *
139 * This assumes only one thread writing at once. Use
140 * libtrace_ringbuffer_swrite for a thread safe version.
141 *
142 * @param rb a pointer to libtrace_ringbuffer structure
143 * @param value the value to store
144 */
145DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
146        /* Need an empty to start with */
147        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
148                assert(sem_wait(&rb->emptys) == 0);
149        else 
150                while (libtrace_ringbuffer_is_full(rb))
151                        /* Yield our time, why?, we tried and failed to write an item
152                         * to the buffer - so we should give up our time in the hope
153                         * that the reader thread can empty the buffer giving us a good
154                         * burst to write without blocking */
155                        sched_yield();//_mm_pause();
156
157        rb->elements[rb->end] = value;
158#if USE_MODULUS
159        rb->end = (rb->end + 1) % rb->size;
160#else
161        rb->end = (rb->end + 1 < rb->size) ? rb->end + 1 : 0;
162#endif
163        /* This check is bad we can easily lose our time slice, and the reader
164         * can catch up before it should, in this case spin locking is used */
165        //if (libtrace_ringbuffer_is_empty(rb))
166        //      assert(0 == 1);
167        /* Now we've made another full */
168        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
169                assert(sem_post(&rb->fulls) == 0);
170}
171
172/**
173 * Performs a non-blocking write to the buffer, if their is no space
174 * or the list is locked by another thread this will return immediately
175 * without writing the value. Assumes that only one thread is writing.
176 * Otherwise use libtrace_ringbuffer_try_swrite.
177 *
178 * @param rb a pointer to libtrace_ringbuffer structure
179 * @param value the value to store
180 * @return 1 if a object was written otherwise 0.
181 */
182DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
183        if (libtrace_ringbuffer_is_full(rb))
184                return 0;
185        libtrace_ringbuffer_write(rb, value);
186        return 1;
187}
188
189/**
190 * Waits and reads from the supplied buffer, note this will block forever.
191 *
192 * @param rb a pointer to libtrace_ringbuffer structure
193 * @param out a pointer to a memory address where the returned item would be placed
194 * @return The object that was read
195 */
196DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
197        void* value;
198       
199        /* We need a full slot */
200        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
201                assert(sem_wait(&rb->fulls) == 0);
202        else
203                while (libtrace_ringbuffer_is_empty(rb)) 
204                        /* Yield our time, why?, we tried and failed to read an item
205                         * from the buffer - so we should give up our time in the hope
206                         * that the writer thread can fill the buffer giving us a good
207                         * burst to read without blocking etc */
208                        sched_yield();//_mm_pause();
209       
210        value = rb->elements[rb->start];
211#if USE_MODULUS
212        rb->start = (rb->start + 1) % rb->size;
213#else
214        rb->start = (rb->start + 1 < rb->size) ? rb->start + 1 : 0;
215#endif
216        /* Now that's a empty slot */
217        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
218                assert(sem_post(&rb->emptys) == 0);
219        return value;
220}
221
222/**
223 * Tries to read from the supplied buffer if it fails this and returns
224 * 0 to indicate nothing was read.
225 *
226 * @param rb a pointer to libtrace_ringbuffer structure
227 * @param out a pointer to a memory address where the returned item would be placed
228 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
229 */
230DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
231        if (libtrace_ringbuffer_is_empty(rb))
232                return 0;
233        *value = libtrace_ringbuffer_read(rb);
234        return 1;
235}
236
237/**
238 * A thread safe version of libtrace_ringbuffer_write
239 */
240DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
241        LOCK(w);
242        libtrace_ringbuffer_write(rb, value);
243        UNLOCK(w);
244}
245
246/**
247 * A thread safe version of libtrace_ringbuffer_try_write
248 */
249DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
250        int ret;
251#if USE_CHECK_EARLY
252        if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues
253                return 0;
254#endif
255        TRY_LOCK(w, return 0;);
256        ret = libtrace_ringbuffer_try_write(rb, value);
257        UNLOCK(w);
258        return ret;
259}
260
261/**
262 * A thread safe version of libtrace_ringbuffer_try_write
263 * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just
264 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
265 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
266 * and libtrace_ringbuffer_try_swrite are being used.
267 */
268DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
269        int ret;
270#if USE_CHECK_EARLY
271        if (libtrace_ringbuffer_is_full(rb)) // Check early
272                return 0;
273#endif
274        LOCK(w);
275        ret = libtrace_ringbuffer_try_write(rb, value);
276        UNLOCK(w);
277        return ret;
278}
279
280/**
281 * A thread safe version of libtrace_ringbuffer_read
282 */
283DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
284        void* value;
285        LOCK(r);
286        value = libtrace_ringbuffer_read(rb);
287        UNLOCK(r);
288        return value;
289}
290
291/**
292 * A thread safe version of libtrace_ringbuffer_try_write
293 */
294DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
295        int ret;
296#if USE_CHECK_EARLY
297        if (libtrace_ringbuffer_is_empty(rb)) // Check early
298                return 0;
299#endif
300        TRY_LOCK(r, return 0;);
301        ret = libtrace_ringbuffer_try_read(rb, value);
302        UNLOCK(r);
303        return ret;
304}
305
306/**
307 * A thread safe version of libtrace_ringbuffer_try_wread
308 * Unlike libtrace_ringbuffer_try_sread this will block on da lock just
309 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
310 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
311 * and libtrace_ringbuffer_try_sread are being used.
312 */
313DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
314        int ret;
315#if USE_CHECK_EARLY
316        if (libtrace_ringbuffer_is_empty(rb)) // Check early
317                return 0;
318#endif
319        LOCK(r);
320        ret = libtrace_ringbuffer_try_read(rb, value);
321        UNLOCK(r);
322        return ret;
323}
324
325DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
326{
327        rb->start = 0;
328        rb->end = 0;
329        rb->size = 0;
330        rb->elements = NULL;
331}
Note: See TracBrowser for help on using the repository browser.