source: lib/trace_ringbuffer.c @ 29ba7c2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 29ba7c2 was 29ba7c2, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

Remove the include of xmmintrin.h, which was used for the busy-spinning pause _mm_pause() in the ring buffer; we are using sched_yield() instead anyway, and this will most likely be replaced with a condition variable in the future.
This means non-Intel (non-x86/x64) systems, which don't have xmmintrin.h, are now supported.

  • Property mode set to 100644
File size: 10.5 KB
Line 
1/**
2 * A ring or circular buffer, very useful
3 */
4
#include "trace_ringbuffer.h"
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
8
#define LOCK_TYPE_MUTEX 0 // Default if not defined
#define LOCK_TYPE_SPIN 1
#define LOCK_TYPE_SEMAPHORE 2
#define LOCK_TYPE_NONE 3

// No major difference noticed here between mutex and spin, both have their
// downsides.

#define USE_MODULUS 1
#define USE_CHECK_EARLY 1

#define USE_LOCK_TYPE LOCK_TYPE_SPIN
#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
#	define LOCK(dir) assert(pthread_spin_lock(&rb->s ## dir ## lock) == 0)
#	define UNLOCK(dir) assert(pthread_spin_unlock(&rb->s ## dir ## lock) == 0)
	/* BUG FIX: this previously called pthread_spin_lock(), which blocks
	 * until the lock is acquired, so TRY_LOCK could never fail and the
	 * try_s* functions silently became blocking. Use trylock instead. */
#	define TRY_LOCK(dir, action) if(pthread_spin_trylock(&rb->s ## dir ## lock) != 0) { \
								action }
#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
#	define LOCK(dir) assert(sem_wait(&rb->sem ## dir ## lock) == 0)
#	define UNLOCK(dir) assert(sem_post(&rb->sem ## dir ## lock) == 0)
#	define TRY_LOCK(dir, action) if(sem_trywait(&rb->sem ## dir ## lock) != 0) { \
								action }
#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
#	define LOCK(dir)
#	define UNLOCK(dir)
#	define TRY_LOCK(dir, action)
#else // Mutex
#	define LOCK(dir) assert(pthread_mutex_lock(&rb-> dir ## lock) == 0)
#	define UNLOCK(dir) assert(pthread_mutex_unlock(&rb-> dir ## lock) == 0)
	/* BUG FIX: as above, pthread_mutex_lock() blocks; trylock is the
	 * non-blocking variant TRY_LOCK is meant to provide. */
#	define TRY_LOCK(dir, action) if(pthread_mutex_trylock(&rb-> dir ## lock) != 0) {\
								action }
