source: lib/data-struct/object_cache.c @ 78911b5

Last change on this file since 78911b5 was 78911b5, checked in by Dan Collins <djc44@…>, 7 years ago

Fixed the debug output to be more clear.
trace_parallel:

  • Changed pausing -> resuming (that never made sense before!)
  • Also removed some large debug output, but that can be re-enabled easily

object_cache:

  • There is still a bug, but I added new lines to make it look nicer

#include "object_cache.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>


// pthread TLS is most likely slower than __thread, but it provides destructors,
// so we use a combination of the two here!!
struct local_cache {
        libtrace_ocache_t *oc;
        size_t total;
        size_t used;
        void **cache;
        bool invalid;
};

struct mem_stats {
        struct memfail {
           uint64_t cache_hit;
           uint64_t ring_hit;
           uint64_t miss;
           uint64_t recycled;
        } readbulk, read, write, writebulk;
};

extern __thread struct mem_stats mem_hits;
static __thread size_t t_mem_caches_used = 0;
static __thread size_t t_mem_caches_total = 0;
static __thread struct local_cache *t_mem_caches = NULL;
static pthread_key_t memory_destructor_key;
static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;

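/* A minimal sketch of the combination described above, using illustrative names
 * that appear nowhere else in this file: __thread gives cheap per-thread access,
 * while a pthread key is kept purely so its destructor runs at thread exit.
 *
 *   static __thread void *fast_tls = NULL;            // fast per-thread pointer
 *   static pthread_key_t cleanup_key;                 // only used for its destructor
 *   static pthread_once_t cleanup_once = PTHREAD_ONCE_INIT;
 *
 *   static void cleanup(void *addr) { free(addr); }   // runs when the thread exits
 *   static void make_key(void) { pthread_key_create(&cleanup_key, cleanup); }
 *
 *   // On first use within each thread:
 *   //   pthread_once(&cleanup_once, make_key);
 *   //   fast_tls = malloc(64);
 *   //   pthread_setspecific(cleanup_key, fast_tls);  // so cleanup() fires at exit
 */
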
/**
 * @brief unregister_thread assumes we DON'T hold the spin lock
 */
static inline void unregister_thread(struct local_cache *lc) {
        size_t i;
        if (lc->invalid)
                fprintf(stderr, "Already free'd the thread cache!!\n");
        pthread_spin_lock(&lc->oc->spin);
        // Remove it from our thread list
        for (i = 0; i < lc->oc->nb_thread_list; ++i) {
                if (lc->oc->thread_list[i] == lc) {
                        --lc->oc->nb_thread_list;
                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
                        i = ~0U;
                        break;
                }
        }
        if (i != ~0U) {
                fprintf(stderr, "Umm this wasn't registered with us in the first place!!!!IGNORING!!!!ANGRY\n");
                pthread_spin_unlock(&lc->oc->spin);
                return;
        }
        lc->invalid = true;

        if (lc->oc->max_allocations) {
                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
        } else {
                size_t j;
                // No allocation limit, so just free these objects directly
                for (j = 0; j < lc->used; ++j) {
                        lc->oc->free(lc->cache[j]);
                }
        }
        pthread_spin_unlock(&lc->oc->spin);
}

/**
 * @brief register_thread assumes we DON'T hold the spin lock (it takes it itself)
 */
static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
        lc->invalid = false;
        pthread_spin_lock(&oc->spin);
        if (oc->nb_thread_list == oc->max_nb_thread_list) {
                oc->max_nb_thread_list += 0x10;
                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
        }
        oc->thread_list[oc->nb_thread_list] = lc;
        ++oc->nb_thread_list;
        pthread_spin_unlock(&oc->spin);
}

static void destroy_memory_cache(void *tlsaddr) {
        size_t a;
        assert(tlsaddr == t_mem_caches);

        for (a = 0; a < t_mem_caches_used; ++a) {
                unregister_thread(&t_mem_caches[a]);
                // unregister_thread() writes these back to the main buffer; this might
                // have issues, in which case we would want to free them instead
                free(t_mem_caches[a].cache);
        }
        free(t_mem_caches);
        t_mem_caches = NULL;
}

static void once_memory_cache_key_init(void) {
        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_cache), == 0);
}

/**
 * Adds more space to our mem_caches
 */
static void resize_memory_caches(void) {
        if (t_mem_caches == NULL) {
                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
                t_mem_caches_total = 0x10;
                t_mem_caches = calloc(0x10, sizeof(struct local_cache));
                pthread_setspecific(memory_destructor_key, (void *) t_mem_caches);
        } else {
                // Grow the total count (not the pointer) before reallocating
                t_mem_caches_total += 0x10;
                t_mem_caches = realloc(t_mem_caches, t_mem_caches_total * sizeof(struct local_cache));
                pthread_setspecific(memory_destructor_key, t_mem_caches);
        }
}

static inline struct local_cache * find_cache(libtrace_ocache_t *oc) {
        struct local_cache *lc = NULL;
        size_t i;

        for (i = 0; i < t_mem_caches_used; ++i) {
                if (t_mem_caches[i].oc == oc) {
                        lc = &t_mem_caches[i];
                        break;
                }
        }

        if (!oc->thread_cache_size)
                return NULL;

        // Create a cache
        if (!lc) {
                if (t_mem_caches_used == t_mem_caches_total)
                        resize_memory_caches();
                t_mem_caches[t_mem_caches_used].oc = oc;
                t_mem_caches[t_mem_caches_used].used = 0;
                t_mem_caches[t_mem_caches_used].total = oc->thread_cache_size;
                t_mem_caches[t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
                t_mem_caches[t_mem_caches_used].invalid = false;
                lc = &t_mem_caches[t_mem_caches_used];
                // Register it with the underlying ring_buffer
                register_thread(lc->oc, lc);
                ++t_mem_caches_used;
        }

        assert(!lc->invalid);
        return lc;
}

/**
  * Creates an object cache, that is, a pool of dynamically allocated and recycled
  * objects of a fixed size. This should be faster than malloc and free.
  * The alloc and free methods are supplied by the user and are used when no
  * recycled objects are available, or to tidy the final results.
  *
  * The performance of these pools will decrease if thread caches are used,
  * as this results in a list to look up per thread. A pool is added to this
  * list when it is first encountered, and these entries persist until the
  * thread exits.
  *
  * NOTE: If limit_size is true do not attempt to 'free' any objects back into
  * this pool that were not created by it, otherwise the 'free' might deadlock.
  * Also be cautious when picking the buffer size: up to thread_cache_size*(threads-1)
  * objects could be unusable at any given time if they are stuck in thread local caches.
  *
  * @param oc A pointer to the object cache structure which is to be initialised.
  * @param alloc The allocation method, must not be NULL. [void *alloc()]
  * @param free The free method used to destroy packets. [void free(void * obj)]
  * @param thread_cache_size A small cache kept on a per-thread basis; this can be 0,
  *             however that should only be done if bulk reads of packets are being
  *             performed or contention is minimal.
  * @param buffer_size The number of packets to be stored in the main buffer.
  * @param limit_size If true, no more objects than buffer_size will be allocated and
  *             reads will block (free never should). Otherwise packets can be freely
  *             allocated upon request and are free'd if there is not enough space for them.
  */
DLLEXPORT void libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void), void (*free)(void *),
                                                                          size_t thread_cache_size, size_t buffer_size, bool limit_size) {

        assert(buffer_size);
        assert(alloc);
        assert(free);
        libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
        oc->alloc = alloc;
        oc->free = free;
        oc->current_allocations = 0;
        oc->thread_cache_size = thread_cache_size;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0x10;
        oc->thread_list = calloc(0x10, sizeof(void*));
        pthread_spin_init(&oc->spin, 0);
        if (limit_size)
                oc->max_allocations = buffer_size;
        else
                oc->max_allocations = 0;
}
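
/* Usage sketch for libtrace_ocache_init(); the callbacks and sizes below are
 * hypothetical values chosen purely for illustration, not taken from libtrace.
 *
 *   static void *alloc_packet(void) { return malloc(2048); }
 *   static void free_packet(void *p) { free(p); }
 *
 *   libtrace_ocache_t oc;
 *   libtrace_ocache_init(&oc, alloc_packet, free_packet,
 *                        64,     // thread_cache_size: per-thread stash of objects
 *                        1024,   // buffer_size: slots in the shared ring buffer
 *                        true);  // limit_size: never allocate more than buffer_size
 */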

/**
  * Destroys the object cache. Call this only once all memory has
  * been free'd back and no more accesses will be made.
  *
  * @return Returns the number of packets outstanding, or extra objects received.
  *             Ideally this should be zero (0), otherwise some form of memory leak
  *             is likely present. Currently only implemented in the case limit_size
  *             is true.
  */
DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
        void *ele;
        int remaining;

        while (oc->nb_thread_list)
                unregister_thread(oc->thread_list[0]);

        pthread_spin_lock(&oc->spin);
        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
                oc->free(ele);
                if (oc->max_allocations)
                        --oc->current_allocations;
        }
        pthread_spin_unlock(&oc->spin);

        // Make sure we haven't lost too many packets
        if (oc->current_allocations)
                fprintf(stderr, "!!OCache closing lost, %d packets!!\n", (int) oc->current_allocations);

        // Remember the outstanding count before libtrace_zero_ocache() clears it
        remaining = (int) oc->current_allocations;
        libtrace_ringbuffer_destroy(&oc->rb);
        pthread_spin_destroy(&oc->spin);
        free(oc->thread_list);
        libtrace_zero_ocache(oc);
        return remaining;
}
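
/* A possible teardown pattern, assuming the hypothetical 'oc' from the earlier
 * sketch. The return value acts as a rough leak check, which is only meaningful
 * when the cache was created with limit_size set.
 *
 *   int lost = libtrace_ocache_destroy(&oc);
 *   if (lost != 0)
 *           fprintf(stderr, "object cache closed with %d objects unaccounted for\n", lost);
 */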

static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                 struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough cached!! Yay
        if (nb_buffers <= lc->used) {
                // Copy all from cache
                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
                lc->used -= nb_buffers;
                mem_hits.read.cache_hit += nb_buffers;
                mem_hits.readbulk.cache_hit += 1;
                return nb_buffers;
        }
        // The cache can never hold this many, try to read everything from the ringbuffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
                if (i)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += i;
        } else { // Not enough cached
                // Empty the cache, re-fill it, and then see what we're left with
                i = lc->used;
                memcpy(values, lc->cache, sizeof(void *) * lc->used);
                mem_hits.read.cache_hit += i;

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);

                if (lc->used == lc->total)
                        mem_hits.readbulk.ring_hit += 1;
                else
                        mem_hits.readbulk.miss += 1;
                mem_hits.read.ring_hit += lc->used;
        }

        // Try to fill the remainder from the (re-filled) cache
        if (i < nb_buffers && lc->used) {
                size_t remaining;
                remaining = MIN(lc->used, nb_buffers - i);
                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
                lc->used -= remaining;
                i += remaining;
        }
        mem_hits.read.miss += nb_buffers - i;
        assert(i >= min_nb_buffers);
        return i;
}

DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;
        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = try_alloc ? 0 : min_nb_buffers;
        if (lc)
                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);

        if (try_alloc) {
                size_t nb;

                // Try to allocate the rest
                if (oc->max_allocations) {
                        pthread_spin_lock(&oc->spin);
                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
                        oc->current_allocations += nb;
                        pthread_spin_unlock(&oc->spin);
                        nb += i;
                } else {
                        nb = nb_buffers;
                }

                for (; i < nb; ++i) {
                        values[i] = (*oc->alloc)();
                        assert(values[i]);
                }
                assert(i == nb);
                // Still need to wait for more
                if (nb < min_nb_buffers) {
                        if (lc)
                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
                        else
                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
                }
        }
        assert(i >= min_nb_buffers);
        return i;
}

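/* A minimal sketch of a bulk allocation, assuming an initialised cache 'oc' as
 * in the earlier example. The batch size of 10 is arbitrary; min_nb_buffers
 * sets how many objects the call must return, blocking if necessary.
 *
 *   void *batch[10];
 *   size_t got = libtrace_ocache_alloc(&oc, batch, 10, 10);
 *   // got should be 10 here; a smaller min_nb_buffers would allow a partial batch
 */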

static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
                                                                                        struct local_cache *lc) {
        libtrace_ringbuffer_t *rb = &oc->rb;
        size_t i;

        // We have enough space in the cache!! Yay
        if (nb_buffers <= lc->total - lc->used) {
                // Copy all to the cache
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
                lc->used += nb_buffers;
                mem_hits.write.cache_hit += nb_buffers;
                mem_hits.writebulk.cache_hit += 1;
                return nb_buffers;
        }
        // The cache can never hold this many, try to write everything to the ringbuffer
        else if (nb_buffers > lc->total) {
                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
                if (i)
                        mem_hits.writebulk.ring_hit += 1;
                else
                        mem_hits.writebulk.miss += 1;
                mem_hits.write.ring_hit += i;
        } else { // Not enough cache space now, but there may be later
                // Fill the cache, empty it, and then see what we're left with
                i = (lc->total - lc->used);
                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
                mem_hits.write.cache_hit += i;

                // Make sure we still meet the minimum requirement
                if (i < min_nb_buffers)
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
                else
                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);

                // Reorganise so the remaining full slots sit at the front
                if (lc->used)
                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);

                if (lc->used)
                        mem_hits.writebulk.miss += 1;
                else
                        mem_hits.writebulk.ring_hit += 1;
                mem_hits.write.ring_hit += lc->total - lc->used;
        }

        // Try to stash the remainder in the cache
        if (i < nb_buffers && lc->used != lc->total) {
                size_t remaining;
                remaining = MIN(lc->total - lc->used, nb_buffers - i);
                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
                lc->used += remaining;
                i += remaining;
        }
        mem_hits.write.miss += nb_buffers - i;
        return i;
}

DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
        struct local_cache *lc = find_cache(oc);
        size_t i;
        size_t min;

        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
        min = oc->max_allocations ? min_nb_buffers : 0;
        if (lc)
                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
        else
                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);

        if (!oc->max_allocations) {
                // Free these normally
                for (; i < min_nb_buffers; ++i) {
                        oc->free(values[i]);
                }
        }
        return i;
}

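/* A matching sketch for returning objects, again assuming the hypothetical 'oc'
 * and 'batch' from the earlier examples. Objects go back into the thread cache
 * or ring buffer; the return value is how many were taken back.
 *
 *   size_t returned = libtrace_ocache_free(&oc, batch, 10, 10);
 */
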
DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
        libtrace_zero_ringbuffer(&oc->rb);
        oc->thread_cache_size = 0;
        oc->alloc = NULL;
        oc->free = NULL;
        oc->current_allocations = 0;
        oc->max_allocations = 0;
        oc->nb_thread_list = 0;
        oc->max_nb_thread_list = 0;
        oc->thread_list = NULL;
}