source: lib/data-struct/object_cache.c @ 4007dbb

Last change on this file since 4007dbb was 4007dbb, checked in by Richard Sanger <rsangerarj@…>, 5 years ago

Updates the new interface to be more complete

This should work around any issues with systems without thread support.
This still remains compatible with existing code.
Examples, tools and tests still need to be updated to make use of the new interface.
Adds debug memory stats as an option to configure.

1#include "object_cache.h"
2#include <assert.h>
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6
7
8// pthread tls is most likely slower than __thread, but they have destructors so
9// we use a combination of the two here!!
10// Note Apples implementation of TLS means that memory is not available / has
11// been zeroed by the time the pthread destructor is called.
12struct local_cache {
13        libtrace_ocache_t *oc;
14        size_t total;
15        size_t used;
16        void **cache;
17        bool invalid;
18};
19
20struct mem_stats {
21        struct memfail {
22           uint64_t cache_hit;
23           uint64_t ring_hit;
24           uint64_t miss;
25           uint64_t recycled;
26        } readbulk, read, write, writebulk;
27};
28
29#ifdef ENABLE_MEM_STATS
30extern __thread struct mem_stats mem_hits;
31#endif
32
33struct local_caches {
34        size_t t_mem_caches_used;
35        size_t t_mem_caches_total;
36        struct local_cache *t_mem_caches;
37};
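
// Each thread keeps one local_cache per object cache (ocache) it touches.  The
// local_cache holds up to thread_cache_size recycled objects; overflow and
// cross-thread traffic go through the blocking ring buffer shared by the whole
// ocache.  local_caches is the per-thread array of those entries, and every
// entry is also registered with its ocache so the pool can flush it when
// either the thread or the ocache is torn down.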

static pthread_key_t memory_destructor_key;
static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;
static inline struct local_caches *get_local_caches();

/**
 * @brief unregister_thread assumes we do NOT hold the spin lock
 */
static inline void unregister_thread(struct local_cache *lc) {
        size_t i;
        if (lc->invalid)
                fprintf(stderr, "Already free'd the thread cache!!\n");
        pthread_spin_lock(&lc->oc->spin);
        // Remove it from our thread list
        for (i = 0; i < lc->oc->nb_thread_list; ++i) {
                if (lc->oc->thread_list[i] == lc) {
                        --lc->oc->nb_thread_list;
                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
                        i = ~0U;
                        break;
                }
        }
        if (i != ~0U) {
                fprintf(stderr, "Attempted to unregister a thread with an"
                         " ocache that had never registered this thread. Ignoring.\n");
                pthread_spin_unlock(&lc->oc->spin);
                return;
        }
        lc->invalid = true;

        if (lc->oc->max_allocations) {
                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
        } else {
                size_t i;
                // We just free these
                for (i = 0; i < lc->used; ++i) {
                        lc->oc->free(lc->cache[i]);
                }
        }
        pthread_spin_unlock(&lc->oc->spin);
}

/**
 * @brief register_thread assumes we do NOT hold the spin lock
 */
static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
        lc->invalid = false;
        pthread_spin_lock(&oc->spin);
        if (oc->nb_thread_list == oc->max_nb_thread_list) {
                oc->max_nb_thread_list += 0x10;
                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
        }
        oc->thread_list[oc->nb_thread_list] = lc;
        ++oc->nb_thread_list;
        pthread_spin_unlock(&oc->spin);
}

static void destroy_memory_caches(void *tlsaddr) {
        size_t a;
        struct local_caches *lcs = tlsaddr;

        for (a = 0; a < lcs->t_mem_caches_used; ++a) {
                unregister_thread(&lcs->t_mem_caches[a]);
                // unregister_thread() writes these all back to the main buffer;
                // this might have issues, in which case we would want to free them instead
                free(lcs->t_mem_caches[a].cache);
        }
        free(lcs->t_mem_caches);
        lcs->t_mem_caches = NULL;
        free(lcs);

}

static void once_memory_cache_key_init() {
        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_caches), == 0);
}

/**
 * Adds more space to our mem_caches
 */
static void resize_memory_caches(struct local_caches *lcs) {
        assert (lcs->t_mem_caches_total > 0);
        lcs->t_mem_caches_total += 0x10;
        lcs->t_mem_caches = realloc(lcs->t_mem_caches,
                                    lcs->t_mem_caches_total * sizeof(struct local_cache));
}

/* Get TLS for the list of local_caches */
static inline struct local_caches *get_local_caches() {
#if HAVE_TLS
        static __thread struct local_caches *lcs = NULL;
        if (lcs) {
                return lcs;
        }
#else
        struct local_caches *lcs;
        pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
        if ((lcs = pthread_getspecific(memory_destructor_key)) != 0) {
                return lcs;
        }
#endif
        else {
                /* This thread has not been used with a memory pool before */
                /* Allocate our TLS */
                assert(lcs == NULL);
                lcs = calloc(1, sizeof (struct local_caches));
                assert(lcs);
                /* Hook into pthreads to destroy this when the thread ends */
                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
                pthread_setspecific(memory_destructor_key, (void *) lcs);
                lcs->t_mem_caches_total = 0x10;
                lcs->t_mem_caches = calloc(0x10, sizeof(struct local_cache));
                assert(lcs);
                assert(lcs->t_mem_caches);
                return lcs;
        }
}
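
// When compiler TLS (__thread) is available the lookup above is a single
// thread-local pointer read; the pthread key is still set so that
// destroy_memory_caches() runs when the thread exits.  Without HAVE_TLS the
// pthread key doubles as the storage itself via pthread_getspecific().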

static inline struct local_cache *find_cache(libtrace_ocache_t *oc) {
        size_t i;
        struct local_cache *lc = NULL;
        struct local_caches *lcs = get_local_caches();

        for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                if (lcs->t_mem_caches[i].oc == oc) {
                        lc = &lcs->t_mem_caches[i];
                        break;
                }
        }

        if (!oc->thread_cache_size)
                return NULL;

        // Create a cache
        if (!lc) {
                if (lcs->t_mem_caches_used == lcs->t_mem_caches_total)
                        resize_memory_caches(lcs);
                lcs->t_mem_caches[lcs->t_mem_caches_used].oc = oc;
                lcs->t_mem_caches[lcs->t_mem_caches_used].used = 0;
                lcs->t_mem_caches[lcs->t_mem_caches_used].total = oc->thread_cache_size;
                lcs->t_mem_caches[lcs->t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
                lcs->t_mem_caches[lcs->t_mem_caches_used].invalid = false;
                lc = &lcs->t_mem_caches[lcs->t_mem_caches_used];
                // Register it with the underlying ring_buffer
                register_thread(lc->oc, lc);
                ++lcs->t_mem_caches_used;
        }

        assert(!lc->invalid);
        return lc;
}

/**
  * Creates an object cache, that is, a pool of dynamically allocated and recycled
  * objects of a fixed size. This should be faster than malloc and free.
  * The alloc and free methods are supplied by the user and are used when no
  * recycled objects are available, or to tidy the final results.
  *
  * The performance of these pools will decrease if thread caches are used,
  * as this results in a list to look up per thread. The pool is added to
  * this list when first encountered, and these entries persist until the thread exits.
  *
  * NOTE: If limit_size is true do not attempt to 'free' back any objects that
  * were not created by this pool, otherwise the 'free' might deadlock. Also
  * be cautious when picking the buffer size; up to thread_cache_size*(threads-1)
  * objects could be unusable at any given time if they are stuck in thread local caches.
  *
  * @param oc A pointer to the object cache structure which is to be initialised.
  * @param alloc The allocation method, must not be NULL. [void *alloc()]
  * @param free The free method used to destroy packets. [void free(void * obj)]
  * @param thread_cache_size A small cache kept on a per-thread basis; this can be 0,
  *             however that should only be done if bulk reads of packets are being
  *             performed or contention is minimal.
  * @param buffer_size The number of packets to be stored in the main buffer.
  * @param limit_size If true no more objects than buffer_size will be allocated and
  *             reads will block (free never should). Otherwise packets can be freely
  *             allocated upon request and are free'd if there is not enough space for them.
  * @return If successful returns 0 otherwise -1.
  */
DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
                                    void (*free)(void *),
                                    size_t thread_cache_size,
                                    size_t buffer_size, bool limit_size) {

        assert(buffer_size);
        assert(alloc);
        assert(free);
        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
                return -1;
        }
        oc->alloc = alloc;
        oc->free = free;
        oc->current_allocations = 0;
        oc->thread_cache_size = thread_cache_size;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0x10;
        oc->thread_list = calloc(0x10, sizeof(void*));
        if (oc->thread_list == NULL) {
                libtrace_ringbuffer_destroy(&oc->rb);
                return -1;
        }
        pthread_spin_init(&oc->spin, 0);
        if (limit_size)
                oc->max_allocations = buffer_size;
        else
                oc->max_allocations = 0;
        return 0;
}
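
/*
 * Illustrative usage sketch (not part of the original file; my_alloc, my_free
 * and the sizes below are hypothetical).  A caller supplies its own constructor
 * and destructor, then allocates and returns objects in bulk:
 *
 *      static void *my_alloc(void) { return malloc(2048); }
 *      static void my_free(void *obj) { free(obj); }
 *
 *      libtrace_ocache_t oc;
 *      void *pkts[10];
 *      size_t n;
 *
 *      libtrace_ocache_init(&oc, my_alloc, my_free, 32, 256, true);
 *      n = libtrace_ocache_alloc(&oc, pkts, 10, 10);  // blocks until all 10 are available
 *      // ... use the objects ...
 *      libtrace_ocache_free(&oc, pkts, n, n);         // recycle them into the pool
 *      libtrace_ocache_destroy(&oc);
 */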

/**
  * Destroys the object cache. Call this only once all memory has
  * been free'd back and no more accesses will be made.
  *
  * @return Returns the number of packets outstanding, or extra objects received.
  *             Ideally this should be zero (0), otherwise some form of memory
  *             leak is likely present. Currently only implemented in the case
  *             limit_size is true.
  */
DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
        void *ele;
        int leaked;

        while (oc->nb_thread_list)
                unregister_thread(oc->thread_list[0]);

        pthread_spin_lock(&oc->spin);
        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
                oc->free(ele);
                if (oc->max_allocations)
                        --oc->current_allocations;
        }
        pthread_spin_unlock(&oc->spin);

        if (oc->current_allocations)
                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);

        libtrace_ringbuffer_destroy(&oc->rb);
        pthread_spin_destroy(&oc->spin);
        free(oc->thread_list);
        // Remember the outstanding count before libtrace_zero_ocache() clears it
        leaked = (int) oc->current_allocations;
        libtrace_zero_ocache(oc);
        return leaked;
}

static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                 struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cached!! Yay
        if (nb_buffers <= lc->used) {
                // Copy all from cache
                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
                lc->used -= nb_buffers;
#ifdef ENABLE_MEM_STATS
                mem_hits.read.cache_hit += nb_buffers;
                mem_hits.readbulk.cache_hit += 1;
#endif
                return nb_buffers;
        }
        // Cache is not big enough, try to read everything from the ringbuffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
#ifdef ENABLE_MEM_STATS
                if (i)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += i;
#endif
        } else { // Not enough cached
                // Empty the cache and re-fill it and then see what we're left with
                i = lc->used;
                memcpy(values, lc->cache, sizeof(void *) * lc->used);
#ifdef ENABLE_MEM_STATS
                mem_hits.read.cache_hit += i;
#endif

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
#ifdef ENABLE_MEM_STATS
                if (lc->used == lc->total)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += lc->used;
#endif
        }

        // Try to fill the remaining from the refreshed cache
        if (i < nb_buffers && lc->used) {
                size_t remaining;
                remaining = MIN(lc->used, nb_buffers - i);
                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
                lc->used -= remaining;
                i += remaining;
        }
#ifdef ENABLE_MEM_STATS
        mem_hits.read.miss += nb_buffers - i;
#endif
        assert(i >= min_nb_buffers);
        return i;
}

DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;
        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = try_alloc ? 0 : min_nb_buffers;
        if (lc)
                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);

        if (try_alloc) {
                size_t nb;

                // Try alloc the rest
                if (oc->max_allocations) {
                        pthread_spin_lock(&oc->spin);
                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
                        oc->current_allocations += nb;
                        pthread_spin_unlock(&oc->spin);
                        nb += i;
                } else {
                        nb = nb_buffers;
                }

                for (; i < nb; ++i) {
                        values[i] = (*oc->alloc)();
                        assert(values[i]);
                }
                assert(i == nb);
                // Still got to wait for more
                if (nb < min_nb_buffers) {
                        if (lc)
                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
                        else
                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
                }
        }
        assert(i >= min_nb_buffers);
        return i;
}
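
/*
 * Illustrative sketch of the bulk semantics (hypothetical caller, not part of
 * the original file): request up to a burst of 64 objects but insist on only
 * one, so the call returns once at least one object is available rather than
 * waiting for the whole batch.
 *
 *      void *burst[64];
 *      size_t got = libtrace_ocache_alloc(&oc, burst, 64, 1);
 *      // got >= 1; a size-limited pool may return fewer than 64 once
 *      // max_allocations has been reached.
 */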


static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                        struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough space cached!! Yay
        if (nb_buffers <= lc->total - lc->used) {
                // Copy all to the cache
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
                lc->used += nb_buffers;
#ifdef ENABLE_MEM_STATS
                mem_hits.write.cache_hit += nb_buffers;
                mem_hits.writebulk.cache_hit += 1;
#endif
                return nb_buffers;
        }
        // Cache is not big enough, try to write everything to the ringbuffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
#ifdef ENABLE_MEM_STATS
                if (i)
                        mem_hits.writebulk.ring_hit += 1;
                else
                        mem_hits.writebulk.miss += 1;
                mem_hits.write.ring_hit += i;
#endif
        } else { // Not enough cache space, but there might be later
                // Fill the cache, empty it and then see what we're left with
                i = (lc->total - lc->used);
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
#ifdef ENABLE_MEM_STATS
                mem_hits.write.cache_hit += i;
#endif

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);

                // Reorganise so the remaining full slots sit at the front
                if (lc->used)
                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);

#ifdef ENABLE_MEM_STATS
                if (lc->used)
                        mem_hits.writebulk.miss += 1;
                else
                        mem_hits.writebulk.ring_hit += 1;
                mem_hits.write.ring_hit += lc->total - lc->used;
#endif
        }

        // Try to stash the remaining in the cache
        if (i < nb_buffers && lc->used != lc->total) {
                size_t remaining;
                remaining = MIN(lc->total - lc->used, nb_buffers - i);
                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
                lc->used += remaining;
                i += remaining;
        }
#ifdef ENABLE_MEM_STATS
        mem_hits.write.miss += nb_buffers - i;
#endif
        return i;
}

DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = oc->max_allocations ? min_nb_buffers : 0;
        if (lc)
                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);

        if (!oc->max_allocations) {
                // Free these normally
                for (; i < min_nb_buffers; ++i) {
                        oc->free(values[i]);
                }
        }
        return i;
}
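
/*
 * Illustrative sketch (hypothetical caller, not part of the original file):
 * returning a whole burst and requiring that all of it be disposed of.  With a
 * size-limited pool the objects go back into the thread cache or ring buffer;
 * with an unlimited pool anything that does not fit is handed straight to the
 * user-supplied free callback instead of blocking.
 *
 *      size_t done = libtrace_ocache_free(&oc, burst, got, got);
 *      // done == got here, since min_nb_buffers == nb_buffers
 */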

DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
        libtrace_zero_ringbuffer(&oc->rb);
        oc->thread_cache_size = 0;
        oc->alloc = NULL;
        oc->free = NULL;
        oc->current_allocations = 0;
        oc->max_allocations = 0;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0;
        oc->thread_list = NULL;
}

/**
 * @brief ocache_unregister_thread removes a thread from an ocache.
 * @param oc The ocache to remove this thread from; this will free any packets
 *           held in the thread-local cache.
 */
DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
        size_t i;
        struct local_caches *lcs = get_local_caches();
        struct local_cache *lc = find_cache(oc);

        if (lc) {
                for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                        if (&lcs->t_mem_caches[i] == lc) {
                                // Free the cache against the ocache
                                unregister_thread(&lcs->t_mem_caches[i]);
                                free(lcs->t_mem_caches[i].cache);
                                // And remove it from the thread itself
                                --lcs->t_mem_caches_used;
                                lcs->t_mem_caches[i] = lcs->t_mem_caches[lcs->t_mem_caches_used];
                                memset(&lcs->t_mem_caches[lcs->t_mem_caches_used], 0, sizeof(struct local_cache));
                        }
                }
        }
}