#endif
41
42
43/**
44 * Implements a FIFO queue via a ring buffer, this is a fixed size
45 * and all methods are no clobber i.e. will not overwrite old items
46 * with new ones.
47 *
48 * @param rb A pointer to a ringbuffer structure.
49 * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment)
50 * @param mode The mode allows selection to use semaphores to signal when data
51 *                              becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING.
52 *                              NOTE: this mainly applies to the blocking functions
53 */
54inline void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, int size, int mode) {
55        size = size + 1;
56        assert (size > 1);
57        rb->size = size; // Only this -1 actually usable :)
58        rb->start = 0;
59        rb->end = 0;
60        rb->elements = calloc(rb->size, sizeof(void*));
61        assert(rb->elements);
62        rb->mode = mode;
63        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
64                /* The signaling part - i.e. release when data's ready to read */
65                assert(sem_init(&rb->fulls, 0, 0) == 0);
66                assert(sem_init(&rb->emptys, 0, size - 1) == 0); // REMEMBER the -1 here :) very important
67        }
68        /* The mutual exclusion part */
69#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
70#warning "using spinners"
71        assert(pthread_spin_init(&rb->swlock, 0) == 0);
72        assert(pthread_spin_init(&rb->srlock, 0) == 0);
73#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
74#warning "using semaphore"
75        assert(sem_init(&rb->semrlock, 0, 1) != -1);
76        assert(sem_init(&rb->semwlock, 0, 1) != -1);
77#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
78#warning "No locking used"
79#else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
80        assert(pthread_mutex_init(&rb->wlock, NULL) == 0);
81        assert(pthread_mutex_init(&rb->rlock, NULL) == 0);
82#endif
83}
84
85/**
86 * Destroys the ring buffer along with any memory allocated to it
87 * @param rb The ringbuffer to destroy
88 */
89inline void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
90#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
91        assert(pthread_spin_destroy(&rb->swlock) == 0);
92        assert(pthread_spin_destroy(&rb->srlock) == 0);
93#elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
94        assert(sem_destroy(&rb->semrlock) != -1);
95        assert(sem_destroy(&rb->semwlock) != -1);
96#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
97#else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
98        assert(pthread_mutex_destroy(&rb->wlock) == 0);
99        assert(pthread_mutex_destroy(&rb->rlock) == 0);
100#endif
101        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
102                assert(sem_destroy(&rb->fulls) == 0);
103                assert(sem_destroy(&rb->emptys) == 0);
104        }
105        rb->size = 0;
106        rb->start = 0;
107        rb->end = 0;
108        free((void *)rb->elements);
109        rb->elements = NULL;
110}
111
112/**
113 * Tests to see if ringbuffer is empty, when using multiple threads
114 * this doesn't guarantee that the next operation wont block. Use
115 * write/read try instead.
116 */
117inline int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) {
118        return rb->start == rb->end;
119}
120
121/**
122 * Tests to see if ringbuffer is empty, when using multiple threads
123 * this doesn't guarantee that the next operation wont block. Use
124 * write/read try instead.
125 */
126inline int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
127#if USE_MODULUS
128        return rb->start == ((rb->end + 1) % rb->size);
129#else
130        return rb->start == ((rb->end + 1 < rb->size) ? rb->end + 1 : 0);
131#endif
132}
133
134/**
135 * Performs a blocking write to the buffer, upon return the value will be
136 * stored. This will not clobber old values.
137 *
138 * This assumes only one thread writing at once. Use
139 * libtrace_ringbuffer_swrite for a thread safe version.
140 *
141 * @param rb a pointer to libtrace_ringbuffer structure
142 * @param value the value to store
143 */
144inline void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
145        /* Need an empty to start with */
146        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
147                assert(sem_wait(&rb->emptys) == 0);
148        else 
149                while (libtrace_ringbuffer_is_full(rb))
150                        /* Yield our time, why?, we tried and failed to write an item
151                         * to the buffer - so we should give up our time in the hope
152                         * that the reader thread can empty the buffer giving us a good
153                         * burst to write without blocking */
154                        sched_yield();//_mm_pause();
155
156        rb->elements[rb->end] = value;
157#if USE_MODULUS
158        rb->end = (rb->end + 1) % rb->size;
159#else
160        rb->end = (rb->end + 1 < rb->size) ? rb->end + 1 : 0;
161#endif
162        /* This check is bad we can easily lose our time slice, and the reader
163         * can catch up before it should, in this case spin locking is used */
164        //if (libtrace_ringbuffer_is_empty(rb))
165        //      assert(0 == 1);
166        /* Now we've made another full */
167        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
168                assert(sem_post(&rb->fulls) == 0);
169}
170
171/**
172 * Performs a non-blocking write to the buffer, if their is no space
173 * or the list is locked by another thread this will return immediately
174 * without writing the value. Assumes that only one thread is writing.
175 * Otherwise use libtrace_ringbuffer_try_swrite.
176 *
177 * @param rb a pointer to libtrace_ringbuffer structure
178 * @param value the value to store
179 * @return 1 if a object was written otherwise 0.
180 */
181inline int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) {
182        if (libtrace_ringbuffer_is_full(rb))
183                return 0;
184        libtrace_ringbuffer_write(rb, value);
185        return 1;
186}
187
188/**
189 * Waits and reads from the supplied buffer, note this will block forever.
190 *
191 * @param rb a pointer to libtrace_ringbuffer structure
192 * @param out a pointer to a memory address where the returned item would be placed
193 * @return The object that was read
194 */
195inline void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) {
196        void* value;
197       
198        /* We need a full slot */
199        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
200                assert(sem_wait(&rb->fulls) == 0);
201        else
202                while (libtrace_ringbuffer_is_empty(rb)) 
203                        /* Yield our time, why?, we tried and failed to read an item
204                         * from the buffer - so we should give up our time in the hope
205                         * that the writer thread can fill the buffer giving us a good
206                         * burst to read without blocking etc */
207                        sched_yield();//_mm_pause();
208       
209        value = rb->elements[rb->start];
210#if USE_MODULUS
211        rb->start = (rb->start + 1) % rb->size;
212#else
213        rb->start = (rb->start + 1 < rb->size) ? rb->start + 1 : 0;
214#endif
215        /* Now that's a empty slot */
216        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
217                assert(sem_post(&rb->emptys) == 0);
218        return value;
219}
220
221/**
222 * Tries to read from the supplied buffer if it fails this and returns
223 * 0 to indicate nothing was read.
224 *
225 * @param rb a pointer to libtrace_ringbuffer structure
226 * @param out a pointer to a memory address where the returned item would be placed
227 * @return 1 if a object was received otherwise 0, in this case out remains unchanged
228 */
229inline int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) {
230        if (libtrace_ringbuffer_is_empty(rb))
231                return 0;
232        *value = libtrace_ringbuffer_read(rb);
233        return 1;
234}
235
/**
 * A thread safe version of libtrace_ringbuffer_write.
 * Blocks until the write lock is acquired and the value is stored.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @param value the value to store
 */
