source: lib/data-struct/ring_buffer.c @ 6a082f8

Branches: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since 6a082f8 was 04bf7c5, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Remove unused sliding window code.
Refactored pstart and added some proper error handling.

  • Property mode set to 100644
File size: 15.0 KB
Line 
1/**
2 * A ring or circular buffer, very useful
3 */
4
5#include "ring_buffer.h"
6
7#include <stdlib.h>
8#include <assert.h>
9#include <string.h>
10#include <stdio.h>
11
#define LOCK_TYPE_MUTEX 0 // Default if not defined
#define LOCK_TYPE_SPIN 1
#define LOCK_TYPE_NONE 2

// No major difference noticed here between mutex and spin, both have their
// downsides.

#define USE_CHECK_EARLY 1

#define USE_LOCK_TYPE LOCK_TYPE_MUTEX
/* LOCK/UNLOCK take a direction token: w -> write lock, r -> read lock.
 * The spin build pastes it into s<dir>lock (swlock/srlock), the mutex build
 * into <dir>lock (wlock/rlock).
 * TRY_LOCK must NOT block: the try_s* functions are documented to return
 * immediately when another thread holds the lock, so use the trylock
 * variants here (plain lock() would block until acquisition). */
#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
#       define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0)
#       define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0)
#       define TRY_LOCK(dir, action) if(pthread_spin_trylock(&rb->s ## dir ## lock) != 0) { \
                                                                action }
#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
#       define LOCK(dir)
#       define UNLOCK(dir)
#       define TRY_LOCK(dir, action)
#else // Mutex
#       define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0)
#       define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0)
#       define TRY_LOCK(dir, action) if(pthread_mutex_trylock(&rb-> dir ## lock) != 0) {\
                                                                action }
#endif
37
38
39/**
40 * Implements a FIFO queue via a ring buffer, this is a fixed size
41 * and all methods are no clobber i.e. will not overwrite old items
42 * with new ones.
43 *
44 * @param rb A pointer to a ringbuffer structure.
45 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
46 * @param mode The mode allows selection to use semaphores to signal when data
47 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
48 *                              NOTE: this mainly applies to the blocking functions
49 * @return If successful returns 0 otherwise -1 upon failure.
50 */
51DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
52        size = size + 1;
53        if (!(size > 1))
54                return -1;
55        rb->size = size;
56        rb->start = 0;
57        rb->end = 0;
58        rb->elements = calloc(rb->size, sizeof(void*));
59        if (!rb->elements)
60                return -1;
61        rb->mode = mode;
62        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
63                /* The signaling part - i.e. release when data is ready to read */
64                pthread_cond_init(&rb->full_cond, NULL);
65                pthread_cond_init(&rb->empty_cond, NULL);
66                ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0);
67                ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0);
68        }
69        /* The mutual exclusion part */
70#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
71#warning "using spinners"
72        ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0);
73        ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0);
74#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
75#warning "No locking used"
76#else
77        ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0);
78        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
79#endif
80        return 0;
81}
82
83/**
84 * Destroys the ring buffer along with any memory allocated to it
85 * @param rb The ringbuffer to destroy
86 */
87DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
88#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
89        ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0);
90        ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0);
91#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
92#endif
93        ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0);
94        ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0);
95        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
96                pthread_cond_destroy(&rb->full_cond);
97                pthread_cond_destroy(&rb->empty_cond);
98        }
99        rb->size = 0;
100        rb->start = 0;
101        rb->end = 0;
102        free((void *)rb->elements);
103        rb->elements = NULL;
104}
105
106/**
107 * Tests to see if ringbuffer is empty, when using multiple threads
108 * this doesn't guarantee that the next operation wont block. Use
109 * write/read try instead.
110 */
111DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
112        return rb->start == rb->end;
113}
114
/**
 * Tests to see if the ringbuffer is full, when using multiple threads
 * this doesn't guarantee that the next operation wont block. Use
 * write/read try instead.
 * @param rb The ringbuffer to inspect
 * @return Non-zero if full, 0 otherwise
 */
DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
        /* One slot is always kept unused, so full means the write cursor
         * is immediately behind the read cursor (modulo size) */
        return rb->start == ((rb->end + 1) % rb->size);
}
123
124static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) {
125        if (rb->end < rb->start)
126                return rb->end + rb->size - rb->start;
127        else
128                return rb->end - rb->start;
129        // return (rb->end + rb->size - rb->start) % rb->size;
130}
131
132static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) {
133        if (rb->start <= rb->end)
134                return rb->start + rb->size - rb->end - 1;
135        else
136                return rb->start - rb->end - 1;
137        // return (rb->start + rb->size - rb->end - 1) % rb->size;
138}
139
/**
 * Waits until at least one empty slot exists that we can write to.
 * In blocking mode this sleeps on empty_cond; in polling mode it
 * spins, yielding the CPU between checks.
 * @param rb The ringbuffer
 */
