source: lib/data-struct/ring_buffer.c @ a49a9eb

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since a49a9eb was a49a9eb, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Add an object cache with thread local caches
All packets used by a trace are put through this.
Adds bulk read/write operations to the ringbuffer (used by the object cache)
Replace semaphores with condition variables to support these bulk operations.
Internally use bulk read operations from single-threaded formats to reduce lock overhead.
Replaces the asserts around pthread_* functions with a version that will still run the command if NDEBUG

  • Property mode set to 100644
File size: 14.9 KB
Line 
1/**
2 * A ring or circular buffer, very useful
3 */
4
5#include "ring_buffer.h"
6
7#include <stdlib.h>
8#include <assert.h>
9#include <string.h>
10#include <stdio.h>
11
#define LOCK_TYPE_MUTEX 0 // Default if not defined
#define LOCK_TYPE_SPIN 1
#define LOCK_TYPE_NONE 2

// No major difference noticed here between mutex and spin, both have their
// downsides.

/* When non-zero the thread safe wrappers test the buffer state before taking
 * the lock, so they can return without touching the lock at all. */
#define USE_CHECK_EARLY 1

#define USE_LOCK_TYPE LOCK_TYPE_MUTEX

/*
 * LOCK(dir), UNLOCK(dir) and TRY_LOCK(dir, action) operate on the lock named
 * by dir (w or r) inside the ring buffer pointed to by a variable called rb
 * in the calling scope.
 *
 * LOCK waits for the lock. TRY_LOCK never waits: if the lock cannot be taken
 * immediately it runs action (e.g. "return 0;") instead of acquiring it.
 * The try variants of the thread safe functions rely on this to differ from
 * their *_bl counterparts, which wait on the lock.
 */
#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
#       define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0)
#       define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0)
#       define TRY_LOCK(dir, action) do { \
                if (pthread_spin_trylock(&rb->s ## dir ## lock) != 0) { \
                        action \
                } \
        } while (0)
#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
#       define LOCK(dir)
#       define UNLOCK(dir)
#       define TRY_LOCK(dir, action)
#else // Mutex
#       define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0)
#       define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0)
#       define TRY_LOCK(dir, action) do { \
                if (pthread_mutex_trylock(&rb-> dir ## lock) != 0) { \
                        action \
                } \
        } while (0)
