source: lib/data-struct/object_cache.c @ 6a6e6a8

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivelibtrace4ndag_formatpfringrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 6a6e6a8 was 6a6e6a8, checked in by Richard Sanger <rsangerarj@…>, 7 years ago

More documentation, including some renaming and modifications to behaviour

  • Removes accessor functions for libtrace_result_t, instead directly access the structure
  • Documentation for most functions
  • Split tick into interval and count messages for the two modes of operation
  • Normalise interval and packet order to use the erf timestamp format
  • Rename trace_send_message_to_XXX to trace trace_message_XXX
  • Property mode set to 100644
File size: 14.0 KB
Line 
1#include "object_cache.h"
2#include <assert.h>
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6
7
8// pthread tls is most likely slower than __thread, but they have destructors so
9// we use a combination of the two here!!
10struct local_cache {
11        libtrace_ocache_t *oc;
12        size_t total;
13        size_t used;
14        void **cache;
15        bool invalid;
16};
17
18struct mem_stats {
19        struct memfail {
20           uint64_t cache_hit;
21           uint64_t ring_hit;
22           uint64_t miss;
23           uint64_t recycled;
24        } readbulk, read, write, writebulk;
25};
26
27extern __thread struct mem_stats mem_hits;
28static __thread size_t t_mem_caches_used = 0;
29static __thread size_t t_mem_caches_total = 0;
30static __thread struct local_cache *t_mem_caches = NULL;
31static pthread_key_t memory_destructor_key;
32static pthread_once_t memory_destructor_once = PTHREAD_ONCE_INIT;
33
34/**
35 * @brief unregister_thread assumes we DONT hold spin
36 */
37static inline void unregister_thread(struct local_cache *lc) {
38        size_t i;
39        if (lc->invalid)
40                fprintf(stderr, "Already free'd the thread cache!!\n");
41        pthread_spin_lock(&lc->oc->spin);
42        // Remove it from our thread list
43        for (i=0; i < lc->oc->nb_thread_list; ++i) {
44                if (lc->oc->thread_list[i] == lc) {
45                        --lc->oc->nb_thread_list;
46                        lc->oc->thread_list[i] = lc->oc->thread_list[lc->oc->nb_thread_list];
47                        lc->oc->thread_list[lc->oc->nb_thread_list] = NULL;
48                        i = ~0U;
49                        break;
50                }
51        }
52        if (i != ~0U) {
53                fprintf(stderr, "Umm this wasn't registered with us in the first place!!!!IGNORGING!!!!ANGRY\n");
54                pthread_spin_unlock(&lc->oc->spin);
55                return;
56        }
57        lc->invalid = true;
58
59        if (lc->oc->max_allocations) {
60                libtrace_ringbuffer_swrite_bulk(&lc->oc->rb, lc->cache, lc->used, lc->used);
61        } else {
62                size_t i;
63                // We just run the free these
64                for(i = 0; i < lc->used; ++i) {
65                        lc->oc->free(lc->cache[i]);
66                }
67        }
68        pthread_spin_unlock(&lc->oc->spin);
69}
70
71/**
72 * @brief register_thread assumes we DONT hold spin
73 */
74static inline void register_thread(libtrace_ocache_t *oc, struct local_cache *lc) {
75        lc->invalid = false;
76        pthread_spin_lock(&oc->spin);
77        if (oc->nb_thread_list == oc->max_nb_thread_list) {
78                oc->max_nb_thread_list += 0x10;
79                oc->thread_list = realloc(oc->thread_list, sizeof(void*) * oc->max_nb_thread_list);
80        }
81        oc->thread_list[oc->nb_thread_list] = lc;
82        ++oc->nb_thread_list;
83        pthread_spin_unlock(&oc->spin);
84}
85
86static void destroy_memory_cache(void *tlsaddr) {
87        assert(tlsaddr == t_mem_caches);
88        size_t a;
89
90        for (a = 0; a < t_mem_caches_used; ++a) {
91                unregister_thread(&t_mem_caches[a]);
92                // Write these all back to the main buffer, this might have issues we would want to free these
93                free(t_mem_caches[a].cache);
94        }
95        free(t_mem_caches);
96        t_mem_caches = NULL;
97}
98
99static void once_memory_cache_key_init() {
100        ASSERT_RET(pthread_key_create(&memory_destructor_key, &destroy_memory_cache), == 0);
101}
102
103/**
104 * Adds more space to our mem_caches
105 */
106static void resize_memory_caches() {
107        if (t_mem_caches == NULL) {
108                pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
109                t_mem_caches_total = 0x10;
110                t_mem_caches = calloc(0x10, sizeof(struct local_cache));
111                pthread_setspecific(memory_destructor_key, (void *) t_mem_caches);
112        } else {
113                t_mem_caches += 0x10;
114                t_mem_caches = realloc(t_mem_caches, t_mem_caches_total * sizeof(struct local_cache));
115                pthread_setspecific(memory_destructor_key, t_mem_caches);
116        }
117}
118
119static inline struct local_cache * find_cache(libtrace_ocache_t *oc) {
120        struct local_cache *lc = NULL;
121        size_t i;
122
123        for (i = 0; i < t_mem_caches_used; ++i) {
124                if (t_mem_caches[i].oc == oc) {
125                        lc = &t_mem_caches[i];
126                        break;
127                }
128        }
129
130        if (!oc->thread_cache_size)
131                return 0;
132
133        // Create a cache
134        if (!lc) {
135                if (t_mem_caches_used == t_mem_caches_total)
136                        resize_memory_caches();
137                t_mem_caches[t_mem_caches_used].oc = oc;
138                t_mem_caches[t_mem_caches_used].used = 0;
139                t_mem_caches[t_mem_caches_used].total = oc->thread_cache_size;
140                t_mem_caches[t_mem_caches_used].cache = malloc(sizeof(void*) * oc->thread_cache_size);
141                t_mem_caches[t_mem_caches_used].invalid = false;
142                lc = &t_mem_caches[t_mem_caches_used];
143                // Register it with the underlying ring_buffer
144                register_thread(lc->oc, lc);
145                ++t_mem_caches_used;
146        }
147
148        assert(!lc->invalid);
149        return lc;
150}
151
152/**
153  * Creates a object cache, that is a pool of dynamically allocated and recycled
154  * objects of a fixed size. This should be faster than malloc and free.
155  * The alloc and free methods are supplied by the user and are used when no
156  * recycled objects are available, or to tidy the final results.
157  *
158  * The performance of these pools will decrease if thread caches are used
159  * as this results in a list to lookup per thread. The pool is added when
160  * to this list when first encountered, these persist untill the thread exits.
161  *
162  * NOTE: If limit_size is true do not attempt to 'free' any objects that were
163  * not created by this pool back otherwise the 'free' might deadlock. Also
164  * be cautious when picking the buffer size, upto thread_cache_size*(threads-1)
165  * could be unusable at any given time if these are stuck in thread local caches.
166  *
167  * @param oc A pointer to the object cache structure which is to be initialised.
168  * @param alloc The allocation method, must not be NULL. [void *alloc()]
169  * @param free The free method used to destroy packets. [void free(void * obj)]
170  * @param thread_cache_size A small cache kept on a per thread basis, this can be 0
171  *             however should only be done if bulk reads of packets are being performed
172  *             or contention is minimal.
173  * @param buffer_size The number of packets to be stored in the main buffer.
174  * @param limit_size If true no more objects than buffer_size will be allocated,
175  *             reads will block (free never should).Otherwise packets can be freely
176  *     allocated upon requested and are free'd if there is not enough space for them.
177  * @return If successful returns 0 otherwise -1.
178  */
179DLLEXPORT int libtrace_ocache_init(libtrace_ocache_t *oc, void *(*alloc)(void),
180                                    void (*free)(void *),
181                                    size_t thread_cache_size,
182                                    size_t buffer_size, bool limit_size) {
183
184        assert(buffer_size);
185        assert(alloc);
186        assert(free);
187        if (libtrace_ringbuffer_init(&oc->rb, buffer_size, LIBTRACE_RINGBUFFER_BLOCKING) != 0) {
188                return -1;
189        }
190        oc->alloc = alloc;
191        oc->free = free;
192        oc->current_allocations = 0;
193        oc->thread_cache_size = thread_cache_size;
194        oc->nb_thread_list = 0;
195        oc->max_nb_thread_list = 0x10;
196        oc->thread_list = calloc(0x10, sizeof(void*));
197        if (oc->thread_list == NULL) {
198                libtrace_ringbuffer_destroy(&oc->rb);
199                return -1;
200        }
201        pthread_spin_init(&oc->spin, 0);
202        if (limit_size)
203                oc->max_allocations = buffer_size;
204        else
205                oc->max_allocations = 0;
206        return 0;
207}
208
209/**
210  * Destroys the object cache. Call this only once all memory has
211  * been free'd back and no more accesses will be made.
212  *
213  * @return Returns the number of packets outstanding, or extra object recevied
214  *             Ideally this should be zero (0) otherwise some form of memory leak
215  *             is likely present. Currenty only implemented in the case limit_size
216  *     is true.
217  */
218DLLEXPORT int libtrace_ocache_destroy(libtrace_ocache_t *oc) {
219        void *ele;
220
221        while (oc->nb_thread_list)
222                unregister_thread(oc->thread_list[0]);
223
224        pthread_spin_lock(&oc->spin);
225        while (libtrace_ringbuffer_try_read(&oc->rb, &ele)) {
226                oc->free(ele);
227                if (oc->max_allocations)
228                        --oc->current_allocations;
229        }
230        pthread_spin_unlock(&oc->spin);
231
232        if (oc->current_allocations)
233                fprintf(stderr, "OCache destroyed, leaking %d packets!!\n", (int) oc->current_allocations);
234
235        libtrace_ringbuffer_destroy(&oc->rb);
236        pthread_spin_destroy(&oc->spin);
237        free(oc->thread_list);
238        libtrace_zero_ocache(oc);
239        if (oc->current_allocations)
240                return (int) oc->current_allocations;
241        else
242                return 0;
243}
244
245static inline size_t libtrace_ocache_alloc_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
246                                                                                 struct local_cache *lc) {
247        libtrace_ringbuffer_t *rb = &oc->rb;
248        size_t i;
249
250        // We have enough cached!! Yay
251        if (nb_buffers <= lc->used) {
252                // Copy all from cache
253                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
254                lc->used -= nb_buffers;
255                mem_hits.read.cache_hit += nb_buffers;
256                mem_hits.readbulk.cache_hit += 1;
257                return nb_buffers;
258        }
259        // Cache is not big enough try read all from ringbuffer
260        else if (nb_buffers > lc->total) {
261                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
262                if (i)
263                        mem_hits.readbulk.ring_hit += 1;
264                else
265                        mem_hits.readbulk.miss += 1;
266                mem_hits.read.ring_hit += i;
267        } else { // Not enough cached
268                // Empty the cache and re-fill it and then see what we're left with
269                i = lc->used;
270                memcpy(values, lc->cache, sizeof(void *) * lc->used);
271                mem_hits.read.cache_hit += i;
272
273                // Make sure we still meet the minimum requirement
274                if (i < min_nb_buffers)
275                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
276                else
277                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
278
279                if (lc->used == lc->total)
280                        mem_hits.readbulk.ring_hit += 1;
281                else
282                        mem_hits.readbulk.miss += 1;
283                mem_hits.read.ring_hit += lc->used;
284        }
285
286        // Try fill the remaining
287        if (i < nb_buffers && lc->used) {
288                size_t remaining;
289                remaining = MIN(lc->used, nb_buffers - i);
290                memcpy(&values[i], &lc->cache[lc->used - remaining], sizeof(void *) * remaining);
291                lc->used -= remaining;
292                i += remaining;
293        }
294        mem_hits.read.miss += nb_buffers - i;
295        assert(i >= min_nb_buffers);
296        return i;
297}
298
299DLLEXPORT size_t libtrace_ocache_alloc(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
300        struct local_cache *lc = find_cache(oc);
301        size_t i;
302        size_t min;
303        bool try_alloc = !(oc->max_allocations && oc->max_allocations <= oc->current_allocations);
304
305        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
306        min = try_alloc ? 0: min_nb_buffers;
307        if (lc)
308                i = libtrace_ocache_alloc_cache(oc, values, nb_buffers, min,  lc);
309        else
310                i = libtrace_ringbuffer_sread_bulk(&oc->rb, values, nb_buffers, min);
311
312        if (try_alloc) {
313                size_t nb;
314
315                // Try alloc the rest
316                if (oc->max_allocations) {
317                        pthread_spin_lock(&oc->spin);
318                        nb = MIN(oc->max_allocations - oc->current_allocations, nb_buffers - i);
319                        oc->current_allocations += nb;
320                        pthread_spin_unlock(&oc->spin);
321                        nb += i;
322                } else {
323                        nb = nb_buffers;
324                }
325
326                for (;i < nb; ++i) {
327                        values[i] = (*oc->alloc)();
328                        assert(values[i]);
329                }
330                assert (i == nb);
331                // Still got to wait for more
332                if (nb < min_nb_buffers) {
333                        if (lc)
334                                i += libtrace_ocache_alloc_cache(oc, &values[nb], nb_buffers - nb, min_nb_buffers - nb, lc);
335                        else
336                                i += libtrace_ringbuffer_sread_bulk(&oc->rb, &values[nb], nb_buffers - nb, min_nb_buffers - nb);
337                }
338        }
339        assert(i >= min_nb_buffers);
340        return i;
341}
342
343
344static inline size_t libtrace_ocache_free_cache(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers,
345                                                                                        struct local_cache *lc) {
346        libtrace_ringbuffer_t *rb = &oc->rb;
347        size_t i;
348
349        // We have enough cached!! Yay
350        if (nb_buffers <= lc->total - lc->used) {
351                // Copy all to the cache
352                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
353                lc->used += nb_buffers;
354                mem_hits.write.cache_hit += nb_buffers;
355                mem_hits.writebulk.cache_hit += 1;
356                return nb_buffers;
357        }
358        // Cache is not big enough try write all to the ringbuffer
359        else if (nb_buffers > lc->total) {
360                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
361                if (i)
362                        mem_hits.writebulk.ring_hit += 1;
363                else
364                        mem_hits.writebulk.miss += 1;
365                mem_hits.write.ring_hit += i;
366        } else { // Not enough cache space but there might later
367                // Fill the cache and empty it and then see what we're left with
368                i = (lc->total - lc->used);
369                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
370                mem_hits.write.cache_hit += i;
371
372                // Make sure we still meet the minimum requirement
373                if (i < min_nb_buffers)
374                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, min_nb_buffers - i);
375                else
376                        lc->used = lc->total - libtrace_ringbuffer_swrite_bulk(rb, lc->cache, lc->total, 0);
377
378                // Re originise fulls to the front
379                if (lc->used)
380                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);
381
382                if (lc->used)
383                        mem_hits.writebulk.miss += 1;
384                else
385                        mem_hits.writebulk.ring_hit += 1;
386                mem_hits.write.ring_hit += lc->total - lc->used;
387        }
388
389        // Try empty the remaining
390        if (i < nb_buffers && lc->used != lc->total) {
391                size_t remaining;
392                remaining = MIN(lc->total - lc->used, nb_buffers - i);
393                memcpy(&lc->cache[lc->used], &values[i], sizeof(void *) * remaining);
394                lc->used += remaining;
395                i += remaining;
396        }
397        mem_hits.write.miss += nb_buffers - i;
398        return i;
399}
400
401DLLEXPORT size_t libtrace_ocache_free(libtrace_ocache_t *oc, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
402        struct local_cache *lc = find_cache(oc);
403        size_t i;
404        size_t min;
405
406        assert(oc->max_allocations ? nb_buffers < oc->max_allocations : 1);
407        min = oc->max_allocations ? min_nb_buffers : 0;
408        if (lc)
409                i = libtrace_ocache_free_cache(oc, values, nb_buffers, min, lc);
410        else
411                i = libtrace_ringbuffer_swrite_bulk(&oc->rb, values, nb_buffers, min);
412
413        if (!oc->max_allocations) {
414                // Free these normally
415                for (;i < min_nb_buffers; ++i) {
416                        oc->free(values[i]);
417                }
418        }
419        return i;
420}
421
422DLLEXPORT void libtrace_zero_ocache(libtrace_ocache_t *oc) {
423        libtrace_zero_ringbuffer(&oc->rb);
424        oc->thread_cache_size = 0;
425        oc->alloc = NULL;
426        oc->free = NULL;
427        oc->current_allocations = 0;
428        oc->max_allocations = 0;
429        oc->nb_thread_list = 0;
430        oc->max_nb_thread_list = 0;
431        oc->thread_list = NULL;
432}
433
434/**
435 * @brief ocache_unregister_thread removes a thread from an ocache.
436 * @param The ocache to remove this thread, this will free any packets in the TLS cache
437 */
438DLLEXPORT void libtrace_ocache_unregister_thread(libtrace_ocache_t *oc) {
439        size_t i;
440        struct local_cache *lc = find_cache(oc);
441
442        if (lc) {
443                for (i = 0; i < t_mem_caches_used; ++i) {
444                        if (&t_mem_caches[i] == lc) {
445                                // Free the cache against the ocache
446                                unregister_thread(&t_mem_caches[i]);
447                                free(t_mem_caches[i].cache);
448                                // And remove it from the thread itself
449                                --t_mem_caches_used;
450                                t_mem_caches[i] = t_mem_caches[t_mem_caches_used];
451                                memset(&t_mem_caches[t_mem_caches_used], 0, sizeof(struct local_cache));
452                        }
453                }
454        }
455}
Note: See TracBrowser for help on using the repository browser.