1 | /** |
---|
2 | * A ring or circular buffer, very useful |
---|
3 | */ |
---|
4 | |
---|
5 | #include "ring_buffer.h" |
---|
6 | |
---|
7 | #include <stdlib.h> |
---|
8 | #include <assert.h> |
---|
9 | |
---|
10 | #define LOCK_TYPE_MUTEX 0 // Default if not defined |
---|
11 | #define LOCK_TYPE_SPIN 1 |
---|
12 | #define LOCK_TYPE_SEMAPHORE 2 |
---|
13 | #define LOCK_TYPE_NONE 3 |
---|
14 | |
---|
15 | // No major difference noticed here between mutex and spin, both have there |
---|
16 | // downsides. |
---|
17 | |
---|
18 | #define USE_MODULUS 1 |
---|
19 | #define USE_CHECK_EARLY 1 |
---|
20 | |
---|
21 | #define USE_LOCK_TYPE LOCK_TYPE_SPIN |
---|
22 | #if USE_LOCK_TYPE == LOCK_TYPE_SPIN |
---|
23 | # define LOCK(dir) assert(pthread_spin_lock(&rb->s ## dir ## lock) == 0) |
---|
24 | # define UNLOCK(dir) assert(pthread_spin_unlock(&rb->s ## dir ## lock) == 0) |
---|
25 | # define TRY_LOCK(dir, action) if(pthread_spin_lock(&rb->s ## dir ## lock) != 0) { \ |
---|
26 | action } |
---|
27 | #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE |
---|
28 | # define LOCK(dir) assert(sem_wait(&rb->sem ## dir ## lock) == 0) |
---|
29 | # define UNLOCK(dir) assert(sem_post(&rb->sem ## dir ## lock) == 0) |
---|
30 | # define TRY_LOCK(dir, action) if(sem_trywait(&rb->sem ## dir ## lock) != 0) { \ |
---|
31 | action } |
---|
32 | #elif USE_LOCK_TYPE == LOCK_TYPE_NONE |
---|
33 | # define LOCK(dir) |
---|
34 | # define UNLOCK(dir) |
---|
35 | # define TRY_LOCK(dir, action) |
---|
36 | #else // Mutex |
---|
37 | # define LOCK(dir) assert(pthread_mutex_lock(&rb-> dir ## lock) == 0) |
---|
38 | # define UNLOCK(dir) assert(pthread_mutex_unlock(&rb-> dir ## lock) == 0) |
---|
39 | # define TRY_LOCK(dir, action) if(pthread_mutex_lock(&rb-> dir ## lock) != 0) {\ |
---|
40 | action } |
---|
41 | #endif |
---|
42 | |
---|
43 | |
---|
44 | /** |
---|
45 | * Implements a FIFO queue via a ring buffer, this is a fixed size |
---|
46 | * and all methods are no clobber i.e. will not overwrite old items |
---|
47 | * with new ones. |
---|
48 | * |
---|
49 | * @param rb A pointer to a ringbuffer structure. |
---|
50 | * @param size The maximum size of the ring buffer. (NOTE: one extra slot is allocated so use -1 if attempting memory alignment) |
---|
51 | * @param mode The mode allows selection to use semaphores to signal when data |
---|
52 | * becomes available. LIBTRACE_RINGBUFFER_BLOCKING or LIBTRACE_RINGBUFFER_POLLING. |
---|
53 | * NOTE: this mainly applies to the blocking functions |
---|
54 | */ |
---|
55 | DLLEXPORT void libtrace_ringbuffer_init(libtrace_ringbuffer_t * rb, size_t size, int mode) { |
---|
56 | size = size + 1; |
---|
57 | assert (size > 1); |
---|
58 | rb->size = size; // Only this -1 actually usable :) |
---|
59 | rb->start = 0; |
---|
60 | rb->end = 0; |
---|
61 | rb->elements = calloc(rb->size, sizeof(void*)); |
---|
62 | assert(rb->elements); |
---|
63 | rb->mode = mode; |
---|
64 | if (mode == LIBTRACE_RINGBUFFER_BLOCKING) { |
---|
65 | /* The signaling part - i.e. release when data's ready to read */ |
---|
66 | assert(sem_init(&rb->fulls, 0, 0) == 0); |
---|
67 | assert(sem_init(&rb->emptys, 0, size - 1) == 0); // REMEMBER the -1 here :) very important |
---|
68 | } |
---|
69 | /* The mutual exclusion part */ |
---|
70 | #if USE_LOCK_TYPE == LOCK_TYPE_SPIN |
---|
71 | #warning "using spinners" |
---|
72 | assert(pthread_spin_init(&rb->swlock, 0) == 0); |
---|
73 | assert(pthread_spin_init(&rb->srlock, 0) == 0); |
---|
74 | #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE |
---|
75 | #warning "using semaphore" |
---|
76 | assert(sem_init(&rb->semrlock, 0, 1) != -1); |
---|
77 | assert(sem_init(&rb->semwlock, 0, 1) != -1); |
---|
78 | #elif USE_LOCK_TYPE == LOCK_TYPE_NONE |
---|
79 | #warning "No locking used" |
---|
80 | #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */ |
---|
81 | assert(pthread_mutex_init(&rb->wlock, NULL) == 0); |
---|
82 | assert(pthread_mutex_init(&rb->rlock, NULL) == 0); |
---|
83 | #endif |
---|
84 | } |
---|
85 | |
---|
86 | /** |
---|
87 | * Destroys the ring buffer along with any memory allocated to it |
---|
88 | * @param rb The ringbuffer to destroy |
---|
89 | */ |
---|
90 | DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) { |
---|
91 | #if USE_LOCK_TYPE == LOCK_TYPE_SPIN |
---|
92 | assert(pthread_spin_destroy(&rb->swlock) == 0); |
---|
93 | assert(pthread_spin_destroy(&rb->srlock) == 0); |
---|
94 | #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE |
---|
95 | assert(sem_destroy(&rb->semrlock) != -1); |
---|
96 | assert(sem_destroy(&rb->semwlock) != -1); |
---|
97 | #elif USE_LOCK_TYPE == LOCK_TYPE_NONE |
---|
98 | #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */ |
---|
99 | assert(pthread_mutex_destroy(&rb->wlock) == 0); |
---|
100 | assert(pthread_mutex_destroy(&rb->rlock) == 0); |
---|
101 | #endif |
---|
102 | if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) { |
---|
103 | assert(sem_destroy(&rb->fulls) == 0); |
---|
104 | assert(sem_destroy(&rb->emptys) == 0); |
---|
105 | } |
---|
106 | rb->size = 0; |
---|
107 | rb->start = 0; |
---|
108 | rb->end = 0; |
---|
109 | free((void *)rb->elements); |
---|
110 | rb->elements = NULL; |
---|
111 | } |
---|
112 | |
---|
113 | /** |
---|
114 | * Tests to see if ringbuffer is empty, when using multiple threads |
---|
115 | * this doesn't guarantee that the next operation wont block. Use |
---|
116 | * write/read try instead. |
---|
117 | */ |
---|
118 | DLLEXPORT int libtrace_ringbuffer_is_empty(const libtrace_ringbuffer_t * rb) { |
---|
119 | return rb->start == rb->end; |
---|
120 | } |
---|
121 | |
---|
122 | /** |
---|
123 | * Tests to see if ringbuffer is empty, when using multiple threads |
---|
124 | * this doesn't guarantee that the next operation wont block. Use |
---|
125 | * write/read try instead. |
---|
126 | */ |
---|
127 | DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) { |
---|
128 | #if USE_MODULUS |
---|
129 | return rb->start == ((rb->end + 1) % rb->size); |
---|
130 | #else |
---|
131 | return rb->start == ((rb->end + 1 < rb->size) ? rb->end + 1 : 0); |
---|
132 | #endif |
---|
133 | } |
---|
134 | |
---|
135 | /** |
---|
136 | * Performs a blocking write to the buffer, upon return the value will be |
---|
137 | * stored. This will not clobber old values. |
---|
138 | * |
---|
139 | * This assumes only one thread writing at once. Use |
---|
140 | * libtrace_ringbuffer_swrite for a thread safe version. |
---|
141 | * |
---|
142 | * @param rb a pointer to libtrace_ringbuffer structure |
---|
143 | * @param value the value to store |
---|
144 | */ |
---|
145 | DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) { |
---|
146 | /* Need an empty to start with */ |
---|
147 | if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) |
---|
148 | assert(sem_wait(&rb->emptys) == 0); |
---|
149 | else |
---|
150 | while (libtrace_ringbuffer_is_full(rb)) |
---|
151 | /* Yield our time, why?, we tried and failed to write an item |
---|
152 | * to the buffer - so we should give up our time in the hope |
---|
153 | * that the reader thread can empty the buffer giving us a good |
---|
154 | * burst to write without blocking */ |
---|
155 | sched_yield();//_mm_pause(); |
---|
156 | |
---|
157 | rb->elements[rb->end] = value; |
---|
158 | #if USE_MODULUS |
---|
159 | rb->end = (rb->end + 1) % rb->size; |
---|
160 | #else |
---|
161 | rb->end = (rb->end + 1 < rb->size) ? rb->end + 1 : 0; |
---|
162 | #endif |
---|
163 | /* This check is bad we can easily lose our time slice, and the reader |
---|
164 | * can catch up before it should, in this case spin locking is used */ |
---|
165 | //if (libtrace_ringbuffer_is_empty(rb)) |
---|
166 | // assert(0 == 1); |
---|
167 | /* Now we've made another full */ |
---|
168 | if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) |
---|
169 | assert(sem_post(&rb->fulls) == 0); |
---|
170 | } |
---|
171 | |
---|
172 | /** |
---|
173 | * Performs a non-blocking write to the buffer, if their is no space |
---|
174 | * or the list is locked by another thread this will return immediately |
---|
175 | * without writing the value. Assumes that only one thread is writing. |
---|
176 | * Otherwise use libtrace_ringbuffer_try_swrite. |
---|
177 | * |
---|
178 | * @param rb a pointer to libtrace_ringbuffer structure |
---|
179 | * @param value the value to store |
---|
180 | * @return 1 if a object was written otherwise 0. |
---|
181 | */ |
---|
182 | DLLEXPORT int libtrace_ringbuffer_try_write(libtrace_ringbuffer_t * rb, void* value) { |
---|
183 | if (libtrace_ringbuffer_is_full(rb)) |
---|
184 | return 0; |
---|
185 | libtrace_ringbuffer_write(rb, value); |
---|
186 | return 1; |
---|
187 | } |
---|
188 | |
---|
189 | /** |
---|
190 | * Waits and reads from the supplied buffer, note this will block forever. |
---|
191 | * |
---|
192 | * @param rb a pointer to libtrace_ringbuffer structure |
---|
193 | * @param out a pointer to a memory address where the returned item would be placed |
---|
194 | * @return The object that was read |
---|
195 | */ |
---|
196 | DLLEXPORT void* libtrace_ringbuffer_read(libtrace_ringbuffer_t *rb) { |
---|
197 | void* value; |
---|
198 | |
---|
199 | /* We need a full slot */ |
---|
200 | if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) |
---|
201 | assert(sem_wait(&rb->fulls) == 0); |
---|
202 | else |
---|
203 | while (libtrace_ringbuffer_is_empty(rb)) |
---|
204 | /* Yield our time, why?, we tried and failed to read an item |
---|
205 | * from the buffer - so we should give up our time in the hope |
---|
206 | * that the writer thread can fill the buffer giving us a good |
---|
207 | * burst to read without blocking etc */ |
---|
208 | sched_yield();//_mm_pause(); |
---|
209 | |
---|
210 | value = rb->elements[rb->start]; |
---|
211 | #if USE_MODULUS |
---|
212 | rb->start = (rb->start + 1) % rb->size; |
---|
213 | #else |
---|
214 | rb->start = (rb->start + 1 < rb->size) ? rb->start + 1 : 0; |
---|
215 | #endif |
---|
216 | /* Now that's a empty slot */ |
---|
217 | if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) |
---|
218 | assert(sem_post(&rb->emptys) == 0); |
---|
219 | return value; |
---|
220 | } |
---|
221 | |
---|
222 | /** |
---|
223 | * Tries to read from the supplied buffer if it fails this and returns |
---|
224 | * 0 to indicate nothing was read. |
---|
225 | * |
---|
226 | * @param rb a pointer to libtrace_ringbuffer structure |
---|
227 | * @param out a pointer to a memory address where the returned item would be placed |
---|
228 | * @return 1 if a object was received otherwise 0, in this case out remains unchanged |
---|
229 | */ |
---|
230 | DLLEXPORT int libtrace_ringbuffer_try_read(libtrace_ringbuffer_t *rb, void ** value) { |
---|
231 | if (libtrace_ringbuffer_is_empty(rb)) |
---|
232 | return 0; |
---|
233 | *value = libtrace_ringbuffer_read(rb); |
---|
234 | return 1; |
---|
235 | } |
---|
236 | |
---|
237 | /** |
---|
238 | * A thread safe version of libtrace_ringbuffer_write |
---|
239 | */ |
---|
240 | DLLEXPORT void libtrace_ringbuffer_swrite(libtrace_ringbuffer_t * rb, void* value) { |
---|
241 | LOCK(w); |
---|
242 | libtrace_ringbuffer_write(rb, value); |
---|
243 | UNLOCK(w); |
---|
244 | } |
---|
245 | |
---|
246 | /** |
---|
247 | * A thread safe version of libtrace_ringbuffer_try_write |
---|
248 | */ |
---|
249 | DLLEXPORT int libtrace_ringbuffer_try_swrite(libtrace_ringbuffer_t * rb, void* value) { |
---|
250 | int ret; |
---|
251 | #if USE_CHECK_EARLY |
---|
252 | if (libtrace_ringbuffer_is_full(rb)) // Check early, drd issues |
---|
253 | return 0; |
---|
254 | #endif |
---|
255 | TRY_LOCK(w, return 0;); |
---|
256 | ret = libtrace_ringbuffer_try_write(rb, value); |
---|
257 | UNLOCK(w); |
---|
258 | return ret; |
---|
259 | } |
---|
260 | |
---|
261 | /** |
---|
262 | * A thread safe version of libtrace_ringbuffer_try_write |
---|
263 | * Unlike libtrace_ringbuffer_try_swrite this will block on da lock just |
---|
264 | * not the data. This will block for a long period of time if libtrace_ringbuffer_sread |
---|
265 | * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_swrite_bl |
---|
266 | * and libtrace_ringbuffer_try_swrite are being used. |
---|
267 | */ |
---|
268 | DLLEXPORT int libtrace_ringbuffer_try_swrite_bl(libtrace_ringbuffer_t * rb, void* value) { |
---|
269 | int ret; |
---|
270 | #if USE_CHECK_EARLY |
---|
271 | if (libtrace_ringbuffer_is_full(rb)) // Check early |
---|
272 | return 0; |
---|
273 | #endif |
---|
274 | LOCK(w); |
---|
275 | ret = libtrace_ringbuffer_try_write(rb, value); |
---|
276 | UNLOCK(w); |
---|
277 | return ret; |
---|
278 | } |
---|
279 | |
---|
280 | /** |
---|
281 | * A thread safe version of libtrace_ringbuffer_read |
---|
282 | */ |
---|
283 | DLLEXPORT void * libtrace_ringbuffer_sread(libtrace_ringbuffer_t *rb) { |
---|
284 | void* value; |
---|
285 | LOCK(r); |
---|
286 | value = libtrace_ringbuffer_read(rb); |
---|
287 | UNLOCK(r); |
---|
288 | return value; |
---|
289 | } |
---|
290 | |
---|
291 | /** |
---|
292 | * A thread safe version of libtrace_ringbuffer_try_write |
---|
293 | */ |
---|
294 | DLLEXPORT int libtrace_ringbuffer_try_sread(libtrace_ringbuffer_t *rb, void ** value) { |
---|
295 | int ret; |
---|
296 | #if USE_CHECK_EARLY |
---|
297 | if (libtrace_ringbuffer_is_empty(rb)) // Check early |
---|
298 | return 0; |
---|
299 | #endif |
---|
300 | TRY_LOCK(r, return 0;); |
---|
301 | ret = libtrace_ringbuffer_try_read(rb, value); |
---|
302 | UNLOCK(r); |
---|
303 | return ret; |
---|
304 | } |
---|
305 | |
---|
306 | /** |
---|
307 | * A thread safe version of libtrace_ringbuffer_try_wread |
---|
308 | * Unlike libtrace_ringbuffer_try_sread this will block on da lock just |
---|
309 | * not the data. This will block for a long period of time if libtrace_ringbuffer_sread |
---|
310 | * is holding the lock. However will not block for long if only libtrace_ringbuffer_try_sread_bl |
---|
311 | * and libtrace_ringbuffer_try_sread are being used. |
---|
312 | */ |
---|
313 | DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value) { |
---|
314 | int ret; |
---|
315 | #if USE_CHECK_EARLY |
---|
316 | if (libtrace_ringbuffer_is_empty(rb)) // Check early |
---|
317 | return 0; |
---|
318 | #endif |
---|
319 | LOCK(r); |
---|
320 | ret = libtrace_ringbuffer_try_read(rb, value); |
---|
321 | UNLOCK(r); |
---|
322 | return ret; |
---|
323 | } |
---|
324 | |
---|
325 | DLLEXPORT void libtrace_zero_ringbuffer(libtrace_ringbuffer_t * rb) |
---|
326 | { |
---|
327 | rb->start = 0; |
---|
328 | rb->end = 0; |
---|
329 | rb->size = 0; |
---|
330 | rb->elements = NULL; |
---|
331 | } |
---|