#endif
37
38
39/**
40 * Implements a FIFO queue via a ring buffer, this is a fixed size
41 * and all methods are no clobber i.e. will not overwrite old items
42 * with new ones.
43 *
44 * @param rb A pointer to a ringbuffer structure.
45 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
46 * @param mode The mode allows selection to use semaphores to signal when data
47 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
48 *                              NOTE: this mainly applies to the blocking functions
49 */
50DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) {
51        size = size + 1;
52        assert (size > 1);
53        rb->size = size; // Only this -1 actually usable :)
54        rb->start = 0;
55        rb->end = 0;
56        rb->elements = calloc(rb->size, sizeof(void*));
57        assert(rb->elements);
58        rb->mode = mode;
59        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
60                /* The signaling part - i.e. release when data's ready to read */
61                pthread_cond_init(&rb->full_cond, NULL);
62                pthread_cond_init(&rb->empty_cond, NULL);
63                ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0);
64                ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0);
65        }
66        /* The mutual exclusion part */
67#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
68#warning "using spinners"
69        ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0);
70        ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0);
71#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
72#warning "No locking used"
73#else
74        ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0);
75        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
76#endif
77}
78
79/**
80 * Destroys the ring buffer along with any memory allocated to it
81 * @param rb The ringbuffer to destroy
82 */
83DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
84#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
85        ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0);
86        ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0);
87#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
88#endif
89        ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0);
90        ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0);
91        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
92                pthread_cond_destroy(&rb->full_cond);
93                pthread_cond_destroy(&rb->empty_cond);
94        }
95        rb->size = 0;
96        rb->start = 0;
97        rb->end = 0;
98        free((void *)rb->elements);
99        rb->elements = NULL;
100}
101
102/**
103 * Tests to see if ringbuffer is empty, when using multiple threads
104 * this doesn't guarantee that the next operation wont block. Use
105 * write/read try instead.
106 */
107DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
108        return rb->start == rb->end;
109}
110
111/**
112 * Tests to see if ringbuffer is empty, when using multiple threads
113 * this doesn't guarantee that the next operation wont block. Use
114 * write/read try instead.
115 */
116DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
117        return rb->start == ((rb->end + 1) % rb->size);
118}
119
120static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) {
121        if (rb->end < rb->start)
122                return rb->end + rb->size - rb->start;
123        else
124                return rb->end - rb->start;
125        // return (rb->end + rb->size - rb->start) % rb->size;
126}
127
128static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) {
129        if (rb->start <= rb->end)
130                return rb->start + rb->size - rb->end - 1;
131        else
132                return rb->start - rb->end - 1;
133        // return (rb->start + rb->size - rb->end - 1) % rb->size;
134}
135
136/**
137 * Waits for a empty slot, that we can write to.
138 * @param rb The ringbuffer
139 */
140static inline void wait_for_empty(libtrace_ringbuffer_t *rb) {
141        /* Need an empty to start with */
142        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
143                pthread_mutex_lock(&rb->empty_lock);
144                while (libtrace_ringbuffer_is_full(rb))
145                        pthread_cond_wait(&rb->empty_cond, &rb->empty_lock);
146                pthread_mutex_unlock(&rb->empty_lock);
147        } else {
148                while (libtrace_ringbuffer_is_full(rb))
149                        /* Yield our time, why?, we tried and failed to write an item
150                         * to the buffer - so we should give up our time in the hope
151                         * that the reader thread can empty the buffer giving us a good
152                         * burst to write without blocking */
153                        sched_yield();//_mm_pause();
154        }
155}
156
157/**
158 * Waits for a full slot, that we read from.
159 * @param rb The ringbuffer
160 */
161static inline void wait_for_full(libtrace_ringbuffer_t *rb) {
162        /* Need an empty to start with */
163        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
164                pthread_mutex_lock(&rb->full_lock);
165                while (libtrace_ringbuffer_is_empty(rb))
166                        pthread_cond_wait(&rb->full_cond, &rb->full_lock);
167                pthread_mutex_unlock(&rb->full_lock);
168        } else {
169                while (libtrace_ringbuffer_is_empty(rb))
170                        /* Yield our time, why?, we tried and failed to write an item
171                         * to the buffer - so we should give up our time in the hope
172                         * that the reader thread can empty the buffer giving us a good
173                         * burst to write without blocking */
174                        sched_yield();//_mm_pause();
175        }
176}
177
178/**
179 * Notifies we have created a full slot, after a write.
180 * @param rb The ringbuffer
181 */
182static inline void notify_full(libtrace_ringbuffer_t *rb) {
183        /* Need an empty to start with */
184        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
185                pthread_mutex_lock(&rb->full_lock);
186                pthread_cond_broadcast(&rb->full_cond);
187                pthread_mutex_unlock(&rb->full_lock);
188        }
189}
190
191/**
192 * Notifies we have created an empty slot, after a read.
193 * @param rb The ringbuffer
194 */
195static inline void notify_empty(libtrace_ringbuffer_t *rb) {
196        /* Need an empty to start with */
197        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
198                pthread_mutex_lock(&rb->empty_lock);
199                pthread_cond_broadcast(&rb->empty_cond);
200                pthread_mutex_unlock(&rb->empty_lock);
201        }
202}
203
204/**
205 * Performs a blocking write to the buffer, upon return the value will be
206 * stored. This will not clobber old values.
207 *
208 * This assumes only one thread writing at once. Use
209 * libtrace_ringbuffer_swrite for a thread safe version.
210 *
211 * @param rb a pointer to libtrace_ringbuffer structure
212 * @param value the value to store
213 */
214DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
215        /* Need an empty to start with */
216        wait_for_empty(rb);
217        rb->elements[rb->end] = value;
218        rb->end = (rb->end + 1) % rb->size;
219        notify_full(rb);
220}
221
222/**
223 * Performs a blocking write to the buffer, upon return the value will be
224 * stored. This will not clobber old values.
225 *
226 * This assumes only one thread writing at once. Use
227 * libtrace_ringbuffer_swrite for a thread safe version.
228 *
229 * Packets are written out from start to end in order, if only some packets are
230 * written those at the end of the array will be still be unwritten.
231 *
232 * @param rb a pointer to libtrace_ringbuffer structure
233 * @param values A pointer to a memory address read in
234 * @param nb_buffer The maximum buffers to write i.e. the length of values
235 * @param min_nb_buffers The minimum number of buffers to write
236 * @param value the value to store
237 */
238DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
239        size_t nb_ready;
240        size_t i = 0;
241       
242        assert(min_nb_buffers <= nb_buffers);
243        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
244                return 0;
245
246        do {
247                register size_t end;
248                wait_for_empty(rb);
249                nb_ready = libtrace_ringbuffer_nb_empty(rb);
250                nb_ready = MIN(nb_ready, nb_buffers-i);
251                nb_ready += i;
252                // TODO consider optimising into at most 2 memcpys??
253                end = rb->end;
254                for (; i < nb_ready; i++) {
255                        rb->elements[end] = values[i];
256                        end = (end + 1) % rb->size;
257                }
258                rb->end = end;
259                notify_full(rb);
260        } while (i < min_nb_buffers);
261        return i;
262}
263
264/**
265 * Performs a non-blocking write to the buffer, if their is no space
266 * or the list is locked by another thread this will return immediately
267 * without writing the value. Assumes that only one thread is writing.
268 * Otherwise use libtrace_ringbuffer_try_swrite.
269 *
270 * @param rb a pointer to libtrace_ringbuffer structure
271 * @param value the value to store
272 * @return 1 if a object was written otherwise 0.
273 */
274DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
275        if (libtrace_ringbuffer_is_full(rb))
276                return 0;
277        libtrace_ringbuffer_write(rb, value);
278        return 1;
279}
280
281/**
282 * Waits and reads from the supplied buffer, note this will block forever.
283 *
284 * @param rb a pointer to libtrace_ringbuffer structure
285 * @param out a pointer to a memory address where the returned item would be placed
286 * @return The object that was read
287 */
288DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
289        void* value;
290       
291        /* We need a full slot */
292        wait_for_full(rb);
293        value = rb->elements[rb->start];
294        rb->start = (rb->start + 1) % rb->size;
295        /* Now that's an empty slot */
296        notify_empty(rb);
297        return value;
298}
299
300/**
301 * Waits and reads from the supplied buffer, note this will block forever.
302 * Attempts to read the requested number of packets, however will return
303 * with only the number that are currently ready.
304 *
305 * Set min_nb_buffers to 0 to 'try' read packets.
306 *
307 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1]
308 * are valid.
309 *
310 * @param rb a pointer to libtrace_ringbuffer structure
311 * @param values A pointer to a memory address where the returned item would be placed
312 * @param nb_buffer The maximum buffers to read i.e. the length of values
313 * @param min_nb_buffers The minimum number of buffers to read
314 * @return The number of packets read
315 */
316DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
317        size_t nb_ready;
318        size_t i = 0;
319       
320        assert(min_nb_buffers <= nb_buffers);
321
322        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
323                return 0;
324
325        do {
326                register size_t start;
327                /* We need a full slot */
328                wait_for_full(rb);
329
330                nb_ready = libtrace_ringbuffer_nb_full(rb);
331                nb_ready = MIN(nb_ready, nb_buffers-i);
332                // Additional to the i we've already read
333                nb_ready += i;
334                start = rb->start;
335                for (; i < nb_ready; i++) {
336                        values[i] = rb->elements[start];
337                        start = (start + 1) % rb->size;
338                }
339                rb->start = start;
340                /* Now that's an empty slot */
341                notify_empty(rb);
342        } while (i < min_nb_buffers);
343        return i;
344}
345
346
347/**
348 * Tries to read from the supplied buffer if it fails this and returns
349 * 0 to indicate nothing was read.
350 *
351 * @param rb a pointer to libtrace_ringbuffer structure
352 * @param out a pointer to a memory address where the returned item would be placed
353 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
354 */
355DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
356        if (libtrace_ringbuffer_is_empty(rb))
357                return 0;
358        *value = libtrace_ringbuffer_read(rb);
359        return 1;
360}
361
362/**
363 * A thread safe version of libtrace_ringbuffer_write
364 */
365DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
366        LOCK(w);
367        libtrace_ringbuffer_write(rb, value);
368        UNLOCK(w);
369}
370
371/**
372 * A thread safe version of libtrace_ringbuffer_write_bulk
373 */
374DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
375        size_t ret;
376#if USE_CHECK_EARLY
377        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) // Check early
378                return 0;
379#endif
380        LOCK(w);
381        ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers);
382        UNLOCK(w);
383        return ret;
384}
385
386/**
387 * A thread safe version of libtrace_ringbuffer_try_write
388 */
389DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
390        int ret;
391#if USE_CHECK_EARLY
392        if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues
393                return 0;
394#endif
395        TRY_LOCK(w, return 0;);
396        ret = libtrace_ringbuffer_try_write(rb, value);
397        UNLOCK(w);
398        return ret;
399}
400
401/**
402 * A thread safe version of libtrace_ringbuffer_try_write
403 * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just
404 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
405 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
406 * and libtrace_ringbuffer_try_swrite are being used.
407 */
408DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
409        int ret;
410#if USE_CHECK_EARLY
411        if (libtrace_ringbuffer_is_full(rb)) // Check early
412                return 0;
413#endif
414        LOCK(w);
415        ret = libtrace_ringbuffer_try_write(rb, value);
416        UNLOCK(w);
417        return ret;
418}
419
420/**
421 * A thread safe version of libtrace_ringbuffer_read
422 */
423DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
424        void* value;
425        LOCK(r);
426        value = libtrace_ringbuffer_read(rb);
427        UNLOCK(r);
428        return value;
429}
430
431/**
432 * A thread safe version of libtrace_ringbuffer_read_bulk
433 */
434DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
435        size_t ret;
436#if USE_CHECK_EARLY
437        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) // Check early
438                return 0;
439#endif
440        LOCK(r);
441        ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers);
442        UNLOCK(r);
443        return ret;
444}
445
446/**
447 * A thread safe version of libtrace_ringbuffer_try_write
448 */
449DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
450        int ret;
451#if USE_CHECK_EARLY
452        if (libtrace_ringbuffer_is_empty(rb)) // Check early
453                return 0;
454#endif
455        TRY_LOCK(r, return 0;);
456        ret = libtrace_ringbuffer_try_read(rb, value);
457        UNLOCK(r);
458        return ret;
459}
460
461/**
462 * A thread safe version of libtrace_ringbuffer_try_wread
463 * Unlike libtrace_ringbuffer_try_sread this will block on da lock just
464 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
465 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
466 * and libtrace_ringbuffer_try_sread are being used.
467 */
468DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
469        int ret;
470#if USE_CHECK_EARLY
471        if (libtrace_ringbuffer_is_empty(rb)) // Check early
472                return 0;
473#endif
474        LOCK(r);
475        ret = libtrace_ringbuffer_try_read(rb, value);
476        UNLOCK(r);
477        return ret;
478}
479
480DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
481{
482        rb->start = 0;
483        rb->end = 0;
484        rb->size = 0;
485        rb->elements = NULL;
486}
487
488
Note: See TracBrowser for help on using the repository browser.