source: lib/data-struct/ring_buffer.c @ ee6e802

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, ndag_format, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since ee6e802 was ee6e802, checked in by Shane Alcock <salcock@…>, 4 years ago

Updated copyright blurb on all source files

In some cases, this meant adding copyright blurbs to files that
had never had them before.

  • Property mode set to 100644
File size: 15.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26/**
27 * A ring or circular buffer, very useful
28 */
29
30#include "ring_buffer.h"
31
32#include <stdlib.h>
33#include <assert.h>
34#include <string.h>
35#include <stdio.h>
36
/* Locking strategies available to the thread-safe (s-prefixed) functions. */
#define LOCK_TYPE_MUTEX 0 // Default if not defined
#define LOCK_TYPE_SPIN 1
#define LOCK_TYPE_NONE 2

// No major difference noticed here between mutex and spin, both have their
// downsides.

/* When non-zero, the thread-safe try/bulk functions test the buffer state
 * before taking the lock and return early if there is nothing to do. */
#define USE_CHECK_EARLY 1

/*
 * LOCK(dir) / UNLOCK(dir) acquire and release the read ('r') or write ('w')
 * lock of the ring buffer named 'rb' in the enclosing scope.
 *
 * TRY_LOCK(dir, action) attempts to take the lock WITHOUT blocking; if the
 * lock cannot be acquired immediately, 'action' is executed (typically a
 * 'return'). It expands to a plain 'if' statement, so do not follow it with
 * an 'else'.
 */
#define USE_LOCK_TYPE LOCK_TYPE_MUTEX
#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
#	define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0)
#	define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0)
#	define TRY_LOCK(dir, action) if(pthread_spin_trylock(&rb->s ## dir ## lock) != 0) { \
								action }
#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
#	define LOCK(dir)
#	define UNLOCK(dir)
#	define TRY_LOCK(dir, action)
#else // Mutex
#	define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0)
#	define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0)
#	define TRY_LOCK(dir, action) if(pthread_mutex_trylock(&rb-> dir ## lock) != 0) {\
								action }