static inline void wait_for_empty(libtrace_ringbuffer_t *rb) {
        /* Need an empty slot before a write can proceed */
        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
                /* Recheck the predicate in a loop: cond_wait may wake
                 * spuriously, and another writer may have refilled the slot */
                pthread_mutex_lock(&rb->empty_lock);
                while (libtrace_ringbuffer_is_full(rb))
                        pthread_cond_wait(&rb->empty_cond, &rb->empty_lock);
                pthread_mutex_unlock(&rb->empty_lock);
        } else {
                while (libtrace_ringbuffer_is_full(rb))
                        /* Yield our time, why?, we tried and failed to write an item
                         * to the buffer - so we should give up our time in the hope
                         * that the reader thread can empty the buffer giving us a good
                         * burst to write without blocking */
                        sched_yield();//_mm_pause();
        }
}
160
/**
 * Waits until at least one full slot exists that we can read from.
 * In blocking mode this sleeps on full_cond; in polling mode it
 * spins, yielding the CPU between checks.
 * @param rb The ringbuffer
 */
static inline void wait_for_full(libtrace_ringbuffer_t *rb) {
        /* Need a full slot before a read can proceed */
        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
                /* Recheck the predicate in a loop: cond_wait may wake
                 * spuriously, and another reader may have drained the slot */
                pthread_mutex_lock(&rb->full_lock);
                while (libtrace_ringbuffer_is_empty(rb))
                        pthread_cond_wait(&rb->full_cond, &rb->full_lock);
                pthread_mutex_unlock(&rb->full_lock);
        } else {
                while (libtrace_ringbuffer_is_empty(rb))
                        /* Yield our time, why?, we tried and failed to read an item
                         * from the buffer - so we should give up our time in the hope
                         * that the writer thread can fill the buffer giving us a good
                         * burst to read without blocking */
                        sched_yield();//_mm_pause();
        }
}
181
/**
 * Notifies that we have created a full slot, after a write.
 * Wakes every reader blocked in wait_for_full().
 * @param rb The ringbuffer
 */
static inline void notify_full(libtrace_ringbuffer_t *rb) {
        /* Condition variables are only used in blocking mode; polling
         * readers observe the updated cursors directly */
        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
                /* Take the lock so the signal can't race a reader that is
                 * between its predicate check and cond_wait */
                pthread_mutex_lock(&rb->full_lock);
                pthread_cond_broadcast(&rb->full_cond);
                pthread_mutex_unlock(&rb->full_lock);
        }
}
194
/**
 * Notifies that we have created an empty slot, after a read.
 * Wakes every writer blocked in wait_for_empty().
 * @param rb The ringbuffer
 */
static inline void notify_empty(libtrace_ringbuffer_t *rb) {
        /* Condition variables are only used in blocking mode; polling
         * writers observe the updated cursors directly */
        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
                /* Take the lock so the signal can't race a writer that is
                 * between its predicate check and cond_wait */
                pthread_mutex_lock(&rb->empty_lock);
                pthread_cond_broadcast(&rb->empty_cond);
                pthread_mutex_unlock(&rb->empty_lock);
        }
}
207
/**
 * Performs a blocking write to the buffer, upon return the value will be
 * stored. This will not clobber old values.
 *
 * This assumes only one thread writing at once. Use
 * libtrace_ringbuffer_swrite for a thread safe version.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value the value to store
 */
DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
        /* Need an empty slot to start with */
        wait_for_empty(rb);
        /* Store the element before publishing it by advancing the end
         * cursor - a concurrent reader only looks at slots before end */
        rb->elements[rb->end] = value;
        rb->end = (rb->end + 1) % rb->size;
        notify_full(rb);
}
225
/**
 * Performs a bulk write to the buffer; blocks until at least
 * min_nb_buffers values have been stored. This will not clobber old
 * values. Pass min_nb_buffers == 0 for a 'try' write that returns
 * immediately if the buffer is full.
 *
 * This assumes only one thread writing at once. Use
 * libtrace_ringbuffer_swrite_bulk for a thread safe version.
 *
 * Packets are written out from start to end in order, if only some packets are
 * written those at the end of the array will be still be unwritten.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param values A pointer to the array of values to write
 * @param nb_buffers The maximum buffers to write i.e. the length of values
 * @param min_nb_buffers The minimum number of buffers to write
 * @return The number of values actually written (>= min_nb_buffers)
 */
DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        size_t nb_ready;
        size_t i = 0;
       
        assert(min_nb_buffers <= nb_buffers);
        /* 'try' semantics: don't enter the loop (which waits) if nothing
         * is required and no space is available right now */
        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
                return 0;

        do {
                register size_t end;
                wait_for_empty(rb);
                /* Write as many as fit right now, capped at what remains;
                 * nb_ready becomes the index (into values) to stop at */
                nb_ready = libtrace_ringbuffer_nb_empty(rb);
                nb_ready = MIN(nb_ready, nb_buffers-i);
                nb_ready += i;
                // TODO consider optimising into at most 2 memcpys??
                end = rb->end;
                for (; i < nb_ready; i++) {
                        rb->elements[end] = values[i];
                        end = (end + 1) % rb->size;
                }
                /* Publish the whole batch at once by advancing the cursor */
                rb->end = end;
                notify_full(rb);
        } while (i < min_nb_buffers);
        return i;
}
267
268/**
269 * Performs a non-blocking write to the buffer, if their is no space
270 * or the list is locked by another thread this will return immediately
271 * without writing the value. Assumes that only one thread is writing.
272 * Otherwise use libtrace_ringbuffer_try_swrite.
273 *
274 * @param rb a pointer to libtrace_ringbuffer structure
275 * @param value the value to store
276 * @return 1 if a object was written otherwise 0.
277 */
278DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
279        if (libtrace_ringbuffer_is_full(rb))
280                return 0;
281        libtrace_ringbuffer_write(rb, value);
282        return 1;
283}
284
/**
 * Waits and reads from the supplied buffer, note this will block forever.
 * This assumes only one thread reading at once; use
 * libtrace_ringbuffer_sread for a thread safe version.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @return The object that was read
 */
DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
        void* value;
       
        /* We need a full slot */
        wait_for_full(rb);
        /* Take the element before releasing the slot by advancing start -
         * a concurrent writer only touches slots at or after end */
        value = rb->elements[rb->start];
        rb->start = (rb->start + 1) % rb->size;
        /* Now that's an empty slot */
        notify_empty(rb);
        return value;
}
303
/**
 * Bulk read from the supplied buffer; blocks until at least
 * min_nb_buffers items have been read, then returns with however many
 * are currently ready (up to nb_buffers).
 *
 * Set min_nb_buffers to 0 to 'try' read packets.
 *
 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1]
 * are valid.
 *
 * This assumes only one thread reading at once; use
 * libtrace_ringbuffer_sread_bulk for a thread safe version.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param values A pointer to a memory address where the returned items would be placed
 * @param nb_buffers The maximum buffers to read i.e. the length of values
 * @param min_nb_buffers The minimum number of buffers to read
 * @return The number of packets read
 */
DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        size_t nb_ready;
        size_t i = 0;
       
        assert(min_nb_buffers <= nb_buffers);

        /* 'try' semantics: don't enter the loop (which waits) if nothing
         * is required and no data is available right now */
        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
                return 0;

        do {
                register size_t start;
                /* We need a full slot */
                wait_for_full(rb);

                /* Read as many as are available now, capped at what remains */
                nb_ready = libtrace_ringbuffer_nb_full(rb);
                nb_ready = MIN(nb_ready, nb_buffers-i);
                // Additional to the i we've already read
                nb_ready += i;
                start = rb->start;
                for (; i < nb_ready; i++) {
                        values[i] = rb->elements[start];
                        start = (start + 1) % rb->size;
                }
                /* Release the whole batch at once by advancing the cursor */
                rb->start = start;
                /* Now that's an empty slot */
                notify_empty(rb);
        } while (i < min_nb_buffers);
        return i;
}
349
350
351/**
352 * Tries to read from the supplied buffer if it fails this and returns
353 * 0 to indicate nothing was read.
354 *
355 * @param rb a pointer to libtrace_ringbuffer structure
356 * @param out a pointer to a memory address where the returned item would be placed
357 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
358 */
359DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
360        if (libtrace_ringbuffer_is_empty(rb))
361                return 0;
362        *value = libtrace_ringbuffer_read(rb);
363        return 1;
364}
365
/**
 * A thread safe version of libtrace_ringbuffer_write.
 * Serialises writers with the write lock; blocks until space is free.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value the value to store
 */
DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
        LOCK(w);
        libtrace_ringbuffer_write(rb, value);
        UNLOCK(w);
}
374
/**
 * A thread safe version of libtrace_ringbuffer_write_bulk.
 * Serialises writers with the write lock.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param values the array of values to write
 * @param nb_buffers the maximum number of values to write
 * @param min_nb_buffers the minimum number of values to write
 * @return the number of values written
 */
DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        size_t ret;
#if USE_CHECK_EARLY
        // Check early: in the 'try' case avoid taking the lock when full
        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
                return 0;
#endif
        LOCK(w);
        ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers);
        UNLOCK(w);
        return ret;
}
389
/**
 * A thread safe version of libtrace_ringbuffer_try_write.
 * Returns 0 without writing if the buffer is full or the write lock
 * cannot be acquired (see TRY_LOCK).
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value the value to store
 * @return 1 if the object was written otherwise 0
 */
DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
        int ret;
#if USE_CHECK_EARLY
        // Check early before touching the lock, also avoids drd issues
        if (libtrace_ringbuffer_is_full(rb))
                return 0;
#endif
        TRY_LOCK(w, return 0;);
        ret = libtrace_ringbuffer_try_write(rb, value);
        UNLOCK(w);
        return ret;
}
404
/**
 * A thread safe version of libtrace_ringbuffer_try_write.
 * Unlike libtrace_ringbuffer_try_swrite this will block on the lock, just
 * not on the data. This will block for a long period of time if libtrace_ringbuffer_sread
 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
 * and libtrace_ringbuffer_try_swrite are being used.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value the value to store
 * @return 1 if the object was written otherwise 0
 */
DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
        int ret;
#if USE_CHECK_EARLY
        // Check early before blocking on the lock
        if (libtrace_ringbuffer_is_full(rb))
                return 0;
#endif
        LOCK(w);
        ret = libtrace_ringbuffer_try_write(rb, value);
        UNLOCK(w);
        return ret;
}
423
/**
 * A thread safe version of libtrace_ringbuffer_read.
 * Serialises readers with the read lock; blocks until data is available.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @return the object that was read
 */
DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
        void* value;
        LOCK(r);
        value = libtrace_ringbuffer_read(rb);
        UNLOCK(r);
        return value;
}
434
/**
 * A thread safe version of libtrace_ringbuffer_read_bulk.
 * Serialises readers with the read lock.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param values the array to fill with read items
 * @param nb_buffers the maximum number of items to read
 * @param min_nb_buffers the minimum number of items to read
 * @return the number of items read
 */
DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        size_t ret;
#if USE_CHECK_EARLY
        // Check early: in the 'try' case avoid taking the lock when empty
        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
                return 0;
#endif
        LOCK(r);
        ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers);
        UNLOCK(r);
        return ret;
}
449
/**
 * A thread safe version of libtrace_ringbuffer_try_read.
 * Returns 0 without reading if the buffer is empty or the read lock
 * cannot be acquired (see TRY_LOCK).
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value a pointer to a memory address where the read item is placed
 * @return 1 if an object was read otherwise 0
 */
DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
        int ret;
#if USE_CHECK_EARLY
        // Check early before touching the lock
        if (libtrace_ringbuffer_is_empty(rb))
                return 0;
#endif
        TRY_LOCK(r, return 0;);
        ret = libtrace_ringbuffer_try_read(rb, value);
        UNLOCK(r);
        return ret;
}
464
/**
 * A thread safe version of libtrace_ringbuffer_try_read.
 * Unlike libtrace_ringbuffer_try_sread this will block on the lock, just
 * not on the data. This will block for a long period of time if libtrace_ringbuffer_sread
 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
 * and libtrace_ringbuffer_try_sread are being used.
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value a pointer to a memory address where the read item is placed
 * @return 1 if an object was read otherwise 0
 */
DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
        int ret;
#if USE_CHECK_EARLY
        // Check early before blocking on the lock
        if (libtrace_ringbuffer_is_empty(rb))
                return 0;
#endif
        LOCK(r);
        ret = libtrace_ringbuffer_try_read(rb, value);
        UNLOCK(r);
        return ret;
}
483
484DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
485{
486        rb->start = 0;
487        rb->end = 0;
488        rb->size = 0;
489        rb->elements = NULL;
490}
491
492
Note: See TracBrowser for help on using the repository browser.