source: lib/data-struct/object_cache.c @ b6ff245

/*
 *
 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
 * All rights reserved.
 *
 * This file is part of libtrace.
 *
 * This code has been developed by the University of Waikato WAND
 * research group. For further information please see http://www.wand.net.nz/
 *
 * libtrace is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * libtrace is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *
 */
#include "config.h"
#include "object_cache.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>


// pthread TLS is most likely slower than __thread, but pthread keys have
// destructors, so we use a combination of the two here.
// Note that Apple's implementation of TLS means the memory is not available /
// has been zeroed by the time the pthread destructor is called.
struct local_cache {
        libtrace_ocache_t *oc;
        size_t total;
        size_t used;
        void **cache;
        bool invalid;
};

struct mem_stats {
        struct memfail {
                uint64_t cache_hit;
                uint64_t ring_hit;
                uint64_t miss;
                uint64_t recycled;
        } readbulk, read, write, writebulk;
};

#ifdef ENABLE_MEM_STATS
extern __thread struct mem_stats mem_hits;
#endif

struct local_caches {
        size_t t_mem_caches_used;
        size_t t_mem_caches_total;
        struct local_cache *t_mem_caches;
};

static pthread_key_t memory_destructor_key;
static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;
static inline struct local_caches *get_local_caches();

/**
 * @brief unregister_thread assumes we DON'T hold spin
 */
static inline void unregister_thread(struct local_cache *lc) {
        size_t i;
        if (lc->invalid)
                fprintf(stderr, "Already freed the thread cache!!\n");
        pthread_spin_lock(&lc->oc->spin);
        // Remove it from our thread list
        for (i=0; i < lc->oc->nb_thread_list; ++i) {
                if (lc->oc->thread_list[i] == lc) {
                        --lc->oc->nb_thread_list;
                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
                        i = ~0U;
                        break;
                }
        }
        if (i != ~0U) {
                fprintf(stderr, "Attempted to unregister a thread with an"
                         " ocache that had never registered this thread. Ignoring.\n");
                pthread_spin_unlock(&lc->oc->spin);
                return;
        }
        lc->invalid = true;

        if (lc->oc->max_allocations) {
                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
        } else {
                size_t i;
                // Just free these directly
                for(i = 0; i < lc->used; ++i) {
                        lc->oc->free(lc->cache[i]);
                }
        }
        pthread_spin_unlock(&lc->oc->spin);
}

/**
 * @brief register_thread assumes we DON'T hold spin
 */
static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
        lc->invalid = false;
        pthread_spin_lock(&oc->spin);
        if (oc->nb_thread_list == oc->max_nb_thread_list) {
                oc->max_nb_thread_list += 0x10;
                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
        }
        oc->thread_list[oc->nb_thread_list] = lc;
        ++oc->nb_thread_list;
        pthread_spin_unlock(&oc->spin);
}

static void destroy_memory_caches(void *tlsaddr) {
        size_t a;
        struct local_caches *lcs = tlsaddr;

        for (a = 0; a < lcs->t_mem_caches_used; ++a) {
                unregister_thread(&lcs->t_mem_caches[a]);
                // unregister_thread() has written the objects back to the main
                // buffer (or freed them); now free the local cache array itself
                free(lcs->t_mem_caches[a].cache);
        }
        free(lcs->t_mem_caches);
        lcs->t_mem_caches = NULL;
        free(lcs);
}

static void once_memory_cache_key_init() {
        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_caches), == 0);
}

/**
 * Adds more space to our mem_caches
 */
static void resize_memory_caches(struct local_caches *lcs) {
        /*assert (lcs->t_mem_caches_total > 0);*/
        if (lcs->t_mem_caches_total <= 0) {
                fprintf(stderr, "Expected lcs->t_mem_caches_total to be greater than 0 in resize_memory_caches()\n");
                return;
        }
        lcs->t_mem_caches_total += 0x10;
        lcs->t_mem_caches = realloc(lcs->t_mem_caches,
                                    lcs->t_mem_caches_total * sizeof(struct local_cache));
}

/* Get TLS for the list of local_caches */
static inline struct local_caches *get_local_caches() {
#if HAVE_TLS
        static __thread struct local_caches *lcs = NULL;
        if (lcs) {
                return lcs;
        }
#else
        struct local_caches *lcs;
        pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
        if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) {
                return lcs;
        }
#endif
        else {
                /* This thread has not been used with a memory pool before */
                /* Allocate our TLS */
                /*assert(lcs == NULL);*/
                if (lcs) {
                        fprintf(stderr, "Expected lcs to be NULL in get_local_caches()\n");
                        return NULL;
                }
                lcs = calloc(1, sizeof (struct local_caches));
                /*assert(lcs);*/
                if (!lcs) {
                        fprintf(stderr, "Unable to allocate memory for lcs in get_local_caches()\n");
                        return NULL;
                }
                /* Hook into pthreads to destroy this when the thread ends */
                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
                pthread_setspecific(memory_destructor_key, (void *) lcs);
                lcs->t_mem_caches_total = 0x10;
                lcs->t_mem_caches = calloc(0x10, sizeof(struct local_cache));
                /*assert(lcs);*/
                /*assert(lcs->t_mem_caches);*/
                if (!lcs->t_mem_caches) {
                        fprintf(stderr, "Unable to allocate memory for lcs->t_mem_caches in get_local_caches()\n");
                        return NULL;
                }
                return lcs;
        }
}

static inline struct local_cache * find_cache(libtrace_ocache_t *oc) {
        size_t i;
        struct local_cache *lc = NULL;
        struct local_caches *lcs = get_local_caches();

        for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                if (lcs->t_mem_caches[i].oc == oc) {
                        lc = &lcs->t_mem_caches[i];
                        break;
                }
        }

        if (!oc->thread_cache_size)
                return NULL;

        // Create a cache
        if (!lc) {
                if (lcs->t_mem_caches_used == lcs->t_mem_caches_total)
                        resize_memory_caches(lcs);
                lcs->t_mem_caches[lcs->t_mem_caches_used].oc = oc;
                lcs->t_mem_caches[lcs->t_mem_caches_used].used = 0;
                lcs->t_mem_caches[lcs->t_mem_caches_used].total = oc->thread_cache_size;
                lcs->t_mem_caches[lcs->t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
                lcs->t_mem_caches[lcs->t_mem_caches_used].invalid = false;
                lc = &lcs->t_mem_caches[lcs->t_mem_caches_used];
                // Register it with the underlying ring_buffer
                register_thread(lc->oc, lc);
                ++lcs->t_mem_caches_used;
        }

        /*assert(!lc->invalid);*/
        if (lc->invalid) {
                fprintf(stderr, "lc cache is invalid in find_cache()\n");
                return NULL;
        }
        return lc;
}

/**
  * Creates an object cache, that is, a pool of dynamically allocated and
  * recycled objects of a fixed size. This should be faster than malloc and
  * free. The alloc and free methods are supplied by the user and are used
  * when no recycled objects are available, or to tidy the final results.
  *
  * The performance of these pools will decrease if thread caches are used,
  * as this results in a list to look up per thread. A pool is added to this
  * list when it is first encountered, and these entries persist until the
  * thread exits.
  *
  * NOTE: If limit_size is true, do not attempt to 'free' back any objects
  * that were not created by this pool, otherwise the 'free' might deadlock.
  * Also be cautious when picking the buffer size: up to
  * thread_cache_size*(threads-1) objects could be unusable at any given time
  * if they are stuck in thread local caches.
  *
  * @param oc A pointer to the object cache structure which is to be initialised.
  * @param alloc The allocation method, must not be NULL. [void *alloc()]
  * @param free The free method used to destroy packets. [void free(void * obj)]
  * @param thread_cache_size A small cache kept on a per-thread basis; this can
  *             be 0, however that should only be done if bulk reads of packets
  *             are being performed or contention is minimal.
  * @param buffer_size The number of packets to be stored in the main buffer.
  * @param limit_size If true, no more objects than buffer_size will be
  *             allocated and reads will block (free never should). Otherwise
  *             packets can be freely allocated upon request and are freed if
  *             there is not enough space for them.
  * @return If successful returns 0, otherwise -1.
  */
DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
                                    void (*free)(void *),
                                    size_t thread_cache_size,
                                    size_t buffer_size, bool limit_size) {

        /*assert(buffer_size);*/
        if (!buffer_size) {
                fprintf(stderr, "Zero buffer_size passed into libtrace_ocache_init()\n");
                return -1;
        }
        /*assert(alloc);*/
        if (!alloc) {
                fprintf(stderr, "NULL alloc method passed into libtrace_ocache_init()\n");
                return -1;
        }
        /*assert(free);*/
        if (!free) {
                fprintf(stderr, "NULL free method passed into libtrace_ocache_init()\n");
                return -1;
        }
        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
                return -1;
        }
        oc->alloc = alloc;
        oc->free = free;
        oc->current_allocations = 0;
        oc->thread_cache_size = thread_cache_size;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0x10;
        oc->thread_list = calloc(0x10, sizeof(void*));
        if (oc->thread_list == NULL) {
                libtrace_ringbuffer_destroy(&oc->rb);
                return -1;
        }
        pthread_spin_init(&oc->spin, 0);
        if (limit_size)
                oc->max_allocations = buffer_size;
        else
                oc->max_allocations = 0;
        return 0;
}
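
/* A minimal usage sketch (an illustrative addition, guarded with #if 0 so it
 * is not built): the init/alloc/free/destroy lifecycle. my_alloc() and
 * my_free() are hypothetical user-supplied methods, and the sizes are
 * arbitrary. */
#if 0
static void *my_alloc(void) { return malloc(2048); }
static void my_free(void *obj) { free(obj); }

static void lifecycle_example(void) {
        libtrace_ocache_t oc;
        void *bufs[4];

        /* 256-object pool, up to 32 objects cached per thread, hard size cap */
        if (libtrace_ocache_init(&oc, my_alloc, my_free, 32, 256, true) != 0)
                return;
        /* Block until at least 4 objects are available */
        libtrace_ocache_alloc(&oc, bufs, 4, 4);
        /* Hand all 4 back to the pool for recycling */
        libtrace_ocache_free(&oc, bufs, 4, 4);
        libtrace_ocache_destroy(&oc);
}
#endif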

/**
  * Destroys the object cache. Call this only once all memory has
  * been freed back and no more accesses will be made.
  *
  * @return Returns the number of packets outstanding, or extra objects
  *             received. Ideally this should be zero (0); otherwise some form
  *             of memory leak is likely present. Currently only implemented in
  *             the case limit_size is true.
  */
DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
        void *ele;
        int remaining;

        while (oc->nb_thread_list)
                unregister_thread(oc->thread_list[0]);

        pthread_spin_lock(&oc->spin);
        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
                oc->free(ele);
                if (oc->max_allocations)
                        --oc->current_allocations;
        }
        pthread_spin_unlock(&oc->spin);

        if (oc->current_allocations)
                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);

        // Capture the count before libtrace_zero_ocache() resets it to 0
        remaining = (int) oc->current_allocations;
        libtrace_ringbuffer_destroy(&oc->rb);
        pthread_spin_destroy(&oc->spin);
        free(oc->thread_list);
        libtrace_zero_ocache(oc);
        return remaining;
}

static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                 struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cached!! Yay
        if (nb_buffers <= lc->used) {
                // Copy everything from the cache
                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
                lc->used -= nb_buffers;
#ifdef ENABLE_MEM_STATS
                mem_hits.read.cache_hit += nb_buffers;
                mem_hits.readbulk.cache_hit += 1;
#endif
                return nb_buffers;
        }
        // Cache is not big enough; try to read everything from the ring buffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
#ifdef ENABLE_MEM_STATS
                if (i)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += i;
#endif
        } else { // Not enough cached
                // Empty the cache, refill it, and then see what we're left with
                i = lc->used;
                memcpy(values, lc->cache, sizeof(void *) * lc->used);
#ifdef ENABLE_MEM_STATS
                mem_hits.read.cache_hit += i;
#endif

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
#ifdef ENABLE_MEM_STATS
                if (lc->used == lc->total)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += lc->used;
#endif
        }

        // Try to fill the remainder from the refilled cache
        if (i < nb_buffers && lc->used) {
                size_t remaining;
                remaining = MIN(lc->used, nb_buffers - i);
                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
                lc->used -= remaining;
                i += remaining;
        }
#ifdef ENABLE_MEM_STATS
        mem_hits.read.miss += nb_buffers - i;
#endif
        /*assert(i >= min_nb_buffers);*/
        if (i < min_nb_buffers) {
                fprintf(stderr, "Unable to fill the minimum number of buffers in libtrace_ocache_alloc_cache()\n");
                return ~0U;
        }
        return i;
}

DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;
        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = try_alloc ? 0 : min_nb_buffers;
        if (lc)
                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);

        if (try_alloc) {
                size_t nb;

                // Try to allocate the rest
                if (oc->max_allocations) {
                        pthread_spin_lock(&oc->spin);
                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
                        oc->current_allocations += nb;
                        pthread_spin_unlock(&oc->spin);
                        nb += i;
                } else {
                        nb = nb_buffers;
                }

                for (; i < nb; ++i) {
                        values[i] = (*oc->alloc)();
                        /*assert(values[i]);*/
                        if (!values[i]) {
                                fprintf(stderr, "Unable to alloc memory for values[%zu] in libtrace_ocache_alloc()\n", i);
                                return ~0U;
                        }
                }
                /*assert (i == nb);*/
                if (i != nb) {
                        fprintf(stderr, "Expected i == nb in libtrace_ocache_alloc()\n");
                        return ~0U;
                }
                // Still have to wait for more
                if (nb < min_nb_buffers) {
                        if (lc)
                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
                        else
                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
                }
        }
        /*assert(i >= min_nb_buffers);*/
        if (i < min_nb_buffers) {
                fprintf(stderr, "Expected i to be at least min_nb_buffers in libtrace_ocache_alloc()\n");
                return ~0U;
        }
        return i;
}
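
/* A sketch of the nb_buffers/min_nb_buffers contract (an illustrative
 * addition, not built): nb_buffers is the batch size requested, and
 * min_nb_buffers is the count that must be returned before the call may stop
 * blocking, so a minimum of 0 makes the call best-effort when a size-limited
 * pool is exhausted. Assumes a pool initialised as in the earlier sketch. */
#if 0
static void batch_example(libtrace_ocache_t *oc) {
        void *bufs[8];
        size_t got;

        /* Best effort: may return fewer than 8 if a capped pool is exhausted */
        got = libtrace_ocache_alloc(oc, bufs, 8, 0);
        libtrace_ocache_free(oc, bufs, got, got);

        /* Guaranteed batch: blocks until all 8 objects can be returned */
        got = libtrace_ocache_alloc(oc, bufs, 8, 8);
        libtrace_ocache_free(oc, bufs, got, got);
}
#endif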

static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                        struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cache space!! Yay
        if (nb_buffers <= lc->total - lc->used) {
                // Copy everything to the cache
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
                lc->used += nb_buffers;
#ifdef ENABLE_MEM_STATS
                mem_hits.write.cache_hit += nb_buffers;
                mem_hits.writebulk.cache_hit += 1;
#endif
                return nb_buffers;
        }
        // Cache is not big enough; try to write everything to the ring buffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
#ifdef ENABLE_MEM_STATS
                if (i)
                        mem_hits.writebulk.ring_hit += 1;
                else
                        mem_hits.writebulk.miss += 1;
                mem_hits.write.ring_hit += i;
#endif
        } else { // Not enough cache space now, but there might be later
                // Fill the cache, empty it, and then see what we're left with
                i = (lc->total - lc->used);
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
#ifdef ENABLE_MEM_STATS
                mem_hits.write.cache_hit += i;
#endif

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);

                // Reorganise: move the remaining full slots to the front
                if (lc->used)
                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);

#ifdef ENABLE_MEM_STATS
                if (lc->used)
                        mem_hits.writebulk.miss += 1;
                else
                        mem_hits.writebulk.ring_hit += 1;
                mem_hits.write.ring_hit += lc->total - lc->used;
#endif
        }

        // Try to empty the remainder into the cache
        if (i < nb_buffers && lc->used != lc->total) {
                size_t remaining;
                remaining = MIN(lc->total - lc->used, nb_buffers - i);
                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
                lc->used += remaining;
                i += remaining;
        }
#ifdef ENABLE_MEM_STATS
        mem_hits.write.miss += nb_buffers - i;
#endif
        return i;
}

DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = oc->max_allocations ? min_nb_buffers : 0;
        if (lc)
                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);

        if (!oc->max_allocations) {
                // Free these normally
                for (; i < min_nb_buffers; ++i) {
                        oc->free(values[i]);
                }
        }
        return i;
}
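
/* A sketch of the free-back semantics (an illustrative addition, not built):
 * for an uncapped pool (limit_size == false), objects that do not fit in the
 * thread cache or ring buffer are destroyed with the user-supplied free
 * method, whereas for a capped pool the blocking write means only objects
 * allocated from that same pool should ever be freed back (see the NOTE
 * above libtrace_ocache_init()). my_alloc() is the hypothetical method from
 * the earlier sketch. */
#if 0
static void free_back_example(libtrace_ocache_t *uncapped_oc) {
        void *obj = my_alloc();
        /* Recycled if there is room, otherwise destroyed via the free method */
        libtrace_ocache_free(uncapped_oc, &obj, 1, 1);
}
#endif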

DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
        libtrace_zero_ringbuffer(&oc->rb);
        oc->thread_cache_size = 0;
        oc->alloc = NULL;
        oc->free = NULL;
        oc->current_allocations = 0;
        oc->max_allocations = 0;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0;
        oc->thread_list = NULL;
}

/**
 * @brief ocache_unregister_thread removes a thread from an ocache.
 * @param oc The ocache to remove this thread from; this will free any
 *           packets in the thread-local cache.
 */
DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
        size_t i;
        struct local_caches *lcs = get_local_caches();
        struct local_cache *lc = find_cache(oc);

        if (lc) {
                for (i = 0; i < lcs->t_mem_caches_used; ++i) {
                        if (&lcs->t_mem_caches[i] == lc) {
                                // Free the cache against the ocache
                                unregister_thread(&lcs->t_mem_caches[i]);
                                free(lcs->t_mem_caches[i].cache);
                                // And remove it from the thread itself
                                --lcs->t_mem_caches_used;
                                lcs->t_mem_caches[i] = lcs->t_mem_caches[lcs->t_mem_caches_used];
                                memset(&lcs->t_mem_caches[lcs->t_mem_caches_used], 0, sizeof(struct local_cache));
                        }
                }
        }
}
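
/* A sketch of explicit per-thread cleanup (an illustrative addition, not
 * built): a worker thread can flush its local cache back to the pool before
 * exiting, rather than relying on the pthread key destructor, which on Apple
 * platforms runs after the TLS memory is already gone (see the note at the
 * top of this file). worker() is hypothetical. */
#if 0
static void *worker(void *arg) {
        libtrace_ocache_t *oc = arg;
        void *buf;

        libtrace_ocache_alloc(oc, &buf, 1, 1);
        /* ... use buf ... */
        libtrace_ocache_free(oc, &buf, 1, 1);
        /* Hand back anything still held in this thread's local cache */
        libtrace_ocache_unregister_thread(oc);
        return NULL;
}
#endif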