Changeset a49a9eb


Timestamp:
07/30/14 18:44:16
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
be3f75b
Parents:
41148f2
Message:

Add an object cache with thread-local caches; all packets used by a trace are allocated through it.
Add bulk read/write operations to the ringbuffer (used by the object cache).
Replace semaphores with condition variables to support these bulk operations.
Internally, use bulk reads from single-threaded formats to reduce locking overhead.
Replace the asserts around pthread_* calls with an ASSERT_RET macro that still executes the call when NDEBUG is defined.
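
The ASSERT_RET change matters because assert() compiles to nothing under NDEBUG, so wrapping a call with side effects in an assert silently drops the call in release builds. A minimal sketch of the idea behind the macro added to lib/libtrace.h.in below (the demo main() is illustrative only):

    #include <assert.h>
    #include <pthread.h>

    /* Same shape as the ASSERT_RET added in this changeset: always run the
     * call, but only assert on its return value in debug builds. */
    #ifdef NDEBUG
    #define ASSERT_RET(run, cond) run
    #else
    #define ASSERT_RET(run, cond) assert(run cond)
    #endif

    int main(void) {
            pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

            /* Buggy under NDEBUG - the whole expression vanishes, so the
             * mutex is never locked:
             *   assert(pthread_mutex_lock(&lock) == 0);
             */

            /* Safe: the call survives NDEBUG, only the check is dropped. */
            ASSERT_RET(pthread_mutex_lock(&lock), == 0);
            ASSERT_RET(pthread_mutex_unlock(&lock), == 0);
            return 0;
    }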

Files:
2 added
12 edited

  • lib/Makefile.am

    rdafe86a ra49a9eb  
    5858                libtrace_arphrd.h \
    5959                data-struct/ring_buffer.c data-struct/vector.c data-struct/message_queue.c \
    60                 data-struct/deque.c data-struct/sliding_window.c \
     60                data-struct/deque.c data-struct/sliding_window.c data-struct/object_cache.c \
    6161                hash_toeplitz.c
    6262
  • lib/data-struct/deque.c

    r8c42377 ra49a9eb  
    2525        q->size = 0;
    2626        q->element_size = element_size;
    27         assert(pthread_mutex_init(&q->lock, NULL) == 0);
     27        ASSERT_RET(pthread_mutex_init(&q->lock, NULL), == 0);
    2828}
    2929
     
    3636        memcpy(&new_node->data, d, q->element_size);
    3737        // Only ->prev is unknown at this stage to be completed in lock
    38         assert(pthread_mutex_lock(&q->lock) == 0);
     38        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    3939        if (q->head == NULL) {
    4040                assert(q->tail == NULL && q->size == 0);
     
    4848        }
    4949        q->size++;
    50         assert(pthread_mutex_unlock(&q->lock) == 0);
     50        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    5151}
    5252
     
    5959        memcpy(&new_node->data, d, q->element_size);
    6060        // Only ->next is unknown at this stage to be completed in lock
    61         assert(pthread_mutex_lock(&q->lock) == 0);
     61        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    6262        if (q->head == NULL) {
    6363                assert(q->tail == NULL && q->size == 0);
     
    7272        }
    7373        q->size++;
    74         assert(pthread_mutex_unlock(&q->lock) == 0);
     74        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    7575}
    7676
     
    7878{
    7979        int ret = 1;
    80         assert(pthread_mutex_lock(&q->lock) == 0);
     80        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    8181        if (q->head == NULL)
    8282                ret = 0;
    8383        else
    8484                memcpy(d, &q->head->data, q->element_size);
    85         assert(pthread_mutex_unlock(&q->lock) == 0);
     85        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    8686        return ret;
    8787}
     
    9090{
    9191        int ret = 1;
    92         assert(pthread_mutex_lock(&q->lock) == 0);
     92        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    9393        if (q->tail == NULL)
    9494                ret = 0;
    9595        else
    9696                memcpy(d, &q->tail->data, q->element_size);
    97         assert(pthread_mutex_unlock(&q->lock) == 0);
     97        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    9898        return ret;
    9999}
     
    103103        int ret = 0;
    104104        list_node_t * n = NULL;
    105         assert(pthread_mutex_lock(&q->lock) == 0);
     105        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    106106        if (q->head != NULL) {
    107107                n = q->head;
     
    114114                        q->tail = q->head;
    115115        }
    116         assert(pthread_mutex_unlock(&q->lock) == 0);
     116        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    117117        // Unlock once we've removed it :)
    118118        if (ret) {
     
    127127        int ret = 0;
    128128        list_node_t * n;
    129         assert(pthread_mutex_lock(&q->lock) == 0);
     129        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    130130        if (q->tail != NULL) {
    131131                n = q->tail;
     
    138138                        q->head = q->tail;
    139139        }
    140         assert(pthread_mutex_unlock(&q->lock) == 0);
     140        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    141141        if (ret) {
    142142                memcpy(d, &n->data, q->element_size);
     
    150150#if RACE_SAFE
    151151        size_t ret;
    152         assert(pthread_mutex_lock(&q->lock) == 0);
     152        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    153153        ret = q->size;
    154         assert(pthread_mutex_unlock(&q->lock) == 0);
     154        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    155155        return ret;
    156156#else
     
    169169        list_node_t *n;
    170170        assert(q->element_size == sizeof(libtrace_result_t));
    171         assert(pthread_mutex_lock(&q->lock) == 0);
     171        ASSERT_RET(pthread_mutex_lock(&q->lock), == 0);
    172172        n = q->head;
    173173        for (n = q->head; n != NULL; n = n->next) {
    174174                (*fn)(&n->data);
    175175        }
    176         assert(pthread_mutex_unlock(&q->lock) == 0);
     176        ASSERT_RET(pthread_mutex_unlock(&q->lock), == 0);
    177177}
  • lib/data-struct/message_queue.c

    r5ba34eb ra49a9eb  
    1919{
    2020        assert(message_len);
    21         assert(pipe(mq->pipefd) != -1);
     21        ASSERT_RET(pipe(mq->pipefd), != -1);
    2222        mq->message_count = 0;
    2323        if (message_len > PIPE_BUF)
     
    4545        int ret;
    4646        assert(mq->message_len);
    47         assert(write(mq->pipefd[1], message, mq->message_len) == (int) mq->message_len);
     47        ASSERT_RET(write(mq->pipefd[1], message, mq->message_len), == (int) mq->message_len);
    4848        // Update after we've written
    4949        pthread_spin_lock(&mq->spin);
     
    7373        ret = mq->message_count--;
    7474        pthread_spin_unlock(&mq->spin);
    75         assert(read(mq->pipefd[0], message, mq->message_len) == (int) mq->message_len);
     75        ASSERT_RET(read(mq->pipefd[0], message, mq->message_len), == (int) mq->message_len);
    7676        return ret;
    7777}
     
    102102                ret = --mq->message_count;
    103103                // :( read(...) needs to be done within the *spin* lock otherwise blocking might steal our read
    104                 assert(read(mq->pipefd[0], message, mq->message_len) == (int) mq->message_len);
     104                ASSERT_RET(read(mq->pipefd[0], message, mq->message_len), == (int) mq->message_len);
    105105        } else {
    106106                ret = LIBTRACE_MQ_FAILED;
  • lib/data-struct/message_queue.h

    r5ba34eb ra49a9eb  
    11#include <pthread.h>
    22#include <limits.h>
     3#include "../libtrace.h"
    34
    45#ifndef LIBTRACE_MESSAGE_QUEUE
  • lib/data-struct/ring_buffer.c

    read9478 ra49a9eb  
    77#include <stdlib.h>
    88#include <assert.h>
     9#include <string.h>
     10#include <stdio.h>
    911
    1012#define LOCK_TYPE_MUTEX 0 // Default if not defined
    1113#define LOCK_TYPE_SPIN 1
    12 #define LOCK_TYPE_SEMAPHORE 2
    13 #define LOCK_TYPE_NONE 3
     14#define LOCK_TYPE_NONE 2
    1415
    1516// No major difference noticed here between mutex and spin, both have their
    1617// downsides.
    1718
    18 #define USE_MODULUS 1
    1919#define USE_CHECK_EARLY 1
    2020
    21 #define USE_LOCK_TYPE LOCK_TYPE_SPIN
     21#define USE_LOCK_TYPE LOCK_TYPE_MUTEX
    2222#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
    23 #       define LOCK(dir) assert(pthread_spin_lock(&rb->s ## dir ## lock) == 0)
    24 #       define UNLOCK(dir) assert(pthread_spin_unlock(&rb->s ## dir ## lock) == 0)
     23#       define LOCK(dir) ASSERT_RET(pthread_spin_lock(&rb->s ## dir ## lock), == 0)
     24#       define UNLOCK(dir) ASSERT_RET(pthread_spin_unlock(&rb->s ## dir ## lock), == 0)
    2525#       define TRY_LOCK(dir, action) if(pthread_spin_lock(&rb->s ## dir ## lock) != 0) { \
    26                                                                 action }
    27 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
    28 #       define LOCK(dir) assert(sem_wait(&rb->sem ## dir ## lock) == 0)
    29 #       define UNLOCK(dir) assert(sem_post(&rb->sem ## dir ## lock) == 0)
    30 #       define TRY_LOCK(dir, action) if(sem_trywait(&rb->sem ## dir ## lock) != 0) { \
    3126                                                                action }
    3227#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
     
    3530#       define TRY_LOCK(dir, action)
    3631#else // Mutex
    37 #       define LOCK(dir) assert(pthread_mutex_lock(&rb-> dir ## lock) == 0)
    38 #       define UNLOCK(dir) assert(pthread_mutex_unlock(&rb-> dir ## lock) == 0)
     32#       define LOCK(dir) ASSERT_RET(pthread_mutex_lock(&rb-> dir ## lock), == 0)
     33#       define UNLOCK(dir) ASSERT_RET(pthread_mutex_unlock(&rb-> dir ## lock), == 0)
    3934#       define TRY_LOCK(dir, action) if(pthread_mutex_lock(&rb-> dir ## lock) != 0) {\
    4035                                                                action }
     
    6459        if (mode == LIBTRACE_RINGBUFFER_BLOCKING) {
    6560                /* The signaling part - i.e. release when data's ready to read */
    66                 assert(sem_init(&rb->fulls, 0, 0) == 0);
    67                 assert(sem_init(&rb->emptys, 0, size - 1) == 0); // REMEMBER the -1 here :) very important
     61                pthread_cond_init(&rb->full_cond, NULL);
     62                pthread_cond_init(&rb->empty_cond, NULL);
     63                ASSERT_RET(pthread_mutex_init(&rb->empty_lock, NULL), == 0);
     64                ASSERT_RET(pthread_mutex_init(&rb->full_lock, NULL), == 0);
    6865        }
    6966        /* The mutual exclusion part */
    7067#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
    7168#warning "using spinners"
    72         assert(pthread_spin_init(&rb->swlock, 0) == 0);
    73         assert(pthread_spin_init(&rb->srlock, 0) == 0);
    74 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
    75 #warning "using semaphore"
    76         assert(sem_init(&rb->semrlock, 0, 1) != -1);
    77         assert(sem_init(&rb->semwlock, 0, 1) != -1);
     69        ASSERT_RET(pthread_spin_init(&rb->swlock, 0), == 0);
     70        ASSERT_RET(pthread_spin_init(&rb->srlock, 0), == 0);
    7871#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
    7972#warning "No locking used"
    80 #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
    81         assert(pthread_mutex_init(&rb->wlock, NULL) == 0);
    82         assert(pthread_mutex_init(&rb->rlock, NULL) == 0);
     73#else
     74        ASSERT_RET(pthread_mutex_init(&rb->wlock, NULL), == 0);
     75        ASSERT_RET(pthread_mutex_init(&rb->rlock, NULL), == 0);
    8376#endif
    8477}
     
    9083DLLEXPORT void libtrace_ringbuffer_destroy(libtrace_ringbuffer_t * rb) {
    9184#if USE_LOCK_TYPE == LOCK_TYPE_SPIN
    92         assert(pthread_spin_destroy(&rb->swlock) == 0);
    93         assert(pthread_spin_destroy(&rb->srlock) == 0);
    94 #elif USE_LOCK_TYPE == LOCK_TYPE_SEMAPHORE
    95         assert(sem_destroy(&rb->semrlock) != -1);
    96         assert(sem_destroy(&rb->semwlock) != -1);
     85        ASSERT_RET(pthread_spin_destroy(&rb->swlock), == 0);
     86        ASSERT_RET(pthread_spin_destroy(&rb->srlock), == 0);
    9787#elif USE_LOCK_TYPE == LOCK_TYPE_NONE
    98 #else /* USE_LOCK_TYPE == LOCK_TYPE_MUTEX */
    99         assert(pthread_mutex_destroy(&rb->wlock) == 0);
    100         assert(pthread_mutex_destroy(&rb->rlock) == 0);
    101 #endif
     88#endif
     89        ASSERT_RET(pthread_mutex_destroy(&rb->wlock), == 0);
     90        ASSERT_RET(pthread_mutex_destroy(&rb->rlock), == 0);
    10291        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
    103                 assert(sem_destroy(&rb->fulls) == 0);
    104                 assert(sem_destroy(&rb->emptys) == 0);
     92                pthread_cond_destroy(&rb->full_cond);
     93                pthread_cond_destroy(&rb->empty_cond);
    10594        }
    10695        rb->size = 0;
     
    126115 */
    127116DLLEXPORT int libtrace_ringbuffer_is_full(const libtrace_ringbuffer_t * rb) {
    128 #if USE_MODULUS
    129117        return rb->start == ((rb->end + 1) % rb->size);
    130 #else
    131         return rb->start == ((rb->end + 1 < rb->size) ? rb->end + 1 : 0);
    132 #endif
    133 }
    134 
    135 /**
    136  * Performs a blocking write to the buffer, upon return the value will be
    137  * stored. This will not clobber old values.
    138  *
    139  * This assumes only one thread writing at once. Use
    140  * libtrace_ringbuffer_swrite for a thread safe version.
    141  *
    142  * @param rb a pointer to libtrace_ringbuffer structure
    143  * @param value the value to store
    144  */
    145 DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
     118}
     119
     120static inline size_t libtrace_ringbuffer_nb_full(const libtrace_ringbuffer_t *rb) {
     121        if (rb->end < rb->start)
     122                return rb->end + rb->size - rb->start;
     123        else
     124                return rb->end - rb->start;
     125        // return (rb->end + rb->size - rb->start) % rb->size;
     126}
     127
     128static inline size_t libtrace_ringbuffer_nb_empty(const libtrace_ringbuffer_t *rb) {
     129        if (rb->start <= rb->end)
     130                return rb->start + rb->size - rb->end - 1;
     131        else
     132                return rb->start - rb->end - 1;
     133        // return (rb->start + rb->size - rb->end - 1) % rb->size;
     134}
     135
     136/**
      137 * Waits for an empty slot that we can write to.
     138 * @param rb The ringbuffer
     139 */
     140static inline void wait_for_empty(libtrace_ringbuffer_t *rb) {
    146141        /* Need an empty to start with */
    147         if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
    148                 assert(sem_wait(&rb->emptys) == 0);
    149         else
     142        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
     143                pthread_mutex_lock(&rb->empty_lock);
     144                while (libtrace_ringbuffer_is_full(rb))
     145                        pthread_cond_wait(&rb->empty_cond, &rb->empty_lock);
     146                pthread_mutex_unlock(&rb->empty_lock);
     147        } else {
    150148                while (libtrace_ringbuffer_is_full(rb))
    151149                        /* Yield our time, why?, we tried and failed to write an item
     
    154152                         * burst to write without blocking */
    155153                        sched_yield();//_mm_pause();
    156 
     154        }
     155}
     156
     157/**
      158 * Waits for a full slot that we can read from.
     159 * @param rb The ringbuffer
     160 */
     161static inline void wait_for_full(libtrace_ringbuffer_t *rb) {
      162        /* Need a full slot to start with */
     163        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
     164                pthread_mutex_lock(&rb->full_lock);
     165                while (libtrace_ringbuffer_is_empty(rb))
     166                        pthread_cond_wait(&rb->full_cond, &rb->full_lock);
     167                pthread_mutex_unlock(&rb->full_lock);
     168        } else {
     169                while (libtrace_ringbuffer_is_empty(rb))
     170                        /* Yield our time, why?, we tried and failed to write an item
      171                        /* Yield our time, why? We tried and failed to read an item
      172                         * from the buffer - so we should give up our time in the hope
      173                         * that the writer thread can fill the buffer giving us a good
      174                         * burst to read without blocking */
     175        }
     176}
     177
     178/**
      179 * Notifies that we have created a full slot, after a write.
     180 * @param rb The ringbuffer
     181 */
     182static inline void notify_full(libtrace_ringbuffer_t *rb) {
      183        /* Wake any readers waiting on a full slot */
     184        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
     185                pthread_mutex_lock(&rb->full_lock);
     186                pthread_cond_broadcast(&rb->full_cond);
     187                pthread_mutex_unlock(&rb->full_lock);
     188        }
     189}
     190
     191/**
      192 * Notifies that we have created an empty slot, after a read.
     193 * @param rb The ringbuffer
     194 */
     195static inline void notify_empty(libtrace_ringbuffer_t *rb) {
      196        /* Wake any writers waiting on an empty slot */
     197        if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING) {
     198                pthread_mutex_lock(&rb->empty_lock);
     199                pthread_cond_broadcast(&rb->empty_cond);
     200                pthread_mutex_unlock(&rb->empty_lock);
     201        }
     202}
     203
     204/**
     205 * Performs a blocking write to the buffer, upon return the value will be
     206 * stored. This will not clobber old values.
     207 *
     208 * This assumes only one thread writing at once. Use
     209 * libtrace_ringbuffer_swrite for a thread safe version.
     210 *
     211 * @param rb a pointer to libtrace_ringbuffer structure
     212 * @param value the value to store
     213 */
     214DLLEXPORT void libtrace_ringbuffer_write(libtrace_ringbuffer_t * rb, void* value) {
     215        /* Need an empty to start with */
     216        wait_for_empty(rb);
    157217        rb->elements[rb->end] = value;
    158 #if USE_MODULUS
    159218        rb->end = (rb->end + 1) % rb->size;
    160 #else
    161         rb->end = (rb->end + 1 < rb->size) ? rb->end + 1 : 0;
    162 #endif
    163         /* This check is bad we can easily lose our time slice, and the reader
    164          * can catch up before it should, in this case spin locking is used */
    165         //if (libtrace_ringbuffer_is_empty(rb))
    166         //      assert(0 == 1);
    167         /* Now we've made another full */
    168         if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
    169                 assert(sem_post(&rb->fulls) == 0);
     219        notify_full(rb);
     220}
     221
     222/**
      223 * Performs a blocking bulk write to the buffer; upon return the values
      224 * will be stored. This will not clobber old values.
      225 *
      226 * This assumes only one thread writing at once. Use
      227 * libtrace_ringbuffer_swrite_bulk for a thread safe version.
     228 *
      229 * Packets are written out from start to end in order; if only some packets are
      230 * written, those at the end of the array will still be unwritten.
     231 *
     232 * @param rb a pointer to libtrace_ringbuffer structure
      233 * @param values An array of the values to store
      234 * @param nb_buffers The maximum number of buffers to write, i.e. the length of values
      235 * @param min_nb_buffers The minimum number of buffers to write
      236 * @return The number of buffers written
     237 */
     238DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
     239        size_t nb_ready;
     240        size_t i = 0;
     241       
     242        assert(min_nb_buffers <= nb_buffers);
     243        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb))
     244                return 0;
     245
     246        do {
     247                register size_t end;
     248                wait_for_empty(rb);
     249                nb_ready = libtrace_ringbuffer_nb_empty(rb);
     250                nb_ready = MIN(nb_ready, nb_buffers-i);
     251                nb_ready += i;
     252                // TODO consider optimising into at most 2 memcpys??
     253                end = rb->end;
     254                for (; i < nb_ready; i++) {
     255                        rb->elements[end] = values[i];
     256                        end = (end + 1) % rb->size;
     257                }
     258                rb->end = end;
     259                notify_full(rb);
     260        } while (i < min_nb_buffers);
     261        return i;
    170262}
    171263
     
    198290       
    199291        /* We need a full slot */
    200         if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
    201                 assert(sem_wait(&rb->fulls) == 0);
    202         else
    203                 while (libtrace_ringbuffer_is_empty(rb))
    204                         /* Yield our time, why?, we tried and failed to read an item
    205                          * from the buffer - so we should give up our time in the hope
    206                          * that the writer thread can fill the buffer giving us a good
    207                          * burst to read without blocking etc */
    208                         sched_yield();//_mm_pause();
     292        wait_for_full(rb);
     293        value = rb->elements[rb->start];
     294        rb->start = (rb->start + 1) % rb->size;
     295        /* Now that's an empty slot */
     296        notify_empty(rb);
     297        return value;
     298}
     299
     300/**
      301 * Waits for and reads from the supplied buffer; note this may block indefinitely.
      302 * Attempts to read the requested number of packets, but may return
      303 * with only the number that are currently ready.
     304 *
     305 * Set min_nb_buffers to 0 to 'try' read packets.
     306 *
     307 * The buffer is filled from start to finish i.e. if 2 is returned [0] and [1]
     308 * are valid.
     309 *
     310 * @param rb a pointer to libtrace_ringbuffer structure
      311 * @param values An array where pointers to the read items will be placed
      312 * @param nb_buffers The maximum number of buffers to read, i.e. the length of values
     313 * @param min_nb_buffers The minimum number of buffers to read
     314 * @return The number of packets read
     315 */
     316DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
     317        size_t nb_ready;
     318        size_t i = 0;
    209319       
    210         value = rb->elements[rb->start];
    211 #if USE_MODULUS
    212         rb->start = (rb->start + 1) % rb->size;
    213 #else
    214         rb->start = (rb->start + 1 < rb->size) ? rb->start + 1 : 0;
    215 #endif
    216         /* Now that's a empty slot */
    217         if (rb->mode == LIBTRACE_RINGBUFFER_BLOCKING)
    218                 assert(sem_post(&rb->emptys) == 0);
    219         return value;
    220 }
     320        assert(min_nb_buffers <= nb_buffers);
     321
     322        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb))
     323                return 0;
     324
     325        do {
     326                register size_t start;
     327                /* We need a full slot */
     328                wait_for_full(rb);
     329
     330                nb_ready = libtrace_ringbuffer_nb_full(rb);
     331                nb_ready = MIN(nb_ready, nb_buffers-i);
     332                // Additional to the i we've already read
     333                nb_ready += i;
     334                start = rb->start;
     335                for (; i < nb_ready; i++) {
     336                        values[i] = rb->elements[start];
     337                        start = (start + 1) % rb->size;
     338                }
     339                rb->start = start;
     340                /* Now that's an empty slot */
     341                notify_empty(rb);
     342        } while (i < min_nb_buffers);
     343        return i;
     344}
     345
    221346
    222347/**
     
    242367        libtrace_ringbuffer_write(rb, value);
    243368        UNLOCK(w);
     369}
     370
     371/**
     372 * A thread safe version of libtrace_ringbuffer_write_bulk
     373 */
     374DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
     375        size_t ret;
     376#if USE_CHECK_EARLY
     377        if (!min_nb_buffers && libtrace_ringbuffer_is_full(rb)) // Check early
     378                return 0;
     379#endif
     380        LOCK(w);
     381        ret = libtrace_ringbuffer_write_bulk(rb, values, nb_buffers, min_nb_buffers);
     382        UNLOCK(w);
     383        return ret;
    244384}
    245385
     
    290430
    291431/**
     432 * A thread safe version of libtrace_ringbuffer_read_bulk
     433 */
     434DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t * rb, void *values[], size_t nb_buffers, size_t min_nb_buffers) {
     435        size_t ret;
     436#if USE_CHECK_EARLY
     437        if (!min_nb_buffers && libtrace_ringbuffer_is_empty(rb)) // Check early
     438                return 0;
     439#endif
     440        LOCK(r);
     441        ret = libtrace_ringbuffer_read_bulk(rb, values, nb_buffers, min_nb_buffers);
     442        UNLOCK(r);
     443        return ret;
     444}
     445
     446/**
    292447 * A thread safe version of libtrace_ringbuffer_try_write
    293448 */
     
    330485        rb->elements = NULL;
    331486}
     487
     488
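
The wait_for_*/notify_* helpers above replace the old semaphore pair with the standard condition-variable pattern: re-check the predicate in a loop while holding the mutex, and broadcast under that same mutex so a wakeup can never be lost between the check and the wait. A self-contained sketch of that pattern (the slot_free flag and function names are illustrative, not from the changeset):

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static bool slot_free = false; /* the predicate, cf. !libtrace_ringbuffer_is_full() */

    /* Waiter, cf. wait_for_empty(): sleep until the predicate holds. */
    static void wait_for_slot(void) {
            pthread_mutex_lock(&lock);
            while (!slot_free) /* loop: spurious wakeups are harmless */
                    pthread_cond_wait(&cond, &lock);
            pthread_mutex_unlock(&lock);
    }

    /* Notifier, cf. notify_empty(): flip the predicate and broadcast under
     * the same mutex so no waiter can miss the wakeup. */
    static void make_slot_free(void) {
            pthread_mutex_lock(&lock);
            slot_free = true;
            pthread_cond_broadcast(&cond);
            pthread_mutex_unlock(&lock);
    }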
  • lib/data-struct/ring_buffer.h

    read9478 ra49a9eb  
    2020        pthread_spinlock_t swlock;
    2121        pthread_spinlock_t srlock;
    22         sem_t semrlock;
    23         sem_t semwlock;
    24         sem_t emptys;
    25         sem_t fulls;
      22        // We need to ensure that broadcasts don't get lost, hence
      23        // the locks below.
     24        // We avoid using semaphores since they don't allow
     25        // multiple releases.
     26        pthread_mutex_t empty_lock;
     27        pthread_mutex_t full_lock;
     28        pthread_cond_t empty_cond; // Signal when empties are ready
     29        pthread_cond_t full_cond; // Signal when fulls are ready
    2630        // Aim to get this on a separate cache line to start - important if spinning
    2731        volatile size_t end;
     
    4650DLLEXPORT int libtrace_ringbuffer_try_sread_bl(libtrace_ringbuffer_t *rb, void ** value);
    4751
     52
     53
     54DLLEXPORT size_t libtrace_ringbuffer_write_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers);
     55DLLEXPORT size_t libtrace_ringbuffer_read_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers);
     56DLLEXPORT size_t libtrace_ringbuffer_sread_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers);
     57DLLEXPORT size_t libtrace_ringbuffer_swrite_bulk(libtrace_ringbuffer_t *rb, void *values[], size_t nb_buffers, size_t min_nb_buffers);
     58
    4859#endif
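
The new bulk prototypes use min_nb_buffers to select between 'try' and blocking behaviour: 0 returns immediately with whatever is ready, while a positive minimum blocks until at least that many items have been transferred. A hedged usage sketch (the drain_and_refill() wrapper is hypothetical; only the bulk calls come from this changeset):

    #include "ring_buffer.h" /* declares the bulk prototypes added above */

    #define BURST 10

    void drain_and_refill(libtrace_ringbuffer_t *rb) {
            void *burst[BURST];
            size_t n;

            /* Try-read: min_nb_buffers == 0 never blocks and may return 0 items */
            n = libtrace_ringbuffer_sread_bulk(rb, burst, BURST, 0);

            if (n == 0)
                    /* Blocking read: waits until at least 1 item is ready, then
                     * grabs up to BURST under a single lock acquisition */
                    n = libtrace_ringbuffer_sread_bulk(rb, burst, BURST, 1);

            /* Bulk write: blocks until all n items have been written back */
            libtrace_ringbuffer_swrite_bulk(rb, burst, n, n);
    }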
  • lib/data-struct/vector.c

    rfac8c46 ra49a9eb  
    1010        v->max_size = 128; // Pick a largish size to begin with
    1111        v->elements = malloc(v->max_size * v->element_size);
    12         assert(pthread_mutex_init(&v->lock, NULL) == 0);
     12        ASSERT_RET(pthread_mutex_init(&v->lock, NULL), == 0);
    1313}
    1414
    1515DLLEXPORT void libtrace_vector_destroy(libtrace_vector_t *v) {
    16         assert(pthread_mutex_destroy(&v->lock) == 0);
     16        ASSERT_RET(pthread_mutex_destroy(&v->lock), == 0);
    1717        free(v->elements);
    1818        // Be safe make sure we wont work any more
     
    2424
    2525DLLEXPORT void libtrace_vector_push_back(libtrace_vector_t *v, void *d) {
    26         assert(pthread_mutex_lock(&v->lock) == 0);
     26        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
    2727        if (v->size >= v->max_size) {
    2828                /* Resize */
     
    3333        memcpy(&v->elements[v->size*v->element_size], d, v->element_size);
    3434        v->size++;
    35         assert(pthread_mutex_unlock(&v->lock) == 0);
     35        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    3636}
    3737
     
    4141
    4242DLLEXPORT int libtrace_vector_get(libtrace_vector_t *v, size_t location, void *d) {
    43         assert(pthread_mutex_lock(&v->lock) == 0);
     43        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
    4444        if (location >= v->size) {
    45                 assert(pthread_mutex_unlock(&v->lock) == 0);
     45                ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    4646                return 0;
    4747        }
    4848        memcpy(d, &v->elements[location*v->element_size], v->element_size);
    49         assert(pthread_mutex_unlock(&v->lock) == 0);
     49        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    5050        return 1;
    5151}
     
    5353DLLEXPORT int libtrace_vector_remove_front(libtrace_vector_t *v) {
    5454        size_t i;
    55         assert(pthread_mutex_lock(&v->lock) == 0);
     55        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
    5656        if (!v->size) {
    57                 assert(pthread_mutex_unlock(&v->lock) == 0);
     57                ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    5858                return 0;
    5959        }
     
    6262        for (i = 0; i < v->size * v->element_size; i++)
    6363                v->elements[i] = v->elements[i+v->element_size];
    64         assert(pthread_mutex_unlock(&v->lock) == 0);
     64        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    6565        return 1;
    6666}
     
    8282        if (src->size == 0) // Nothing to do if this is the case
    8383                return;
    84         assert(pthread_mutex_lock(&dest->lock) == 0);
    85         assert(pthread_mutex_lock(&src->lock) == 0);
     84        ASSERT_RET(pthread_mutex_lock(&dest->lock), == 0);
     85        ASSERT_RET(pthread_mutex_lock(&src->lock), == 0);
    8686        if (src->size == 0) // Double check now we've got the locks - Nothing to do if this is the case
    8787                goto unlock;
     
    104104        }
    105105unlock:
    106         assert(pthread_mutex_unlock(&src->lock) == 0);
    107         assert(pthread_mutex_unlock(&dest->lock) == 0);
     106        ASSERT_RET(pthread_mutex_unlock(&src->lock), == 0);
     107        ASSERT_RET(pthread_mutex_unlock(&dest->lock), == 0);
    108108}
    109109
     
    117117
    118118DLLEXPORT void libtrace_vector_empty(libtrace_vector_t *v) {
    119         assert(pthread_mutex_lock(&v->lock) == 0);
     119        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
    120120        v->size = 0;
    121         assert(pthread_mutex_unlock(&v->lock) == 0);
     121        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    122122}
    123123
     
    126126{
    127127        size_t cur;
    128         assert(pthread_mutex_lock(&v->lock) == 0);
     128        ASSERT_RET(pthread_mutex_lock(&v->lock), == 0);
    129129        for (cur = 0; cur < v->size; cur++) {
    130130                (*fn)(&v->elements[cur*v->element_size]);
    131131        }
    132         assert(pthread_mutex_unlock(&v->lock) == 0);
     132        ASSERT_RET(pthread_mutex_unlock(&v->lock), == 0);
    133133}
  • lib/libtrace.h.in

    r17c5749 ra49a9eb  
    115115/** DAG driver version installed on the current system */
    116116#define DAG_DRIVER_V "@DAG_VERSION_NUM@"
     117
     118/**
     119  * A version of assert that always runs the first argument even
     120  * when not debugging, however only asserts the condition if debugging
     121  * Intended for use mainly with pthread locks etc. which have error
     122  * returns but *should* never actually fail.
     123  */
     124#ifdef NDEBUG
     125#define ASSERT_RET(run, cond) run
     126#else
     127#define ASSERT_RET(run, cond) assert(run cond)
     128#endif
    117129   
    118130#ifdef __cplusplus
     
    31883200
    31893201DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, fn_per_pkt per_pkt, fn_reducer reducer);
    3190 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet);
    31913202DLLEXPORT int trace_ppause(libtrace_t *libtrace);
    31923203DLLEXPORT int trace_pstop(libtrace_t *libtrace);
  • lib/libtrace_int.h

    r50ce607 ra49a9eb  
    149149
    150150#include "data-struct/ring_buffer.h"
     151#include "data-struct/object_cache.h"
    151152#include "data-struct/vector.h"
    152153#include "data-struct/message_queue.h"
     
    317318        int packet_freelist_size;
    318319        /** The actual freelist */
    319         libtrace_ringbuffer_t packet_freelist;
     320        libtrace_ocache_t packet_freelist;
    320321        /** The number of packets that can queue per thread - XXX consider deadlocks with non malloc()'d packets that need to be released */
    321322        int perpkt_buffer_size;
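
The freelist's type changes from libtrace_ringbuffer_t to the new libtrace_ocache_t, and callers switch to the alloc/free calls visible later in trace_parallel.c, which mirror the bulk ringbuffer API (array, count, minimum count). A sketch of that calling pattern (object_cache.h itself is one of the two added files not shown in this diff, so only calls that appear in the diff are used):

    #include "libtrace_int.h"

    /* Allocate one packet from the trace's object cache, as the per-packet
     * threads do: blocks until at least 1 object is returned. */
    libtrace_packet_t *get_packet(libtrace_t *trace) {
            libtrace_packet_t *packet = NULL;
            libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
            return packet;
    }

    /* Return the packet to the cache rather than destroying it, so it can be
     * recycled via the thread-local cache or the shared ring. */
    void put_packet(libtrace_t *trace, libtrace_packet_t *packet) {
            libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1);
    }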
  • lib/trace.c

    rc99b1e5 ra49a9eb  
    273273        libtrace->perpkt_buffer_size = 0;
    274274        libtrace->expected_key = 0;
    275         libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     275        libtrace_zero_ocache(&libtrace->packet_freelist);
    276276        libtrace_zero_thread(&libtrace->hasher_thread);
    277277        libtrace_zero_thread(&libtrace->reducer_thread);
     
    395395        libtrace->packet_freelist_size = 0;
    396396        libtrace->perpkt_buffer_size = 0;
    397         libtrace_zero_ringbuffer(&libtrace->packet_freelist);
     397        libtrace_zero_ocache(&libtrace->packet_freelist);
    398398        libtrace_zero_thread(&libtrace->hasher_thread);
    399399        libtrace_zero_thread(&libtrace->reducer_thread);
     
    651651                }
    652652                free(libtrace->first_packets.packets);
    653                 assert(pthread_spin_destroy(&libtrace->first_packets.lock) == 0);
     653                ASSERT_RET(pthread_spin_destroy(&libtrace->first_packets.lock), == 0);
    654654        }
    655655
     
    666666        /* Empty any packet memory */
    667667        if (libtrace->state != STATE_NEW) {
    668                 libtrace_packet_t * packet;
    669                 while (libtrace_ringbuffer_try_read(&libtrace->packet_freelist,(void **) &packet))
    670                         trace_destroy_packet(packet);
    671                
    672                 libtrace_ringbuffer_destroy(&libtrace->packet_freelist);
     668                // This has all of our packets
     669                libtrace_ocache_destroy(&libtrace->packet_freelist);
    673670               
    674671                for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
     
    721718}
    722719
    723 DLLEXPORT libtrace_packet_t *trace_create_packet(void) 
    724 {
    725         libtrace_packet_t *packet = 
     720DLLEXPORT libtrace_packet_t *trace_create_packet(void)
     721{
     722        libtrace_packet_t *packet =
    726723                (libtrace_packet_t*)calloc((size_t)1,sizeof(libtrace_packet_t));
    727724
     
    14531450#if HAVE_LLVM
    14541451        if (!filter->jitfilter) {
    1455                 assert(pthread_mutex_lock(&mutex) == 0);
     1452                ASSERT_RET(pthread_mutex_lock(&mutex), == 0);
    14561453                /* Again double check here like the bpf filter */
    14571454                if(filter->jitfilter)
     
    14621459                 * as such lock here anyways */
    14631460                        filter->jitfilter = compile_program(filter->filter.bf_insns, filter->filter.bf_len);
    1464                 assert(pthread_mutex_unlock(&mutex) == 0);
     1461                ASSERT_RET(pthread_mutex_unlock(&mutex), == 0);
    14651462        }
    14661463#endif
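
The jitfilter hunk above is a check-lock-recheck: the unlocked test skips the mutex on the hot path once the filter is compiled, and the second test under the lock ensures only one thread ever runs the compiler. A self-contained sketch of the shape (filter_t and compile_stub are hypothetical stand-ins):

    #include <pthread.h>

    typedef struct { void *jitfilter; } filter_t; /* hypothetical stand-in */

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    static void *compile_stub(filter_t *f) { (void) f; return (void *) 1; }

    void ensure_compiled(filter_t *f) {
            if (!f->jitfilter) { /* cheap unlocked test on the hot path */
                    pthread_mutex_lock(&mutex);
                    if (!f->jitfilter) /* recheck: another thread may have won */
                            f->jitfilter = compile_stub(f);
                    pthread_mutex_unlock(&mutex);
            }
    }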
  • lib/trace_parallel.c

    r049a700 ra49a9eb  
    103103#define VERBOSE_DEBBUGING 0
    104104
     105
     106static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
     107
    105108extern int libtrace_parallel;
    106109
     
    109112        uint64_t wait_for_fill_complete_hits;
    110113} contention_stats[1024];
     114
     115struct mem_stats {
     116        struct memfail {
     117           uint64_t cache_hit;
     118           uint64_t ring_hit;
     119           uint64_t miss;
     120           uint64_t recycled;
     121        } readbulk, read, write, writebulk;
     122};
     123
     124// Grrr gcc wants this spelt out
     125__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
     126
     127static void print_memory_stats() {
     128        char t_name[50];
     129        uint64_t total;
     130        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
     131
     132        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
     133
     134        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
     135        if (total) {
     136                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     137                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
     138                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     139                                total, (double) mem_hits.read.miss / (double) total * 100.0);
     140        }
     141
     142        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
     143        if (total) {
     144                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     145                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
     146
     147
     148                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     149                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
     150        }
     151
     152        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
     153        if (total) {
     154                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     155                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
     156
     157                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     158                                total, (double) mem_hits.write.miss / (double) total * 100.0);
     159        }
     160
     161        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
     162        if (total) {
     163                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     164                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
     165
     166                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     167                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
     168        }
     169
     170}
    111171
    112172/**
     
    165225        const enum trace_state new_state, const bool need_lock)
    166226{
    167         enum trace_state prev_state;
     227        UNUSED enum trace_state prev_state;
    168228        if (need_lock)
    169229                pthread_mutex_lock(&trace->libtrace_lock);
     
    293353static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    294354        trace_make_results_packets_safe(trace);
    295         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     355        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    296356        thread_change_state(trace, t, THREAD_PAUSED, false);
    297357        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    298                 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
     358                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
    299359        }
    300360        thread_change_state(trace, t, THREAD_RUNNING, false);
    301         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    302 }
     361        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     362}
     363
     364#define PACKETQUEUES 10
    303365
    304366/**
     
    309371        libtrace_thread_t * t;
    310372        libtrace_message_t message = {0};
    311         libtrace_packet_t *packet = NULL;
    312 
     373        libtrace_packet_t *packets[PACKETQUEUES] = {NULL};
     374        size_t nb_packets;
     375        size_t i;
    313376
    314377        // Force this thread to wait until trace_pstart has been completed
    315         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     378        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    316379        t = get_thread_table(trace);
    317380        assert(t);
     
    320383                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
    321384        }
    322         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     385        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    323386
    324387        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
     
    333396
    334397        for (;;) {
    335                 int psize;
    336398
    337399                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
     
    346408                                                // The hasher has stopped by this point, so the queue shouldn't be filling
    347409                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    348                                                         psize = trace_pread_packet(trace, t, &packet);
    349                                                         if (psize > 0) {
    350                                                                 packet = (*trace->per_pkt)(trace, packet, NULL, t);
     410                                                        nb_packets = trace_pread_packet(trace, t, packets, 1);
     411                                                        if (nb_packets == 1) {
     412                                                                if (packets[0]->error > 0)
     413                                                                        packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
    351414                                                        } else {
    352                                                                 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
     415                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", packets[0]->error, libtrace_ringbuffer_is_empty(&t->rbuffer));
    353416                                                        }
    354417                                                }
     
    369432
    370433                if (trace->perpkt_thread_count == 1) {
    371                         if (!packet) {
    372                                 if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
    373                                         packet = trace_create_packet();
     434                        if (!packets[0]) {
     435                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
    374436                        }
    375                         assert(packet);
    376                         if ((psize = trace_read_packet(trace, packet)) <1) {
    377                                 break;
     437                        assert(packets[0]);
     438                        packets[0]->error = trace_read_packet(trace, packets[0]);
     439                        nb_packets = 1;
     440                } else {
     441                        nb_packets = trace_pread_packet(trace, t, packets, PACKETQUEUES);
     442                }
     443                // Loop through the packets we just read
     444                for (i = 0; i < nb_packets; ++i) {
     445                       
     446                        if (packets[i]->error > 0) {
     447                                packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
     448                        } else if (packets[i]->error != -2) {
     449                                // An error this should be the last packet we read
     450                                size_t z;
     451                                for (z = i ; z < nb_packets; ++z)
     452                                        fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[i]->error);
     453                                assert (i == nb_packets-1);
     454                                goto stop;
    378455                        }
    379                 } else {
    380                         psize = trace_pread_packet(trace, t, &packet);
    381                 }
    382 
    383                 if (psize > 0) {
    384                         packet = (*trace->per_pkt)(trace, packet, NULL, t);
    385                         continue;
    386                 }
    387 
    388                 if (psize == -2)
    389                         continue; // We have a message
    390 
    391                 if (psize < 1) { // consider sending a message
    392                         break;
    393                 }
    394 
      456                        // -2 is a message; it's not worth checking now, just finish this lot and we'll check
     457                        // when we loop next
     458                }
    395459        }
    396460
     
    404468        (*trace->per_pkt)(trace, NULL, &message, t);
    405469
    406         // Free our last packet
    407         if (packet)
    408                 trace_destroy_packet(packet);
     470        // Free any remaining packets
     471        for (i = 0; i < PACKETQUEUES; i++) {
     472                if (packets[i]) {
     473                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
     474                        packets[i] = NULL;
     475                }
     476        }
    409477
    410478       
     
    416484        trace_send_message_to_reducer(trace, &message);
    417485
    418         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     486        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    419487        if (trace->format->punregister_thread) {
    420488                trace->format->punregister_thread(trace, t);
    421489        }
    422         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     490        print_memory_stats();
     491
     492        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    423493
    424494        pthread_exit(NULL);
     
    439509        assert(trace_has_dedicated_hasher(trace));
    440510        /* Wait until all threads are started and objects are initialised (ring buffers) */
    441         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     511        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    442512        t = &trace->hasher_thread;
    443513        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
     
    446516                trace->format->pregister_thread(trace, t, true);
    447517        }
    448         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     518        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    449519        int pkt_skipped = 0;
    450520        /* Read all packets in then hash and queue against the correct thread */
    451521        while (1) {
    452522                int thread;
    453                 if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
    454                         packet = trace_create_packet();
     523                if (!pkt_skipped)
     524                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
    455525                assert(packet);
    456526
     
    462532                        switch(message.code) {
    463533                                case MESSAGE_DO_PAUSE:
    464                                         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     534                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    465535                                        thread_change_state(trace, t, THREAD_PAUSED, false);
    466536                                        pthread_cond_broadcast(&trace->perpkt_cond);
    467537                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    468                                                 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
     538                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
    469539                                        }
    470540                                        thread_change_state(trace, t, THREAD_RUNNING, false);
    471541                                        pthread_cond_broadcast(&trace->perpkt_cond);
    472                                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     542                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    473543                                        break;
    474544                                case MESSAGE_DO_STOP:
     
    506576                        bcast = packet;
    507577                } else {
    508                         bcast = trace_create_packet();
     578                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
    509579                        bcast->error = packet->error;
    510580                }
    511                 assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     581                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    512582                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
    513                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     583                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    514584                        // Unlock early otherwise we could deadlock
    515585                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
    516586                } else {
    517                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     587                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    518588                }
    519589        }
     
    526596        message.additional.uint64 = 0;
    527597        trace_send_message_to_reducer(trace, &message);
    528         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     598        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    529599        if (trace->format->punregister_thread) {
    530600                trace->format->punregister_thread(trace, t);
    531601        }
    532         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     602        print_memory_stats();
     603        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    533604
    534605        // TODO remove from TTABLE t sometime
     
    553624}
    554625
     626/**
      627 * @brief Move NULLs to the end of an array.
      628 * @param arr The array to compact
      629 * @param len The length of the array
      630 * @return The location of the first NULL, i.e. the number of non-NULL elements
     631 */
     632static inline size_t move_nulls_back(void *arr[], size_t len) {
     633        size_t fr=0, en = len-1;
     634        // Shift all non NULL elements to the front of the array, and NULLs to the
     635        // end, traverses every element at most once
     636        for (;fr < en; ++fr) {
     637                if (arr[fr] == NULL) {
     638                        for (;en > fr; --en) {
     639                                if(arr[en]) {
     640                                        arr[fr] = arr[en];
     641                                        arr[en] = NULL;
     642                                        break;
     643                                }
     644                        }
     645                }
     646        }
     647        // This is the index of the first NULL
     648        en = MIN(fr, en);
     649        // Or the end of the array if this special case
     650        if (arr[en])
     651                en++;
     652        return en;
     653}
     654
      655/** Returns the number of packets successfully allocated in the final array;
      656 these will all be at the front of the array */
     657inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
     658        size_t nb;
     659        nb = move_nulls_back((void **) packets, nb_packets);
     660        mem_hits.read.recycled += nb;
     661        nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
     662        assert(nb_packets == nb);
     663        return nb;
     664}
     665
     666
     667inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
     668        size_t nb;
     669        nb = move_nulls_back((void **) packets, nb_packets);
     670        mem_hits.write.recycled += nb_packets - nb;
     671        nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
     672        memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
     673        return nb;
     674}
     675
    555676/* Our simplest case when a thread becomes ready it can obtain an exclusive
    556  * lock to read a packet from the underlying trace.
    557  */
    558 inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    559 {
    560         // We need this to fill the 'first' packet table
    561         if (!*packet) {
    562                 if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    563                         *packet = trace_create_packet();
    564         }
    565         assert(*packet);
    566         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    567         /* Read a packet */
    568         (*packet)->error = trace_read_packet(libtrace, *packet);
    569         // Doing this inside the lock ensures the first packet is always
    570         // recorded first
    571         if ((*packet)->error > 0)
    572                 store_first_packet(libtrace, *packet, t);
    573 
    574         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    575         return (*packet)->error;
     677 * lock to read packets from the underlying trace.
     678 */
     679inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
     680{
     681        size_t i = 0;
     682
     683        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
     684
     685        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     686        /* Read nb_packets */
     687        for (i = 0; i < nb_packets; ++i) {
     688                packets[i]->error = trace_read_packet(libtrace, packets[i]);
     689                // Doing this inside the lock ensures the first packet is always
     690                // recorded first
     691                if (packets[i]->error <= 0) {
     692                        ++i;
     693                        break;
     694                }
     695        }
     696        if (packets[0]->error > 0) {
     697                store_first_packet(libtrace, packets[0], t);
     698        }
     699        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     700        return i;
    576701}
    577702
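
From here on, pthread calls are wrapped in ASSERT_RET(call, == 0) rather than assert(call == 0). The real macro lives in one of the newly added files; a plausible sketch of its shape, assuming the intent is that the wrapped call is still executed when NDEBUG compiles plain assert()s away:

        #include <assert.h>

        /* Hypothetical sketch of ASSERT_RET: the first argument is always
         * evaluated for its side effect; the check only applies when
         * assertions are enabled. */
        #ifdef NDEBUG
        #define ASSERT_RET(run, check) ((void) (run))
        #else
        #define ASSERT_RET(run, check) assert((run) check)
        #endif

        /* Usage, as seen throughout this changeset:
         *   ASSERT_RET(pthread_mutex_lock(&lock), == 0);
         */
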
     
    581706 * 2. Move that into the packet provided (packet)
    582707 */
    583 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    584 {
    585         if (*packet) // Recycle the old get the new
    586                 if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
    587                         trace_destroy_packet(*packet);
    588         *packet = libtrace_ringbuffer_read(&t->rbuffer);
    589 
    590         if (*packet) {
     708inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
     709{
     710        size_t i;
     711
     712        // Always grab at least one
      713        if (packets[0]) // Recycle the old, get the new
     714                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
     715        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
     716
     717
     718        if (packets[0] == NULL) {
     719                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);
     720                packets[0]->error = -2;
     721        }
     722
     723        if (packets[0]->error < 0)
     724                return 1;
     725
     726        for (i = 1; i < nb_packets; i++) {
      727                if (packets[i]) // Recycle the old, get the new
     728                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
     729                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
     730                        packets[i] = NULL;
     731                        break;
     732                }
      733                // Message waiting
     734                if (packets[i] == NULL) {
     735                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
     736                        packets[i]->error = -2;
     737                        ++i;
     738                        break;
     739                }
     740        }
     741       
     742        return i;
     743        /*if (*packet) {
    591744                return (*packet)->error;
    592745        } else {
     
    597750                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
    598751                return -2;
    599         }
     752        }*/
    600753}
    601754
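
The pattern above is worth naming: block for the first packet so the caller always makes progress, then drain further packets opportunistically with non-blocking reads. Stripped of the ocache and error handling, the shape is roughly as follows (queue_t, queue_read and queue_try_read are hypothetical stand-ins for the ringbuffer calls):

        /* Sketch: block for one item, then batch whatever else is ready.
         * queue_read() blocks until an item arrives; queue_try_read()
         * returns 0 immediately when the queue is empty. */
        static size_t read_batch(queue_t *q, void *out[], size_t max) {
                size_t i = 0;
                out[i++] = queue_read(q);               /* guaranteed progress */
                while (i < max && queue_try_read(q, &out[i]))
                        i++;                            /* extras cost no blocking */
                return i;
        }

This keeps per-packet lock and wake-up overhead low while the hasher is keeping up, and degrades gracefully to one-at-a-time behaviour when it is not.
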
     
    613766
    614767                if (*packet) // Recycle the old get the new
    615                         if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
    616                                 trace_destroy_packet(*packet);
     768                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
    617769                *packet = retrived_packet;
    618770                *ret = (*packet)->error;
     
    635787        do {
    636788                // Wait for a thread to end
    637                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     789                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    638790
    639791                // Check before
    640792                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    641793                        complete = true;
    642                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     794                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    643795                        continue;
    644796                }
    645797
    646                 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
     798                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
    647799
    648800                // Check after
    649801                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    650802                        complete = true;
    651                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     803                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    652804                        continue;
    653805                }
    654806
    655                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     807                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    656808
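
The loop above is the standard condition-variable idiom: the predicate is tested both before and after the wait, because waits can wake spuriously and the state can change between wake-up and lock re-acquisition. In its canonical while-loop form:

        /* Canonical condition-variable wait; predicate and names are
         * illustrative only. */
        pthread_mutex_lock(&lock);
        while (!predicate)
                pthread_cond_wait(&cond, &lock);
        /* predicate holds here, under the lock */
        pthread_mutex_unlock(&lock);
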
     657809                // Always try to keep our buffer empty, for the unlikely case where more threads than buffer space want to write into our queue
     
    696848                        return ret;
    697849                // Can still block here if another thread is writing to a full queue
    698                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     850                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    699851
     700852                // It's impossible for our own queue to overfill, because no one can write
    701853                // when we are in the lock
    702854                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    703                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     855                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    704856                        return ret;
    705857                }
     
    708860                if (libtrace->perpkt_queue_full) {
    709861                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    710                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     862                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    711863                        continue;
    712864                }
    713865
    714                 if (!*packet) {
    715                         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    716                                 *packet = trace_create_packet();
    717                 }
     866                if (!*packet)
     867                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    718868                assert(*packet);
    719869
    720870                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
    721871                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    722                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     872                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    723873                        if (libtrace_halt)
    724874                                return 0;
     
    731881                if (thread == t->perpkt_num) {
    732882                        // If it's this thread we must be in order because we checked the buffer once we got the lock
    733                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     883                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    734884                        return (*packet)->error;
    735885                }
     
    738888                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    739889                                libtrace->perpkt_queue_full = true;
    740                                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     890                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    741891                                contention_stats[t->perpkt_num].full_queue_hits++;
    742                                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     892                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    743893                        }
    744894                        *packet = NULL;
     
     748898                        assert (!"packet_hash_locked() The user terminated the trace in an abnormal manner");
    749899                }
    750                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     900                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    751901        }
    752902}
     
    779929                // We limit the number of packets we get to the size of the sliding window
    780930                // such that it is impossible for any given thread to fail to store a packet
    781                 assert(sem_wait(&libtrace->sem) == 0);
     931                ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    782932                /*~~~~Single threaded read of a packet~~~~*/
    783                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     933                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    784934
    785935                /* Re-check our queue things we might have data waiting */
    786936                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    787                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    788                         assert(sem_post(&libtrace->sem) == 0);
     937                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     938                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    789939                        return ret;
    790940                }
     
    792942                // TODO put on *proper* condition variable
    793943                if (libtrace->perpkt_queue_full) {
    794                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    795                         assert(sem_post(&libtrace->sem) == 0);
     944                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     945                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    796946                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    797947                        continue;
    798948                }
    799949
    800                 if (!*packet) {
    801                         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    802                                 *packet = trace_create_packet();
    803                 }
     950                if (!*packet)
     951                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    804952                assert(*packet);
    805953
    806954                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    807                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    808                         assert(sem_post(&libtrace->sem) == 0);
     955                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     956                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    809957                        // Finish this thread ensuring that any data written later by another thread is retrieved also
    810958                        if (libtrace_halt)
     
    813961                                return trace_finish_perpkt(libtrace, packet, t);
    814962                }
    815                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     963                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    816964
    817965                /* ~~~~Multiple threads can run the hasher~~~~ */
     
    819967
     820968                /* Yes, this is correct: we take the opposite (read) lock for a write operation */
    821                 assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
     969                ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
    822970                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
    823971                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
    824                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     972                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    825973                *packet = NULL;
    826974
    827975                // Always try read any data from the sliding window
    828976                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
    829                         assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
     977                        ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    830978                        if (libtrace->perpkt_queue_full) {
     831979                                // I might be the holdup, in which case I should read my own queue and return if I can
    832980                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    833                                         assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     981                                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    834982                                        return ret;
    835983                                }
    836                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     984                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    837985                                continue;
    838986                        }
     
    851999                                                                // We must be able to write this now 100% without fail
    8521000                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
    853                                                                 assert(sem_post(&libtrace->sem) == 0);
    854                                                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1001                                                                ASSERT_RET(sem_post(&libtrace->sem), == 0);
     1002                                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    8551003                                                                return ret;
    8561004                                                        } else {
     
     8601008                                                // Not us; we have to give the other threads a chance to write their packets first
    8611009                                                libtrace->perpkt_queue_full = true;
    862                                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1010                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
     8631011                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read their packets
    864                                                         assert(sem_post(&libtrace->sem) == 0);
     1012                                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    8651013
    8661014                                                contention_stats[t->perpkt_num].full_queue_hits++;
    867                                                 assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
     1015                                                ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    8681016                                                // Grab these back
     8691017                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Take the semaphore slots back from the other threads
    870                                                         assert(sem_wait(&libtrace->sem) == 0);
     1018                                                        ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    8711019                                                libtrace->perpkt_queue_full = false;
    8721020                                        }
    873                                         assert(sem_post(&libtrace->sem) == 0);
     1021                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    8741022                                        *packet = NULL;
    8751023                                } else {
     
    8791027                                }
    8801028                        }
    881                         assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1029                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    8821030                }
     8831031                // Now we go back to checking our queue anyway
     
    9021050                gettimeofday(&tv, NULL);
    9031051                dup = trace_copy_packet(packet);
    904                 assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
     1052                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    9051053                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    9061054                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
     
    9181066                                libtrace->first_packets.first = t->perpkt_num;
    9191067                }
    920                 assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     1068                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    9211069                libtrace_message_t mesg = {0};
    9221070                mesg.code = MESSAGE_FIRST_PACKET;
     
    9361084{
    9371085        int ret = 0;
    938         assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
     1086        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    9391087        if (libtrace->first_packets.count) {
    9401088                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
     
    9561104                *tv = NULL;
    9571105        }
    958         assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     1106        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    9591107        return ret;
    9601108}
     
    11101258
    11111259/**
    1112  * Read a packet from the parallel trace
    1113  */
    1114 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    1115 {
    1116         int ret;
    1117 
    1118         // Cleanup the packet passed back
    1119         if (*packet)
    1120                 trace_fin_packet(*packet);
     1260 * Read packets from the parallel trace
      1261 * @return the number of packets read; NULL packets indicate messages. Check packet->error
      1262 * before assuming a packet is valid.
     1263 */
     1264static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
     1265{
     1266        size_t ret;
     1267        size_t i;
     1268        assert(nb_packets);
     1269
     1270        for (i = 0; i < nb_packets; i++) {
     1271                // Cleanup the packet passed back
     1272                if (packets[i])
     1273                        trace_fin_packet(packets[i]);
     1274        }
    11211275
    11221276        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1123                 if (!*packet)
    1124                         *packet = trace_create_packet();
    1125                 ret = trace_pread_packet_wrapper(libtrace, t, *packet);
     1277                if (!packets[0])
     1278                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
     1279                packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
     1280                ret = 1;
    11261281        } else if (trace_has_dedicated_hasher(libtrace)) {
    1127                 ret = trace_pread_packet_hasher_thread(libtrace, t, packet);
     1282                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
    11281283        } else if (!trace_has_dedicated_hasher(libtrace)) {
    11291284                /* We don't care about which core a packet goes to */
    1130                 ret = trace_pread_packet_first_in_first_served(libtrace, t, packet);
     1285                ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
    11311286        } /* else {
    11321287                ret = trace_pread_packet_hash_locked(libtrace, packet);
     
    11351290        // Formats can also optionally do this internally to ensure the first
    11361291        // packet is always reported correctly
    1137         if (ret > 0) {
    1138                 store_first_packet(libtrace, *packet, t);
     1292        assert(ret);
     1293        assert(ret <= nb_packets);
     1294        if (packets[0]->error > 0) {
     1295                store_first_packet(libtrace, packets[0], t);
    11391296                if (libtrace->tracetime)
    1140                         delay_tracetime(libtrace, *packet, t);
     1297                        delay_tracetime(libtrace, packets[0], t);
    11411298        }
    11421299
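
Per the contract documented above, a caller must inspect each returned slot's error field: a positive value marks a valid packet, while -2 appears to be the "message waiting" sentinel set on the hasher path. A hedged sketch of the consuming loop (illustrative only):

        /* Illustrative consumer of a batch returned by trace_pread_packet() */
        size_t i, n = trace_pread_packet(libtrace, t, packets, nb_packets);
        for (i = 0; i < n; i++) {
                if (packets[i]->error > 0) {
                        /* valid packet: hand it to the per-packet handler */
                } else if (packets[i]->error == -2) {
                        /* a message is waiting on this thread's queue */
                } else {
                        /* EOF or error: stop reading */
                        break;
                }
        }
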
     
    11491306static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
    11501307        int i;
    1151 
     1308        char name[16];
    11521309        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    11531310                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    1154                 assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
     1311                ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
     1312                snprintf(name, 16, "perpkt-%d", i);
     1313                pthread_setname_np(t->tid, name);
    11551314        }
    11561315        return libtrace->perpkt_thread_count;
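
A portability note on the thread naming added here: pthread_setname_np is a nonportable GNU extension, and on Linux the name, including its terminating NUL, must fit in 16 bytes, which is why name is declared as char[16]. A minimal standalone check (build with gcc -pthread):

        #define _GNU_SOURCE
        #include <pthread.h>
        #include <stdio.h>

        int main(void) {
                char buf[16];
                /* Names longer than 15 characters are rejected with ERANGE */
                pthread_setname_np(pthread_self(), "perpkt-0");
                pthread_getname_np(pthread_self(), buf, sizeof(buf));
                printf("%s\n", buf); /* prints: perpkt-0 */
                return 0;
        }
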
     
    11681327{
    11691328        int i;
     1329        char name[16];
    11701330        sigset_t sig_before, sig_block_all;
    11711331        assert(libtrace);
     
    11771337        if (libtrace->state != STATE_NEW) {
    11781338                int err = 0;
    1179                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1339                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    11801340                if (libtrace->state != STATE_PAUSED) {
    11811341                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
    11821342                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
    1183                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1343                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    11841344                        return -1;
    11851345                }
     
    12071367                        libtrace_change_state(libtrace, STATE_RUNNING, false);
    12081368                }
    1209                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1369                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    12101370                return err;
    12111371        }
     
    12191379        libtrace->reducer = reducer;
    12201380
    1221         assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
    1222         assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
    1223         assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
     1381        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     1382        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
     1383        ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
    12241384        // Grab the lock
    1225         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1385        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    12261386
    12271387        // Set default buffer sizes
     
    12501410        sigemptyset(&sig_block_all);
    12511411
    1252         assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
     1412        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
    12531413
    12541414        // If we are using a hasher start it
     
    12611421                t->state = THREAD_RUNNING;
    12621422                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1263                 assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
     1423                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
     1424                snprintf(name, sizeof(name), "hasher-thread");
     1425                pthread_setname_np(t->tid, name);
    12641426        } else {
    12651427                libtrace->hasher_thread.type = THREAD_EMPTY;
    12661428        }
    1267         libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
     1429        //libtrace_ocache_init(&libtrace->packet_freelist, trace_create_packet, trace_destroy_packet, 64, libtrace->packet_freelist_size * 4, true);
     1430        libtrace_ocache_init(&libtrace->packet_freelist,
     1431                                                 (void* (*)()) trace_create_packet,
     1432                                                 (void (*)(void *))trace_destroy_packet,
     1433                                                 64,
     1434                                                 libtrace->packet_freelist_size * 4,
     1435                                                 true);
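
Reading the call above, the object cache is initialised with a constructor/destructor pair, a per-thread cache size (64), a global limit (four times the old freelist size) and a final flag that presumably toggles the thread-local caches. The same call in annotated form (the parameter meanings are inferred from this changeset, not from documentation):

        libtrace_ocache_init(&libtrace->packet_freelist,
                             (void *(*)()) trace_create_packet,       /* constructor for a cache miss */
                             (void (*)(void *)) trace_destroy_packet, /* destructor for overflow/teardown */
                             64,                                      /* per-thread (local) cache size */
                             libtrace->packet_freelist_size * 4,      /* global maximum */
                             true);                                   /* enable thread-local caches */
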
    12681436        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
    1269         assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
     1437        ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
    12701438        // This will be applied to every new thread that starts, i.e. they will block all signals
     12711439        // Let's start a fixed number of reading threads
     
    12811449        libtrace->first_packets.first = 0;
    12821450        libtrace->first_packets.count = 0;
    1283         assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
     1451        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
    12841452        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
    12851453
     
    12971465                t->perpkt_num = i;
    12981466                if (libtrace->hasher)
    1299                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
     1467                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
    13001468                // Depending on the mode vector or deque might be chosen
    13011469                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    13051473                t->tmp_data = NULL;
    13061474                t->recorded_first = false;
    1307                 assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
     1475                ASSERT_RET(pthread_spin_init(&t->tmp_spinlock, 0), == 0);
     13081476                t->tracetime_offset_usec = 0;
    13091477        }
     
    13261494                libtrace->keepalive_thread.state = THREAD_RUNNING;
    13271495                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    1328                 assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
     1496                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
    13291497        }
    13301498
     
    13351503
    13361504        // Revert back - Allow signals again
    1337         assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
    1338         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1505        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
     1506        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13391507
    13401508        if (threads_started < 0)
     
    13681536        t = get_thread_table(libtrace);
    13691537        // Check state from within the lock if we are going to change it
    1370         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1538        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    13711539        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
    13721540                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
    13731541                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
    1374                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1542                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13751543                return -1;
    13761544        }
    13771545
    13781546        libtrace_change_state(libtrace, STATE_PAUSING, false);
    1379         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1547        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13801548
    13811549        // Special case handle the hasher thread case
     
    13861554                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    13871555                // Wait for it to pause
    1388                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1556                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    13891557                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
    1390                         assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    1391                 }
    1392                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1558                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1559                }
     1560                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13931561        }
    13941562
     
    14251593
    14261594        // Wait for all threads to pause
    1427         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1595        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    14281596        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
    1429                 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    1430         }
    1431         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1597                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1598        }
     1599        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    14321600
    14331601        fprintf(stderr, "Threads have paused\n");
     
    15591727        for (i=0; i< libtrace->perpkt_thread_count; i++) {
    15601728                //printf("Waiting to join with perpkt #%d\n", i);
    1561                 assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
     1729                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
    15621730                //printf("Joined with perpkt #%d\n", i);
    15631731                // So we must do our best effort to empty the queue - so
     
    16071775       
    16081776        libtrace_change_state(libtrace, STATE_JOINED, true);
     1777        print_memory_stats();
    16091778}
    16101779
     
    19772146DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
    19782147        libtrace_packet_t* result;
    1979         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
    1980                 result = trace_create_packet();
     2148        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
    19812149        assert(result);
    19822150        swap_packets(result, packet); // Move the current packet into our copy
     
    19842152}
    19852153
    1986 DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
     2154DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
     19872155        // Try to write the packet back
     19882156        assert(packet);
     19892157        // Always release any resources this might be holding, such as a slot in a ringbuffer
    19902158        trace_fin_packet(packet);
    1991         if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
    1992                 /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
    1993                 //assert(1 == 90);
    1994                 trace_destroy_packet(packet);
    1995         }
     2159        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
    19962160}
    19972161
  • test/test-datastruct-ringbuffer.c

    r19135af ra49a9eb  
    2626}
    2727
     28static void * producer_bulk(void * a) {
     29        libtrace_ringbuffer_t * rb = (libtrace_ringbuffer_t *) a;
     30        char * i;
      31        for (i = NULL; i < (char *) TEST_SIZE; i++) {
     32                assert(libtrace_ringbuffer_write_bulk(rb, (void **) &i, 1, 1) == 1);
     33        }
     34        return 0;
     35}
     36
     37static void * consumer_bulk(void * a) {
     38        libtrace_ringbuffer_t * rb = (libtrace_ringbuffer_t *) a;
     39        char *i;
     40        void *value;
      41        for (i = NULL; i < (char *) TEST_SIZE; i++) {
     42                assert (libtrace_ringbuffer_read_bulk(rb, &value, 1, 1) == 1);
     43                assert(value == i);
     44        }
     45        return 0;
     46}
     47
     48
    2849/**
    2950 * Tests the ringbuffer data structure, first this establishes that single
     
    3960
    4061        libtrace_ringbuffer_init(&rb_block, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_BLOCKING);
    41         libtrace_ringbuffer_init(&rb_polling, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_BLOCKING);
     62        libtrace_ringbuffer_init(&rb_polling, (size_t) RINGBUFFER_SIZE, LIBTRACE_RINGBUFFER_POLLING);
    4263        assert(libtrace_ringbuffer_is_empty(&rb_block));
    4364        assert(libtrace_ringbuffer_is_empty(&rb_polling));
     
    105126        assert(libtrace_ringbuffer_is_empty(&rb_polling));
    106127
     128        pthread_create(&t[0], NULL, &producer_bulk, (void *) &rb_block);
     129        pthread_create(&t[1], NULL, &consumer_bulk, (void *) &rb_block);
     130        pthread_join(t[0], NULL);
     131        pthread_join(t[1], NULL);
     132        assert(libtrace_ringbuffer_is_empty(&rb_block));
     133
     134        pthread_create(&t[0], NULL, &producer_bulk, (void *) &rb_polling);
     135        pthread_create(&t[1], NULL, &consumer_bulk, (void *) &rb_polling);
     136        pthread_join(t[0], NULL);
     137        pthread_join(t[1], NULL);
     138        assert(libtrace_ringbuffer_is_empty(&rb_polling));
     139
    107140        return 0;
    108141}
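
These tests drive the new bulk calls one element at a time; judging by their use here and in the object cache, libtrace_ringbuffer_write_bulk and libtrace_ringbuffer_read_bulk take (ring, array, nb, min_nb) and return the number of elements actually transferred, blocking only until min_nb have moved. A hedged sketch with a larger batch (illustrative only):

        /* Move up to 4 items per call, insisting on at least 1 */
        void *items[4] = { NULL, NULL, NULL, NULL }; /* values to enqueue */
        size_t n;

        n = libtrace_ringbuffer_write_bulk(&rb, items, 4, 1); /* n in [1,4] written */
        n = libtrace_ringbuffer_read_bulk(&rb, items, 4, 1);  /* n in [1,4] read */
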