#endif
62
63
64/**
65 * Implements a FIFO queue via a ring buffer, this is a fixed size
66 * and all methods are no clobber i.e. will not overwrite old items
67 * with new ones.
68 *
69 * @param rb A pointer to a ringbuffer structure.
70 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
71 * @param mode The mode allows selection to use semaphores to signal when data
72 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
73 *                              NOTE: this mainly applies to the blocking functions
74 * @return If successful returns 0 otherwise -1 upon failure.
75 */
76DLLEXPORT int libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
77        size = size + 1;
78        if (!(size > 1))
79                return -1;
80        rb->size = size;
81        rb->start = 0;
82        rb->end = 0;
83        rb->elements = calloc(rb->size, sizeof(void*));
84        if (!rb->elements)
85                return -1;
86        rb->mode = mode;
87        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
88                /* The signaling part - i.e. release when data is ready to read */
89                pthread_cond_init(&rb->full_cond, NULL);
90                pthread_cond_init(&rb->empty_cond, NULL);
91                ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0);
92                ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0);
93        }
94        /* The mutual exclusion part */
95#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
96#warning "using spinners"
97        ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0);
98        ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0);
99#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
100#warning "No locking used"
101#else
102        ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0);
103        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
104#endif
105        return 0;
106}
107
108/**
109 * Destroys the ring buffer along with any memory allocated to it
110 * @param rb The ringbuffer to destroy
111 */
112DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
113#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
114        ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0);
115        ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0);
116#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
117#endif
118        ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0);
119        ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0);
120        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
121                pthread_cond_destroy(&rb->full_cond);
122                pthread_cond_destroy(&rb->empty_cond);
123        }
124        rb->size = 0;
125        rb->start = 0;
126        rb->end = 0;
127        free((void *)rb->elements);
128        rb->elements = NULL;
129}
130
131/**
132 * Tests to see if ringbuffer is empty, when using multiple threads
133 * this doesn't guarantee that the next operation wont block. Use
134 * write/read try instead.
135 */
136DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
137        return rb->start == rb->end;
138}
139
140/**
141 * Tests to see if ringbuffer is empty, when using multiple threads
142 * this doesn't guarantee that the next operation wont block. Use
143 * write/read try instead.
144 */
145DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
146        return rb->start == ((rb->end + 1) % rb->size);
147}
148
149static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) {
150        if (rb->end < rb->start)
151                return rb->end + rb->size - rb->start;
152        else
153                return rb->end - rb->start;
154        // return (rb->end + rb->size - rb->start) % rb->size;
155}
156
157static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) {
158        if (rb->start <= rb->end)
159                return rb->start + rb->size - rb->end - 1;
160        else
161                return rb->start - rb->end - 1;
162        // return (rb->start + rb->size - rb->end - 1) % rb->size;
163}
164
165/**
166 * Waits for a empty slot, that we can write to.
167 * @param rb The ringbuffer
168 */
169static inline void wait_for_empty(libtrace_ringbuffer_t *rb) {
170        /* Need an empty to start with */
171        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
172                pthread_mutex_lock(&rb->empty_lock);
173                while (libtrace_ringbuffer_is_full(rb))
174                        pthread_cond_wait(&rb->empty_cond, &rb->empty_lock);
175                pthread_mutex_unlock(&rb->empty_lock);
176        } else {
177                while (libtrace_ringbuffer_is_full(rb))
178                        /* Yield our time, why?, we tried and failed to write an item
179                         * to the buffer - so we should give up our time in the hope
180                         * that the reader thread can empty the buffer giving us a good
181                         * burst to write without blocking */
182                        sched_yield();//_mm_pause();
183        }
184}
185
186/**
187 * Waits for a full slot, that we read from.
188 * @param rb The ringbuffer
189 */
190static inline void wait_for_full(libtrace_ringbuffer_t *rb) {
191        /* Need an empty to start with */
192        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
193                pthread_mutex_lock(&rb->full_lock);
194                while (libtrace_ringbuffer_is_empty(rb))
195                        pthread_cond_wait(&rb->full_cond, &rb->full_lock);
196                pthread_mutex_unlock(&rb->full_lock);
197        } else {
198                while (libtrace_ringbuffer_is_empty(rb))
199                        /* Yield our time, why?, we tried and failed to write an item
200                         * to the buffer - so we should give up our time in the hope
201                         * that the reader thread can empty the buffer giving us a good
202                         * burst to write without blocking */
203                        sched_yield();//_mm_pause();
204        }
205}
206
207/**
208 * Notifies we have created a full slot, after a write.
209 * @param rb The ringbuffer
210 */
211static inline void notify_full(libtrace_ringbuffer_t *rb) {
212        /* Need an empty to start with */
213        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
214                pthread_mutex_lock(&rb->full_lock);
215                pthread_cond_broadcast(&rb->full_cond);
216                pthread_mutex_unlock(&rb->full_lock);
217        }
218}
219
220/**
221 * Notifies we have created an empty slot, after a read.
222 * @param rb The ringbuffer
223 */
224static inline void notify_empty(libtrace_ringbuffer_t *rb) {
225        /* Need an empty to start with */
226        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
227                pthread_mutex_lock(&rb->empty_lock);
228                pthread_cond_broadcast(&rb->empty_cond);
229                pthread_mutex_unlock(&rb->empty_lock);
230        }
231}
232
233/**
234 * Performs a blocking write to the buffer, upon return the value will be
235 * stored. This will not clobber old values.
236 *
237 * This assumes only one thread writing at once. Use
238 * libtrace_ringbuffer_swrite for a thread safe version.
239 *
240 * @param rb a pointer to libtrace_ringbuffer structure
241 * @param value the value to store
242 */
243DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
244        /* Need an empty to start with */
245        wait_for_empty(rb);
246        rb->elements[rb->end] = value;
247        rb->end = (rb->end + 1) % rb->size;
248        notify_full(rb);
249}
250
251/**
252 * Performs a blocking write to the buffer, upon return the value will be
253 * stored. This will not clobber old values.
254 *
255 * This assumes only one thread writing at once. Use
256 * libtrace_ringbuffer_swrite for a thread safe version.
257 *
258 * Packets are written out from start to end in order, if only some packets are
259 * written those at the end of the array will be still be unwritten.
260 *
261 * @param rb a pointer to libtrace_ringbuffer structure
262 * @param values A pointer to a memory address read in
263 * @param nb_buffer The maximum buffers to write i.e. the length of values
264 * @param min_nb_buffers The minimum number of buffers to write
265 * @param value the value to store
266 */
267DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
268        size_t nb_ready;
269        size_t i = 0;
270       
271        assert(min_nb_buffers <= nb_buffers);
272        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
273                return 0;
274
275        do {
276                register size_t end;
277                wait_for_empty(rb);
278                nb_ready = libtrace_ringbuffer_nb_empty(rb);
279                nb_ready = MIN(nb_ready, nb_buffers-i);
280                nb_ready += i;
281                // TODO consider optimising into at most 2 memcpys??
282                end = rb->end;
283                for (; i < nb_ready; i++) {
284                        rb->elements[end] = values[i];
285                        end = (end + 1) % rb->size;
286                }
287                rb->end = end;
288                notify_full(rb);
289        } while (i < min_nb_buffers);
290        return i;
291}
292
293/**
294 * Performs a non-blocking write to the buffer, if their is no space
295 * or the list is locked by another thread this will return immediately
296 * without writing the value. Assumes that only one thread is writing.
297 * Otherwise use libtrace_ringbuffer_try_swrite.
298 *
299 * @param rb a pointer to libtrace_ringbuffer structure
300 * @param value the value to store
301 * @return 1 if a object was written otherwise 0.
302 */
303DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
304        if (libtrace_ringbuffer_is_full(rb))
305                return 0;
306        libtrace_ringbuffer_write(rb, value);
307        return 1;
308}
309
310/**
311 * Waits and reads from the supplied buffer, note this will block forever.
312 *
313 * @param rb a pointer to libtrace_ringbuffer structure
314 * @param out a pointer to a memory address where the returned item would be placed
315 * @return The object that was read
316 */
317DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
318        void* value;
319       
320        /* We need a full slot */
321        wait_for_full(rb);
322        value = rb->elements[rb->start];
323        rb->start = (rb->start + 1) % rb->size;
324        /* Now that's an empty slot */
325        notify_empty(rb);
326        return value;
327}
328
329/**
330 * Waits and reads from the supplied buffer, note this will block forever.
331 * Attempts to read the requested number of packets, however will return
332 * with only the number that are currently ready.
333 *
334 * Set min_nb_buffers to 0 to 'try' read packets.
335 *
336 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1]
337 * are valid.
338 *
339 * @param rb a pointer to libtrace_ringbuffer structure
340 * @param values A pointer to a memory address where the returned item would be placed
341 * @param nb_buffer The maximum buffers to read i.e. the length of values
342 * @param min_nb_buffers The minimum number of buffers to read
343 * @return The number of packets read
344 */
345DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
346        size_t nb_ready;
347        size_t i = 0;
348       
349        assert(min_nb_buffers <= nb_buffers);
350
351        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
352                return 0;
353
354        do {
355                register size_t start;
356                /* We need a full slot */
357                wait_for_full(rb);
358
359                nb_ready = libtrace_ringbuffer_nb_full(rb);
360                nb_ready = MIN(nb_ready, nb_buffers-i);
361                // Additional to the i we've already read
362                nb_ready += i;
363                start = rb->start;
364                for (; i < nb_ready; i++) {
365                        values[i] = rb->elements[start];
366                        start = (start + 1) % rb->size;
367                }
368                rb->start = start;
369                /* Now that's an empty slot */
370                notify_empty(rb);
371        } while (i < min_nb_buffers);
372        return i;
373}
374
375
376/**
377 * Tries to read from the supplied buffer if it fails this and returns
378 * 0 to indicate nothing was read.
379 *
380 * @param rb a pointer to libtrace_ringbuffer structure
381 * @param out a pointer to a memory address where the returned item would be placed
382 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
383 */
384DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
385        if (libtrace_ringbuffer_is_empty(rb))
386                return 0;
387        *value = libtrace_ringbuffer_read(rb);
388        return 1;
389}
390
391/**
392 * A thread safe version of libtrace_ringbuffer_write
393 */
394DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
395        LOCK(w);
396        libtrace_ringbuffer_write(rb, value);
397        UNLOCK(w);
398}
399
400/**
401 * A thread safe version of libtrace_ringbuffer_write_bulk
402 */
403DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
404        size_t ret;
405#if USE_CHECK_EARLY
406        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) // Check early
407                return 0;
408#endif
409        LOCK(w);
410        ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers);
411        UNLOCK(w);
412        return ret;
413}
414
415/**
416 * A thread safe version of libtrace_ringbuffer_try_write
417 */
418DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
419        int ret;
420#if USE_CHECK_EARLY
421        if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues
422                return 0;
423#endif
424        TRY_LOCK(w, return 0;);
425        ret = libtrace_ringbuffer_try_write(rb, value);
426        UNLOCK(w);
427        return ret;
428}
429
430/**
431 * A thread safe version of libtrace_ringbuffer_try_write
432 * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just
433 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
434 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
435 * and libtrace_ringbuffer_try_swrite are being used.
436 */
437DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
438        int ret;
439#if USE_CHECK_EARLY
440        if (libtrace_ringbuffer_is_full(rb)) // Check early
441                return 0;
442#endif
443        LOCK(w);
444        ret = libtrace_ringbuffer_try_write(rb, value);
445        UNLOCK(w);
446        return ret;
447}
448
449/**
450 * A thread safe version of libtrace_ringbuffer_read
451 */
452DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
453        void* value;
454        LOCK(r);
455        value = libtrace_ringbuffer_read(rb);
456        UNLOCK(r);
457        return value;
458}
459
460/**
461 * A thread safe version of libtrace_ringbuffer_read_bulk
462 */
463DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
464        size_t ret;
465#if USE_CHECK_EARLY
466        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) // Check early
467                return 0;
468#endif
469        LOCK(r);
470        ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers);
471        UNLOCK(r);
472        return ret;
473}
474
475/**
476 * A thread safe version of libtrace_ringbuffer_try_write
477 */
478DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
479        int ret;
480#if USE_CHECK_EARLY
481        if (libtrace_ringbuffer_is_empty(rb)) // Check early
482                return 0;
483#endif
484        TRY_LOCK(r, return 0;);
485        ret = libtrace_ringbuffer_try_read(rb, value);
486        UNLOCK(r);
487        return ret;
488}
489
490/**
491 * A thread safe version of libtrace_ringbuffer_try_wread
492 * Unlike libtrace_ringbuffer_try_sread this will block on da lock just
493 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
494 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
495 * and libtrace_ringbuffer_try_sread are being used.
496 */
497DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
498        int ret;
499#if USE_CHECK_EARLY
500        if (libtrace_ringbuffer_is_empty(rb)) // Check early
501                return 0;
502#endif
503        LOCK(r);
504        ret = libtrace_ringbuffer_try_read(rb, value);
505        UNLOCK(r);
506        return ret;
507}
508
509DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
510{
511        rb->start = 0;
512        rb->end = 0;
513        rb->size = 0;
514        rb->elements = NULL;
515}
516
517
Note: See TracBrowser for help on using the repository browser.