source: lib/data-struct/ring_buffer.c @ 0a474e3

develop
Last change on this file since 0a474e3 was 0a474e3, checked in by Jacob Van Walraven <jcv9@…>, 2 years ago

And more..

  • Property mode set to 100644
File size: 16.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26/**
27 * A ring or circular buffer, very useful
28 */
29
30#include "ring_buffer.h"
31
32#include <stdlib.h>
33#include <assert.h>
34#include <string.h>
35#include <stdio.h>
36
37#define LOCK_TYPE_MUTEX 0 // Default if not defined
38#define LOCK_TYPE_SPIN 1
39#define LOCK_TYPE_NONE 2
40
41// No major difference noticed here between mutex and spin, both have there
42// downsides.
43
44#define USE_CHECK_EARLY 1
45
46#define USE_LOCK_TYPE LOCK_TYPE_MUTEX
47#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
48#       define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0)
49#       define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0)
50#       define TRY_LOCK(dir, action) if(pthread_spin_lock(&rb->s ## dir ## lock) != 0) { \
51                                                                action }
52#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
53#       define LOCK(dir)
54#       define UNLOCK(dir)
55#       define TRY_LOCK(dir, action)
56#else // Mutex
57#       define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0)
58#       define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0)
59#       define TRY_LOCK(dir, action) if(pthread_mutex_lock(&rb-> dir ## lock) != 0) {\
60                                                                action }
61#endif
62
63
64/**
65 * Implements a FIFO queue via a ring buffer, this is a fixed size
66 * and all methods are no clobber i.e. will not overwrite old items
67 * with new ones.
68 *
69 * @param rb A pointer to a ringbuffer structure.
70 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
71 * @param mode The mode allows selection to use semaphores to signal when data
72 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
73 *                              NOTE: this mainly applies to the blocking functions
74 * @return If successful returns 0 otherwise -1 upon failure.
75 */
76DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
77        size = size + 1;
78        if (!(size > 1))
79                return -1;
80        rb->size = size;
81        rb->start = 0;
82        rb->end = 0;
83        rb->elements = calloc(rb->size, sizeof(void*));
84        if (!rb->elements)
85                return -1;
86        rb->mode = mode;
87        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
88                /* The signaling part - i.e. release when data is ready to read */
89                pthread_cond_init(&rb->full_cond, NULL);
90                pthread_cond_init(&rb->empty_cond, NULL);
91                ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0);
92                ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0);
93        }
94        /* The mutual exclusion part */
95#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
96#warning "using spinners"
97        ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0);
98        ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0);
99#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
100#warning "No locking used"
101#else
102        ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0);
103        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
104#endif
105        return 0;
106}
107
108/**
109 * Destroys the ring buffer along with any memory allocated to it
110 * @param rb The ringbuffer to destroy
111 */
112DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
113#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
114        ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0);
115        ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0);
116#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
117#endif
118        ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0);
119        ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0);
120        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
121                pthread_cond_destroy(&rb->full_cond);
122                pthread_cond_destroy(&rb->empty_cond);
123        }
124        rb->size = 0;
125        rb->start = 0;
126        rb->end = 0;
127        free((void *)rb->elements);
128        rb->elements = NULL;
129}
130
131/**
132 * Tests to see if ringbuffer is empty, when using multiple threads
133 * this doesn't guarantee that the next operation wont block. Use
134 * write/read try instead.
135 */
136DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
137        return rb->start == rb->end;
138}
139
140/**
141 * Tests to see if ringbuffer is empty, when using multiple threads
142 * this doesn't guarantee that the next operation wont block. Use
143 * write/read try instead.
144 */
145DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
146        return rb->start == ((rb->end + 1) % rb->size);
147}
148
149static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) {
150        if (rb->end < rb->start)
151                return rb->end + rb->size - rb->start;
152        else
153                return rb->end - rb->start;
154        // return (rb->end + rb->size - rb->start) % rb->size;
155}
156
157static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) {
158        if (rb->start <= rb->end)
159                return rb->start + rb->size - rb->end - 1;
160        else
161                return rb->start - rb->end - 1;
162        // return (rb->start + rb->size - rb->end - 1) % rb->size;
163}
164
165/**
166 * Waits for a empty slot, that we can write to.
167 * @param rb The ringbuffer
168 */
169static inline void wait_for_empty(libtrace_ringbuffer_t *rb) {
170        /* Need an empty to start with */
171        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
172                pthread_mutex_lock(&rb->empty_lock);
173                while (libtrace_ringbuffer_is_full(rb))
174                        pthread_cond_wait(&rb->empty_cond, &rb->empty_lock);
175                pthread_mutex_unlock(&rb->empty_lock);
176        } else {
177                while (libtrace_ringbuffer_is_full(rb))
178                        /* Yield our time, why?, we tried and failed to write an item
179                         * to the buffer - so we should give up our time in the hope
180                         * that the reader thread can empty the buffer giving us a good
181                         * burst to write without blocking */
182                        sched_yield();//_mm_pause();
183        }
184}
185
186/**
187 * Waits for a full slot, that we read from.
188 * @param rb The ringbuffer
189 */
190static inline void wait_for_full(libtrace_ringbuffer_t *rb) {
191        /* Need an empty to start with */
192        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
193                pthread_mutex_lock(&rb->full_lock);
194                while (libtrace_ringbuffer_is_empty(rb))
195                        pthread_cond_wait(&rb->full_cond, &rb->full_lock);
196                pthread_mutex_unlock(&rb->full_lock);
197        } else {
198                while (libtrace_ringbuffer_is_empty(rb))
199                        /* Yield our time, why?, we tried and failed to write an item
200                         * to the buffer - so we should give up our time in the hope
201                         * that the reader thread can empty the buffer giving us a good
202                         * burst to write without blocking */
203                        sched_yield();//_mm_pause();
204        }
205}
206
207/**
208 * Notifies we have created a full slot, after a write.
209 * @param rb The ringbuffer
210 */
211static inline void notify_full(libtrace_ringbuffer_t *rb) {
212        /* Need an empty to start with */
213        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
214                pthread_mutex_lock(&rb->full_lock);
215                pthread_cond_broadcast(&rb->full_cond);
216                pthread_mutex_unlock(&rb->full_lock);
217        }
218}
219
220/**
221 * Notifies we have created an empty slot, after a read.
222 * @param rb The ringbuffer
223 */
224static inline void notify_empty(libtrace_ringbuffer_t *rb) {
225        /* Need an empty to start with */
226        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
227                pthread_mutex_lock(&rb->empty_lock);
228                pthread_cond_broadcast(&rb->empty_cond);
229                pthread_mutex_unlock(&rb->empty_lock);
230        }
231}
232
233/**
234 * Performs a blocking write to the buffer, upon return the value will be
235 * stored. This will not clobber old values.
236 *
237 * This assumes only one thread writing at once. Use
238 * libtrace_ringbuffer_swrite for a thread safe version.
239 *
240 * @param rb a pointer to libtrace_ringbuffer structure
241 * @param value the value to store
242 */
243DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
244        /* Need an empty to start with */
245        wait_for_empty(rb);
246        rb->elements[rb->end] = value;
247        rb->end = (rb->end + 1) % rb->size;
248        notify_full(rb);
249}
250
251/**
252 * Performs a blocking write to the buffer, upon return the value will be
253 * stored. This will not clobber old values.
254 *
255 * This assumes only one thread writing at once. Use
256 * libtrace_ringbuffer_swrite for a thread safe version.
257 *
258 * Packets are written out from start to end in order, if only some packets are
259 * written those at the end of the array will be still be unwritten.
260 *
261 * @param rb a pointer to libtrace_ringbuffer structure
262 * @param values A pointer to a memory address read in
263 * @param nb_buffer The maximum buffers to write i.e. the length of values
264 * @param min_nb_buffers The minimum number of buffers to write
265 * @param value the value to store
266 */
267DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
268        size_t nb_ready;
269        size_t i = 0;
270       
271        /*assert(min_nb_buffers <= nb_buffers);*/
272        if (min_nb_buffers > nb_buffers) {
273                fprintf(stderr, "min_nb_buffers must be greater than or equal to nb_buffers in libtrace_ringbuffer_write_bulk()\n");
274                return ~0U;
275        }
276        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
277                return 0;
278
279        do {
280                register size_t end;
281                wait_for_empty(rb);
282                nb_ready = libtrace_ringbuffer_nb_empty(rb);
283                nb_ready = MIN(nb_ready, nb_buffers-i);
284                nb_ready += i;
285                // TODO consider optimising into at most 2 memcpys??
286                end = rb->end;
287                for (; i < nb_ready; i++) {
288                        rb->elements[end] = values[i];
289                        end = (end + 1) % rb->size;
290                }
291                rb->end = end;
292                notify_full(rb);
293        } while (i < min_nb_buffers);
294        return i;
295}
296
297/**
298 * Performs a non-blocking write to the buffer, if their is no space
299 * or the list is locked by another thread this will return immediately
300 * without writing the value. Assumes that only one thread is writing.
301 * Otherwise use libtrace_ringbuffer_try_swrite.
302 *
303 * @param rb a pointer to libtrace_ringbuffer structure
304 * @param value the value to store
305 * @return 1 if a object was written otherwise 0.
306 */
307DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
308        if (libtrace_ringbuffer_is_full(rb))
309                return 0;
310        libtrace_ringbuffer_write(rb, value);
311        return 1;
312}
313
314/**
315 * Waits and reads from the supplied buffer, note this will block forever.
316 *
317 * @param rb a pointer to libtrace_ringbuffer structure
318 * @param out a pointer to a memory address where the returned item would be placed
319 * @return The object that was read
320 */
321DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
322        void* value;
323       
324        /* We need a full slot */
325        wait_for_full(rb);
326        value = rb->elements[rb->start];
327        rb->start = (rb->start + 1) % rb->size;
328        /* Now that's an empty slot */
329        notify_empty(rb);
330        return value;
331}
332
333/**
334 * Waits and reads from the supplied buffer, note this will block forever.
335 * Attempts to read the requested number of packets, however will return
336 * with only the number that are currently ready.
337 *
338 * Set min_nb_buffers to 0 to 'try' read packets.
339 *
340 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1]
341 * are valid.
342 *
343 * @param rb a pointer to libtrace_ringbuffer structure
344 * @param values A pointer to a memory address where the returned item would be placed
345 * @param nb_buffer The maximum buffers to read i.e. the length of values
346 * @param min_nb_buffers The minimum number of buffers to read
347 * @return The number of packets read
348 */
349DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
350        size_t nb_ready;
351        size_t i = 0;
352
353        /*assert(min_nb_buffers <= nb_buffers);*/
354        if (min_nb_buffers > nb_buffers) {
355                fprintf(stderr, "min_nb_buffers must be greater than or equal to nb_buffers in libtrace_ringbuffer_write_bulk()\n");
356                return ~0U;
357        }
358
359        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
360                return 0;
361
362        do {
363                register size_t start;
364                /* We need a full slot */
365                wait_for_full(rb);
366
367                nb_ready = libtrace_ringbuffer_nb_full(rb);
368                nb_ready = MIN(nb_ready, nb_buffers-i);
369                // Additional to the i we've already read
370                nb_ready += i;
371                start = rb->start;
372                for (; i < nb_ready; i++) {
373                        values[i] = rb->elements[start];
374                        start = (start + 1) % rb->size;
375                }
376                rb->start = start;
377                /* Now that's an empty slot */
378                notify_empty(rb);
379        } while (i < min_nb_buffers);
380        return i;
381}
382
383
384/**
385 * Tries to read from the supplied buffer if it fails this and returns
386 * 0 to indicate nothing was read.
387 *
388 * @param rb a pointer to libtrace_ringbuffer structure
389 * @param out a pointer to a memory address where the returned item would be placed
390 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
391 */
392DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
393        if (libtrace_ringbuffer_is_empty(rb))
394                return 0;
395        *value = libtrace_ringbuffer_read(rb);
396        return 1;
397}
398
399/**
400 * A thread safe version of libtrace_ringbuffer_write
401 */
402DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
403        LOCK(w);
404        libtrace_ringbuffer_write(rb, value);
405        UNLOCK(w);
406}
407
408/**
409 * A thread safe version of libtrace_ringbuffer_write_bulk
410 */
411DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
412        size_t ret;
413#if USE_CHECK_EARLY
414        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) // Check early
415                return 0;
416#endif
417        LOCK(w);
418        ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers);
419        UNLOCK(w);
420        return ret;
421}
422
423/**
424 * A thread safe version of libtrace_ringbuffer_try_write
425 */
426DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
427        int ret;
428#if USE_CHECK_EARLY
429        if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues
430                return 0;
431#endif
432        TRY_LOCK(w, return 0;);
433        ret = libtrace_ringbuffer_try_write(rb, value);
434        UNLOCK(w);
435        return ret;
436}
437
438/**
439 * A thread safe version of libtrace_ringbuffer_try_write
440 * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just
441 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
442 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
443 * and libtrace_ringbuffer_try_swrite are being used.
444 */
445DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
446        int ret;
447#if USE_CHECK_EARLY
448        if (libtrace_ringbuffer_is_full(rb)) // Check early
449                return 0;
450#endif
451        LOCK(w);
452        ret = libtrace_ringbuffer_try_write(rb, value);
453        UNLOCK(w);
454        return ret;
455}
456
457/**
458 * A thread safe version of libtrace_ringbuffer_read
459 */
460DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
461        void* value;
462        LOCK(r);
463        value = libtrace_ringbuffer_read(rb);
464        UNLOCK(r);
465        return value;
466}
467
468/**
469 * A thread safe version of libtrace_ringbuffer_read_bulk
470 */
471DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
472        size_t ret;
473#if USE_CHECK_EARLY
474        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) // Check early
475                return 0;
476#endif
477        LOCK(r);
478        ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers);
479        UNLOCK(r);
480        return ret;
481}
482
483/**
484 * A thread safe version of libtrace_ringbuffer_try_write
485 */
486DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
487        int ret;
488#if USE_CHECK_EARLY
489        if (libtrace_ringbuffer_is_empty(rb)) // Check early
490                return 0;
491#endif
492        TRY_LOCK(r, return 0;);
493        ret = libtrace_ringbuffer_try_read(rb, value);
494        UNLOCK(r);
495        return ret;
496}
497
498/**
499 * A thread safe version of libtrace_ringbuffer_try_wread
500 * Unlike libtrace_ringbuffer_try_sread this will block on da lock just
501 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
502 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
503 * and libtrace_ringbuffer_try_sread are being used.
504 */
505DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
506        int ret;
507#if USE_CHECK_EARLY
508        if (libtrace_ringbuffer_is_empty(rb)) // Check early
509                return 0;
510#endif
511        LOCK(r);
512        ret = libtrace_ringbuffer_try_read(rb, value);
513        UNLOCK(r);
514        return ret;
515}
516
517DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
518{
519        rb->start = 0;
520        rb->end = 0;
521        rb->size = 0;
522        rb->elements = NULL;
523}
524
525
Note: See TracBrowser for help on using the repository browser.