source: lib/data-struct/object_cache.c @ 14c6c08

Last change on this file was 14c6c08, checked in by Richard Sanger <rsangerarj@…>, 6 years ago

Fix libtrace_ocache to work correctly on Mac.

Apple's implementation of the pthread destructor (registered with
pthread_key_create) seems to run after thread-local storage has been
destroyed (or zeroed). This rewrite fixes that by using thread storage
only as a means of quick access and malloc()ing the actual storage.
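As a rough sketch of the pattern the fix relies on (illustrative only; the names below are made up for the example and are not from libtrace), a __thread pointer provides the fast per-thread lookup, while the data itself lives on the heap so the pthread key destructor can still reach and free it safely at thread exit, mirroring get_local_caches() in the file below:

#include <pthread.h>
#include <stdlib.h>

struct per_thread { int dummy; };

static pthread_key_t key;
static pthread_once_t once = PTHREAD_ONCE_INIT;

static void cleanup(void *p) {
        /* Runs at thread exit; p is heap memory, so it is still valid
         * even if __thread storage has already been torn down. */
        free(p);
}

static void make_key(void) {
        pthread_key_create(&key, cleanup);
}

static struct per_thread *get_state(void) {
        static __thread struct per_thread *state = NULL; /* fast path */
        if (!state) {
                state = calloc(1, sizeof(*state));  /* real storage on the heap */
                pthread_once(&once, make_key);
                pthread_setspecific(key, state);    /* register the destructor */
        }
        return state;
}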

#include "object_cache.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// pthread TLS is most likely slower than __thread, but it has destructors, so
// we use a combination of the two here.
// Note: Apple's implementation of TLS means that memory is not available / has
// been zeroed by the time the pthread destructor is called.
struct local_cache {
        libtrace_ocache_t *oc;
        size_t total;
        size_t used;
        void **cache;
        bool invalid;
};

struct mem_stats {
        struct memfail {
           uint64_t cache_hit;
           uint64_t ring_hit;
           uint64_t miss;
           uint64_t recycled;
        } readbulk, read, write, writebulk;
};

extern __thread struct mem_stats mem_hits;

struct local_caches {
        size_t t_mem_caches_used;
        size_t t_mem_caches_total;
        struct local_cache *t_mem_caches;
};

static pthread_key_t memory_destructor_key;
static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;
static inline struct local_caches *get_local_caches();

/**
 * @brief unregister_thread assumes we DO NOT hold the spin lock
 */
static inline void unregister_thread(struct local_cache *lc) {
        size_t i;
        if (lc->invalid)
                fprintf(stderr, "Already free'd the thread cache!!\n");
        pthread_spin_lock(&lc->oc->spin);
        // Remove it from our thread list
        for (i=0; i < lc->oc->nb_thread_list; ++i) {
                if (lc->oc->thread_list[i] == lc) {
                        --lc->oc->nb_thread_list;
                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
                        i = ~0U;
                        break;
                }
        }
        if (i != ~0U) {
                fprintf(stderr, "Attempted to unregister a thread with an"
                         " ocache that had never registered this thread. Ignoring.\n");
                pthread_spin_unlock(&lc->oc->spin);
                return;
        }
        lc->invalid = true;

        if (lc->oc->max_allocations) {
                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
        } else {
                size_t i;
                // We just free these directly
                for(i = 0; i < lc->used; ++i) {
                        lc->oc->free(lc->cache[i]);
                }
        }
        pthread_spin_unlock(&lc->oc->spin);
}

/**
 * @brief register_thread assumes we DO NOT hold the spin lock
 */
static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
        lc->invalid = false;
        pthread_spin_lock(&oc->spin);
        if (oc->nb_thread_list == oc->max_nb_thread_list) {
                oc->max_nb_thread_list += 0x10;
                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
        }
        oc->thread_list[oc->nb_thread_list] = lc;
        ++oc->nb_thread_list;
        pthread_spin_unlock(&oc->spin);
}

static void destroy_memory_caches(void *tlsaddr) {
        size_t a;
        struct local_caches *lcs = tlsaddr;

        for (a = 0; a < lcs->t_mem_caches_used; ++a) {
                unregister_thread(&lcs->t_mem_caches[a]);
                // These are written back to the main buffer; if that ever causes issues we would want to free them instead
                free(lcs->t_mem_caches[a].cache);
        }
        free(lcs->t_mem_caches);
        lcs->t_mem_caches = NULL;
        free(lcs);

}

static void once_memory_cache_key_init() {
        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_caches), == 0);
}

/**
 * Adds more space to our mem_caches
 */
static void resize_memory_caches(struct local_caches *lcs) {
        assert (lcs->t_mem_caches_total > 0);
        lcs->t_mem_caches_total += 0x10;
        lcs->t_mem_caches = realloc(lcs->t_mem_caches,
                                    lcs->t_mem_caches_total * sizeof(struct local_cache));
}

/* Get TLS for the list of local_caches */
static inline struct local_caches *get_local_caches() {
        static __thread struct local_caches *lcs = NULL;
        if (lcs) {
                return lcs;
        } else {
                /* This thread has not been used with a memory pool before */
                /* Allocate our TLS */
                assert(lcs == NULL);
                lcs = calloc(1, sizeof (struct local_caches));
                assert(lcs);
                /* Hook into pthreads to destroy this when the thread ends */
                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
                pthread_setspecific(memory_destructor_key, (void *) lcs);
                lcs->t_mem_caches_total = 0x10;
                lcs->t_mem_caches = calloc(0x10, sizeof(struct local_cache));
                assert(lcs);
                assert(lcs->t_mem_caches);
                return lcs;
        }
}

static inline struct local_cache * find_cache(libtrace_ocache_t *oc) {
        size_t i;
        struct local_cache *lc = NULL;
        struct local_caches *lcs = get_local_caches();

        for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                if (lcs->t_mem_caches[i].oc == oc) {
                        lc = &lcs->t_mem_caches[i];
                        break;
                }
        }

        if (!oc->thread_cache_size)
                return NULL;

        // Create a cache
        if (!lc) {
                if (lcs->t_mem_caches_used == lcs->t_mem_caches_total)
                        resize_memory_caches(lcs);
                lcs->t_mem_caches[lcs->t_mem_caches_used].oc = oc;
                lcs->t_mem_caches[lcs->t_mem_caches_used].used = 0;
                lcs->t_mem_caches[lcs->t_mem_caches_used].total = oc->thread_cache_size;
                lcs->t_mem_caches[lcs->t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
                lcs->t_mem_caches[lcs->t_mem_caches_used].invalid = false;
                lc = &lcs->t_mem_caches[lcs->t_mem_caches_used];
                // Register it with the underlying ring_buffer
                register_thread(lc->oc, lc);
                ++lcs->t_mem_caches_used;
        }

        assert(!lc->invalid);
        return lc;
}

/**
  * Creates an object cache, that is, a pool of dynamically allocated and recycled
  * objects of a fixed size. This should be faster than malloc and free.
  * The alloc and free methods are supplied by the user and are used when no
  * recycled objects are available, or to tidy the final results.
  *
  * The performance of these pools will decrease if thread caches are used,
  * as this results in a per-thread list that must be looked up. The pool is added
  * to this list when first encountered, and these entries persist until the thread exits.
  *
  * NOTE: If limit_size is true, do not attempt to 'free' back any objects that were
  * not created by this pool, otherwise the 'free' might deadlock. Also
  * be cautious when picking the buffer size: up to thread_cache_size*(threads-1)
  * objects could be unusable at any given time if they are stuck in thread local caches.
  *
  * @param oc A pointer to the object cache structure which is to be initialised.
  * @param alloc The allocation method, must not be NULL. [void *alloc()]
  * @param free The free method used to destroy packets. [void free(void * obj)]
  * @param thread_cache_size A small cache kept on a per-thread basis; this can be 0,
  *             however that should only be done if bulk reads of packets are being
  *             performed or contention is minimal.
  * @param buffer_size The number of packets to be stored in the main buffer.
  * @param limit_size If true, no more objects than buffer_size will be allocated and
  *             reads will block (free never should). Otherwise packets can be freely
  *             allocated upon request and are free'd if there is not enough space for them.
  * @return If successful returns 0, otherwise -1.
  */
DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
                                    void (*free)(void *),
                                    size_t thread_cache_size,
                                    size_t buffer_size, bool limit_size) {

        assert(buffer_size);
        assert(alloc);
        assert(free);
        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
                return -1;
        }
        oc->alloc = alloc;
        oc->free = free;
        oc->current_allocations = 0;
        oc->thread_cache_size = thread_cache_size;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0x10;
        oc->thread_list = calloc(0x10, sizeof(void*));
        if (oc->thread_list == NULL) {
                libtrace_ringbuffer_destroy(&oc->rb);
                return -1;
        }
        pthread_spin_init(&oc->spin, 0);
        if (limit_size)
                oc->max_allocations = buffer_size;
        else
                oc->max_allocations = 0;
        return 0;
}
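/* Illustrative usage sketch (not part of this file): how a caller might
 * initialise a pool, take a batch of objects, return them and tear the pool
 * down. The callbacks and sizes here are made up for the example, so the
 * block is guarded out of the build. */
#if 0
static void *example_alloc(void) { return malloc(2048); }
static void example_free(void *obj) { free(obj); }

static void example_usage(void) {
        libtrace_ocache_t pool;
        void *batch[4];
        size_t n;

        /* 256-object main buffer, 4-object per-thread cache, bounded pool */
        if (libtrace_ocache_init(&pool, example_alloc, example_free, 4, 256, true) != 0)
                return;

        /* Take up to four objects, requiring at least one */
        n = libtrace_ocache_alloc(&pool, batch, 4, 1);

        /* ... use the objects ... */

        /* Hand everything back to the pool */
        libtrace_ocache_free(&pool, batch, n, n);

        /* A non-zero return would indicate objects were never returned */
        libtrace_ocache_destroy(&pool);
}
#endif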

/**
  * Destroys the object cache. Call this only once all memory has
  * been free'd back and no more accesses will be made.
  *
  * @return Returns the number of packets outstanding, or extra objects received.
  *             Ideally this should be zero (0), otherwise some form of memory leak
  *             is likely present. Currently only implemented in the case limit_size
  *             is true.
  */
DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
        void *ele;
        int remaining;

        while (oc->nb_thread_list)
                unregister_thread(oc->thread_list[0]);

        pthread_spin_lock(&oc->spin);
        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
                oc->free(ele);
                if (oc->max_allocations)
                        --oc->current_allocations;
        }
        pthread_spin_unlock(&oc->spin);

        if (oc->current_allocations)
                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);

        // Capture the outstanding count before libtrace_zero_ocache() resets it
        remaining = (int) oc->current_allocations;

        libtrace_ringbuffer_destroy(&oc->rb);
        pthread_spin_destroy(&oc->spin);
        free(oc->thread_list);
        libtrace_zero_ocache(oc);
        return remaining;
}

static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                 struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cached!
        if (nb_buffers <= lc->used) {
                // Copy all from cache
                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
                lc->used -= nb_buffers;
                mem_hits.read.cache_hit += nb_buffers;
                mem_hits.readbulk.cache_hit += 1;
                return nb_buffers;
        }
        // Cache is not big enough, try to read everything from the ring buffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
                if (i)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += i;
        } else { // Not enough cached
                // Empty the cache, re-fill it and then see what we're left with
                i = lc->used;
                memcpy(values, lc->cache, sizeof(void *) * lc->used);
                mem_hits.read.cache_hit += i;

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);

                if (lc->used == lc->total)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += lc->used;
        }

        // Try to fill the remaining from the refreshed cache
        if (i < nb_buffers && lc->used) {
                size_t remaining;
                remaining = MIN(lc->used, nb_buffers - i);
                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
                lc->used -= remaining;
                i += remaining;
        }
        mem_hits.read.miss += nb_buffers - i;
        assert(i >= min_nb_buffers);
        return i;
}

DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;
        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = try_alloc ? 0 : min_nb_buffers;
        if (lc)
                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);

        if (try_alloc) {
                size_t nb;

                // Try alloc the rest
                if (oc->max_allocations) {
                        pthread_spin_lock(&oc->spin);
                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
                        oc->current_allocations += nb;
                        pthread_spin_unlock(&oc->spin);
                        nb += i;
                } else {
                        nb = nb_buffers;
                }

                for (; i < nb; ++i) {
                        values[i] = (*oc->alloc)();
                        assert(values[i]);
                }
                assert(i == nb);
                // Still got to wait for more
                if (nb < min_nb_buffers) {
                        if (lc)
                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
                        else
                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
                }
        }
        assert(i >= min_nb_buffers);
        return i;
}


static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                        struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cache space!
        if (nb_buffers <= lc->total - lc->used) {
                // Copy all to the cache
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
                lc->used += nb_buffers;
                mem_hits.write.cache_hit += nb_buffers;
                mem_hits.writebulk.cache_hit += 1;
                return nb_buffers;
        }
        // Cache is not big enough, try to write everything to the ring buffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
                if (i)
                        mem_hits.writebulk.ring_hit += 1;
                else
                        mem_hits.writebulk.miss += 1;
                mem_hits.write.ring_hit += i;
        } else { // Not enough cache space now, but there might be later
                // Fill the cache, empty it and then see what we're left with
                i = (lc->total - lc->used);
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
                mem_hits.write.cache_hit += i;

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);

                // Reorganise so the remaining full entries sit at the front
                if (lc->used)
                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);

                if (lc->used)
                        mem_hits.writebulk.miss += 1;
                else
                        mem_hits.writebulk.ring_hit += 1;
                mem_hits.write.ring_hit += lc->total - lc->used;
        }

        // Try to empty the remaining into the cache
        if (i < nb_buffers && lc->used != lc->total) {
                size_t remaining;
                remaining = MIN(lc->total - lc->used, nb_buffers - i);
                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
                lc->used += remaining;
                i += remaining;
        }
        mem_hits.write.miss += nb_buffers - i;
        return i;
}

DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = oc->max_allocations ? min_nb_buffers : 0;
        if (lc)
                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);

        if (!oc->max_allocations) {
                // Free these normally
                for (; i < min_nb_buffers; ++i) {
                        oc->free(values[i]);
                }
        }
        return i;
}

DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
        libtrace_zero_ringbuffer(&oc->rb);
        oc->thread_cache_size = 0;
        oc->alloc = NULL;
        oc->free = NULL;
        oc->current_allocations = 0;
        oc->max_allocations = 0;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0;
        oc->thread_list = NULL;
}

/**
 * @brief ocache_unregister_thread removes a thread from an ocache.
 * @param oc The ocache to remove this thread from; this will free any packets in the TLS cache
 */
DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
        size_t i;
        struct local_caches *lcs = get_local_caches();
        struct local_cache *lc = find_cache(oc);

        if (lc) {
                for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                        if (&lcs->t_mem_caches[i] == lc) {
                                // Free the cache against the ocache
                                unregister_thread(&lcs->t_mem_caches[i]);
                                free(lcs->t_mem_caches[i].cache);
                                // And remove it from the thread itself
                                --lcs->t_mem_caches_used;
                                lcs->t_mem_caches[i] = lcs->t_mem_caches[lcs->t_mem_caches_used];
                                memset(&lcs->t_mem_caches[lcs->t_mem_caches_used], 0, sizeof(struct local_cache));
                        }
                }
        }
}