source: lib/data-struct/object_cache.c @ 322c516

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 322c516 was f45ff19, checked in by Shane Alcock <salcock@…>, 6 years ago

Make sure our config #defines make it to the object cache

  • Property mode set to 100644
File size: 15.5 KB
Line 
1#include "config.h"
2#include "object_cache.h"
3#include <assert.h>
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7
8
9// pthread tls is most likely slower than __thread, but they have destructors so
10// we use a combination of the two here!!
11// Note Apples implementation of TLS means that memory is not available / has
12// been zeroed by the time the pthread destructor is called.
13struct local_cache {
14        libtrace_ocache_t *oc;
15        size_t total;
16        size_t used;
17        void **cache;
18        bool invalid;
19};
20
21struct mem_stats {
22        struct memfail {
23           uint64_t cache_hit;
24           uint64_t ring_hit;
25           uint64_t miss;
26           uint64_t recycled;
27        } readbulk, read, write, writebulk;
28};
29
30#ifdef ENABLE_MEM_STATS
31extern __thread struct mem_stats mem_hits;
32#endif
33
34struct local_caches {
35        size_t t_mem_caches_used;
36        size_t t_mem_caches_total;
37        struct local_cache *t_mem_caches;
38};
39
40static pthread_key_t memory_destructor_key;
41static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;
42static inline struct local_caches *get_local_caches();
43
44/**
45 * @brief unregister_thread assumes we DONT hold spin
46 */
47static inline void unregister_thread(struct local_cache *lc) {
48        size_t i;
49        if (lc->invalid)
50                fprintf(stderr, "Already free'd the thread cache!!\n");
51        pthread_spin_lock(&lc->oc->spin);
52        // Remove it from our thread list
53        for (i=0; i < lc->oc->nb_thread_list; ++i) {
54                if (lc->oc->thread_list[i] == lc) {
55                        --lc->oc->nb_thread_list;
56                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
57                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
58                        i = ~0U;
59                        break;
60                }
61        }
62        if (i != ~0U) {
63                fprintf(stderr, "Attempted to unregistered a thread with an"
64                         " ocache that had never registered this thread. Ignoring.\n");
65                pthread_spin_unlock(&lc->oc->spin);
66                return;
67        }
68        lc->invalid = true;
69
70        if (lc->oc->max_allocations) {
71                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
72        } else {
73                size_t i;
74                // We just run the free these
75                for(i = 0; i < lc->used; ++i) {
76                        lc->oc->free(lc->cache[i]);
77                }
78        }
79        pthread_spin_unlock(&lc->oc->spin);
80}
81
82/**
83 * @brief register_thread assumes we DONT hold spin
84 */
85static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
86        lc->invalid = false;
87        pthread_spin_lock(&oc->spin);
88        if (oc->nb_thread_list == oc->max_nb_thread_list) {
89                oc->max_nb_thread_list += 0x10;
90                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
91        }
92        oc->thread_list[oc->nb_thread_list] = lc;
93        ++oc->nb_thread_list;
94        pthread_spin_unlock(&oc->spin);
95}
96
97static void destroy_memory_caches(void *tlsaddr) {
98        size_t a;
99        struct local_caches *lcs = tlsaddr;
100
101        for (a = 0; a < lcs->t_mem_caches_used; ++a) {
102                unregister_thread(&lcs->t_mem_caches[a]);
103                // Write these all back to the main buffer, this might have issues we would want to free these
104                free(lcs->t_mem_caches[a].cache);
105        }
106        free(lcs->t_mem_caches);
107        lcs->t_mem_caches = NULL;
108        free(lcs);
109
110}
111
112static void once_memory_cache_key_init() {
113        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_caches), == 0);
114}
115
116/**
117 * Adds more space to our mem_caches
118 */
119static void resize_memory_caches(struct local_caches *lcs) {
120        assert (lcs->t_mem_caches_total > 0);
121        lcs->t_mem_caches += 0x10;
122        lcs->t_mem_caches = realloc(lcs->t_mem_caches,
123                                    lcs->t_mem_caches_total * sizeof(struct local_cache));
124}
125
126/* Get TLS for the list of local_caches */
127static inline struct local_caches *get_local_caches() {
128#if HAVE_TLS
129        static __thread struct local_caches *lcs = NULL;
130        if (lcs) {
131                return lcs;
132        }
133#else
134        struct local_caches *lcs;
135        pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
136        if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) {
137                return lcs;
138        }
139#endif
140        else {
141                /* This thread has not been used with a memory pool before */
142                /* Allocate our TLS */
143                assert(lcs == NULL);
144                lcs = calloc(1, sizeof (struct local_caches));
145                assert(lcs);
146                /* Hook into pthreads to destroy this when the thread ends */
147                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
148                pthread_setspecific(memory_destructor_key, (void *) lcs);
149                lcs->t_mem_caches_total = 0x10;
150                lcs->t_mem_caches = calloc(0x10, sizeof(struct local_cache));
151                assert(lcs);
152                assert(lcs->t_mem_caches);
153                return lcs;
154        }
155}
156
157static inline struct local_cache * find_cache(libtrace_ocache_t *oc) {
158        size_t i;
159        struct local_cache *lc = NULL;
160        struct local_caches *lcs = get_local_caches();
161
162        for (i = 0; i < lcs->t_mem_caches_used; ++i) {
163                if (lcs->t_mem_caches[i].oc == oc) {
164                        lc = &lcs->t_mem_caches[i];
165                        break;
166                }
167        }
168
169        if (!oc->thread_cache_size)
170                return 0;
171
172        // Create a cache
173        if (!lc) {
174                if (lcs->t_mem_caches_used == lcs->t_mem_caches_total)
175                        resize_memory_caches(lcs);
176                lcs->t_mem_caches[lcs->t_mem_caches_used].oc = oc;
177                lcs->t_mem_caches[lcs->t_mem_caches_used].used = 0;
178                lcs->t_mem_caches[lcs->t_mem_caches_used].total = oc->thread_cache_size;
179                lcs->t_mem_caches[lcs->t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
180                lcs->t_mem_caches[lcs->t_mem_caches_used].invalid = false;
181                lc = &lcs->t_mem_caches[lcs->t_mem_caches_used];
182                // Register it with the underlying ring_buffer
183                register_thread(lc->oc, lc);
184                ++lcs->t_mem_caches_used;
185        }
186
187        assert(!lc->invalid);
188        return lc;
189}
190
191/**
192  * Creates a object cache, that is a pool of dynamically allocated and recycled
193  * objects of a fixed size. This should be faster than malloc and free.
194  * The alloc and free methods are supplied by the user and are used when no
195  * recycled objects are available, or to tidy the final results.
196  *
197  * The performance of these pools will decrease if thread caches are used
198  * as this results in a list to lookup per thread. The pool is added when
199  * to this list when first encountered, these persist untill the thread exits.
200  *
201  * NOTE: If limit_size is true do not attempt to 'free' any objects that were
202  * not created by this pool back otherwise the 'free' might deadlock. Also
203  * be cautious when picking the buffer size, upto thread_cache_size*(threads-1)
204  * could be unusable at any given time if these are stuck in thread local caches.
205  *
206  * @param oc A pointer to the object cache structure which is to be initialised.
207  * @param alloc The allocation method, must not be NULL. [void *alloc()]
208  * @param free The free method used to destroy packets. [void free(void * obj)]
209  * @param thread_cache_size A small cache kept on a per thread basis, this can be 0
210  *             however should only be done if bulk reads of packets are being performed
211  *             or contention is minimal.
212  * @param buffer_size The number of packets to be stored in the main buffer.
213  * @param limit_size If true no more objects than buffer_size will be allocated,
214  *             reads will block (free never should).Otherwise packets can be freely
215  *     allocated upon requested and are free'd if there is not enough space for them.
216  * @return If successful returns 0 otherwise -1.
217  */
218DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
219                                    void (*free)(void *),
220                                    size_t thread_cache_size,
221                                    size_t buffer_size, bool limit_size) {
222
223        assert(buffer_size);
224        assert(alloc);
225        assert(free);
226        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
227                return -1;
228        }
229        oc->alloc = alloc;
230        oc->free = free;
231        oc->current_allocations = 0;
232        oc->thread_cache_size = thread_cache_size;
233        oc->nb_thread_list = 0;
234        oc->max_nb_thread_list = 0x10;
235        oc->thread_list = calloc(0x10, sizeof(void*));
236        if (oc->thread_list == NULL) {
237                libtrace_ringbuffer_destroy(&oc->rb);
238                return -1;
239        }
240        pthread_spin_init(&oc->spin, 0);
241        if (limit_size)
242                oc->max_allocations = buffer_size;
243        else
244                oc->max_allocations = 0;
245        return 0;
246}
247
248/**
249  * Destroys the object cache. Call this only once all memory has
250  * been free'd back and no more accesses will be made.
251  *
252  * @return Returns the number of packets outstanding, or extra object recevied
253  *             Ideally this should be zero (0) otherwise some form of memory leak
254  *             is likely present. Currenty only implemented in the case limit_size
255  *     is true.
256  */
257DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
258        void *ele;
259
260        while (oc->nb_thread_list)
261                unregister_thread(oc->thread_list[0]);
262
263        pthread_spin_lock(&oc->spin);
264        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
265                oc->free(ele);
266                if (oc->max_allocations)
267                        --oc->current_allocations;
268        }
269        pthread_spin_unlock(&oc->spin);
270
271        if (oc->current_allocations)
272                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);
273
274        libtrace_ringbuffer_destroy(&oc->rb);
275        pthread_spin_destroy(&oc->spin);
276        free(oc->thread_list);
277        libtrace_zero_ocache(oc);
278        if (oc->current_allocations)
279                return (int) oc->current_allocations;
280        else
281                return 0;
282}
283
284static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
285                                                                                 struct local_cache *lc) {
286        libtrace_ringbuffer_t *rb = &oc->rb;
287        size_t i;
288
289        // We have enough cached!! Yay
290        if (nb_buffers <= lc->used) {
291                // Copy all from cache
292                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
293                lc->used -= nb_buffers;
294#ifdef ENABLE_MEM_STATS
295                mem_hits.read.cache_hit += nb_buffers;
296                mem_hits.readbulk.cache_hit += 1;
297#endif
298                return nb_buffers;
299        }
300        // Cache is not big enough try read all from ringbuffer
301        else if (nb_buffers > lc->total) {
302                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
303#ifdef ENABLE_MEM_STATS
304                if (i)
305                        mem_hits.readbulk.ring_hit += 1;
306                else
307                        mem_hits.readbulk.miss += 1;
308                mem_hits.read.ring_hit += i;
309#endif
310        } else { // Not enough cached
311                // Empty the cache and re-fill it and then see what we're left with
312                i = lc->used;
313                memcpy(values, lc->cache, sizeof(void *) * lc->used);
314#ifdef ENABLE_MEM_STATS
315                mem_hits.read.cache_hit += i;
316#endif
317
318                // Make sure we still meet the minimum requirement
319                if (i < min_nb_buffers)
320                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
321                else
322                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
323#ifdef ENABLE_MEM_STATS
324                if (lc->used == lc->total)
325                        mem_hits.readbulk.ring_hit += 1;
326                else
327                        mem_hits.readbulk.miss += 1;
328                mem_hits.read.ring_hit += lc->used;
329#endif
330        }
331
332        // Try fill the remaining
333        if (i < nb_buffers && lc->used) {
334                size_t remaining;
335                remaining = MIN(lc->used, nb_buffers - i);
336                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
337                lc->used -= remaining;
338                i += remaining;
339        }
340#ifdef ENABLE_MEM_STATS
341        mem_hits.read.miss += nb_buffers - i;
342#endif
343        assert(i >= min_nb_buffers);
344        return i;
345}
346
347DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
348        struct local_cache *lc = find_cache(oc);
349        size_t i;
350        size_t min;
351        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);
352
353        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
354        min = try_alloc ? 0: min_nb_buffers;
355        if (lc)
356                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min,  lc);
357        else
358                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);
359
360        if (try_alloc) {
361                size_t nb;
362
363                // Try alloc the rest
364                if (oc->max_allocations) {
365                        pthread_spin_lock(&oc->spin);
366                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
367                        oc->current_allocations += nb;
368                        pthread_spin_unlock(&oc->spin);
369                        nb += i;
370                } else {
371                        nb = nb_buffers;
372                }
373
374                for (;i < nb; ++i) {
375                        values[i] = (*oc->alloc)();
376                        assert(values[i]);
377                }
378                assert (i == nb);
379                // Still got to wait for more
380                if (nb < min_nb_buffers) {
381                        if (lc)
382                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
383                        else
384                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
385                }
386        }
387        assert(i >= min_nb_buffers);
388        return i;
389}
390
391
392static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
393                                                                                        struct local_cache *lc) {
394        libtrace_ringbuffer_t *rb = &oc->rb;
395        size_t i;
396
397        // We have enough cached!! Yay
398        if (nb_buffers <= lc->total - lc->used) {
399                // Copy all to the cache
400                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
401                lc->used += nb_buffers;
402#ifdef ENABLE_MEM_STATS
403                mem_hits.write.cache_hit += nb_buffers;
404                mem_hits.writebulk.cache_hit += 1;
405#endif
406                return nb_buffers;
407        }
408        // Cache is not big enough try write all to the ringbuffer
409        else if (nb_buffers > lc->total) {
410                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
411#ifdef ENABLE_MEM_STATS
412                if (i)
413                        mem_hits.writebulk.ring_hit += 1;
414                else
415                        mem_hits.writebulk.miss += 1;
416                mem_hits.write.ring_hit += i;
417#endif
418        } else { // Not enough cache space but there might later
419                // Fill the cache and empty it and then see what we're left with
420                i = (lc->total - lc->used);
421                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
422#ifdef ENABLE_MEM_STATS
423                mem_hits.write.cache_hit += i;
424#endif
425
426                // Make sure we still meet the minimum requirement
427                if (i < min_nb_buffers)
428                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
429                else
430                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);
431
432                // Re originise fulls to the front
433                if (lc->used)
434                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);
435
436#ifdef ENABLE_MEM_STATS
437                if (lc->used)
438                        mem_hits.writebulk.miss += 1;
439                else
440                        mem_hits.writebulk.ring_hit += 1;
441                mem_hits.write.ring_hit += lc->total - lc->used;
442#endif
443        }
444
445        // Try empty the remaining
446        if (i < nb_buffers && lc->used != lc->total) {
447                size_t remaining;
448                remaining = MIN(lc->total - lc->used, nb_buffers - i);
449                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
450                lc->used += remaining;
451                i += remaining;
452        }
453#ifdef ENABLE_MEM_STATS
454        mem_hits.write.miss += nb_buffers - i;
455#endif
456        return i;
457}
458
459DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
460        struct local_cache *lc = find_cache(oc);
461        size_t i;
462        size_t min;
463
464        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
465        min = oc->max_allocations ? min_nb_buffers : 0;
466        if (lc)
467                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
468        else
469                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);
470
471        if (!oc->max_allocations) {
472                // Free these normally
473                for (;i < min_nb_buffers; ++i) {
474                        oc->free(values[i]);
475                }
476        }
477        return i;
478}
479
480DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
481        libtrace_zero_ringbuffer(&oc->rb);
482        oc->thread_cache_size = 0;
483        oc->alloc = NULL;
484        oc->free = NULL;
485        oc->current_allocations = 0;
486        oc->max_allocations = 0;
487        oc->nb_thread_list = 0;
488        oc->max_nb_thread_list = 0;
489        oc->thread_list = NULL;
490}
491
492/**
493 * @brief ocache_unregister_thread removes a thread from an ocache.
494 * @param The ocache to remove this thread, this will free any packets in the TLS cache
495 */
496DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
497        size_t i;
498        struct local_caches *lcs = get_local_caches();
499        struct local_cache *lc = find_cache(oc);
500
501        if (lc) {
502                for (i = 0; i < lcs->t_mem_caches_used; ++i) {
503                        if (&lcs->t_mem_caches[i] == lc) {
504                                // Free the cache against the ocache
505                                unregister_thread(&lcs->t_mem_caches[i]);
506                                free(lcs->t_mem_caches[i].cache);
507                                // And remove it from the thread itself
508                                --lcs->t_mem_caches_used;
509                                lcs->t_mem_caches[i] = lcs->t_mem_caches[lcs->t_mem_caches_used];
510                                memset(&lcs->t_mem_caches[lcs->t_mem_caches_used], 0, sizeof(struct local_cache));
511                        }
512                }
513        }
514}
Note: See TracBrowser for help on using the repository browser.