inline void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) {
	LOCK(w); /* serialise against other writers */
	libtrace_ringbuffer_write(rb, value);
	UNLOCK(w);
}
244
245/**
246 * A thread safe version of libtrace_ringbuffer_try_write
247 */
248inline int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) {
249        int ret;
250#if USE_CHECK_EARLY
251        if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues
252                return 0;
253#endif
254        TRY_LOCK(w, return 0;);
255        ret = libtrace_ringbuffer_try_write(rb, value);
256        UNLOCK(w);
257        return ret;
258}
259
260/**
261 * A thread safe version of libtrace_ringbuffer_try_write
262 * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just
263 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
264 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl
265 * and libtrace_ringbuffer_try_swrite are being used.
266 */
267inline int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) {
268        int ret;
269#if USE_CHECK_EARLY
270        if (libtrace_ringbuffer_is_full(rb)) // Check early
271                return 0;
272#endif
273        LOCK(w);
274        ret = libtrace_ringbuffer_try_write(rb, value);
275        UNLOCK(w);
276        return ret;
277}
278
/**
 * A thread safe version of libtrace_ringbuffer_read.
 * Blocks until the read lock is acquired and an item is available.
 *
 * @param rb a pointer to libtrace_ringbuffer structure
 * @return The object that was read
 */
inline void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) {
	void* value;
	LOCK(r); /* serialise against other readers */
	value = libtrace_ringbuffer_read(rb);
	UNLOCK(r);
	return value;
}
289
290/**
291 * A thread safe version of libtrace_ringbuffer_try_write
292 */
293inline int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) {
294        int ret;
295#if USE_CHECK_EARLY
296        if (libtrace_ringbuffer_is_empty(rb)) // Check early
297                return 0;
298#endif
299        TRY_LOCK(r, return 0;);
300        ret = libtrace_ringbuffer_try_read(rb, value);
301        UNLOCK(r);
302        return ret;
303}
304
305/**
306 * A thread safe version of libtrace_ringbuffer_try_wread
307 * Unlike libtrace_ringbuffer_try_sread this will block on da lock just
308 * not the data. This will block for a long period of time if libtrace_ringbuffer_sread
309 * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl
310 * and libtrace_ringbuffer_try_sread are being used.
311 */
312inline int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) {
313        int ret;
314#if USE_CHECK_EARLY
315        if (libtrace_ringbuffer_is_empty(rb)) // Check early
316                return 0;
317#endif
318        LOCK(r);
319        ret = libtrace_ringbuffer_try_read(rb, value);
320        UNLOCK(r);
321        return ret;
322}
323
324inline void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb)
325{
326        rb->start = 0;
327        rb->end = 0;
328        rb->size = 0;
329        rb->elements = NULL;
330}
Note: See TracBrowser for help on using the repository browser.