Ignore:
Timestamp:
07/30/14 18:44:16 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
be3f75b
Parents:
41148f2
Message:

Add an object cache with thread local caches
All packets used by a trace are put through this.
Adds bulk read/write operations to the ringbuffer (used by the object cache)
Replace semaphores with condition variables to support these bulk operations.
Internally use bulk read operations from single-threaded formats to reduce lock overhead.
Replaces the asserts around pthread_* functions with a version that will still run the command if NDEBUG is defined.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r049a700 ra49a9eb  
    103103#define VERBOSE_DEBBUGING 0
    104104
     105
     106static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
     107
    105108extern int libtrace_parallel;
    106109
     
    109112        uint64_t wait_for_fill_complete_hits;
    110113} contention_stats[1024];
     114
     115struct mem_stats {
     116        struct memfail {
     117           uint64_t cache_hit;
     118           uint64_t ring_hit;
     119           uint64_t miss;
     120           uint64_t recycled;
     121        } readbulk, read, write, writebulk;
     122};
     123
     124// Grrr gcc wants this spelt out
     125__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
     126
     127static void print_memory_stats() {
     128        char t_name[50];
     129        uint64_t total;
     130        pthread_getname_np(pthread_self(), t_name, sizeof(t_name));
     131
     132        fprintf(stderr, "Thread ID#%d - %s\n", (int) pthread_self(), t_name);
     133
     134        total = mem_hits.read.cache_hit + mem_hits.read.ring_hit + mem_hits.read.miss;
     135        if (total) {
     136                fprintf(stderr, "\tRead:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     137                                mem_hits.read.cache_hit, mem_hits.read.ring_hit, mem_hits.read.miss, mem_hits.read.recycled);
     138                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     139                                total, (double) mem_hits.read.miss / (double) total * 100.0);
     140        }
     141
     142        total = mem_hits.readbulk.cache_hit + mem_hits.readbulk.ring_hit + mem_hits.readbulk.miss;
     143        if (total) {
     144                fprintf(stderr, "\tReadbulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     145                                mem_hits.readbulk.cache_hit, mem_hits.readbulk.ring_hit, mem_hits.readbulk.miss, mem_hits.readbulk.recycled);
     146
     147
     148                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     149                                total, (double) mem_hits.readbulk.miss / (double) total * 100.0);
     150        }
     151
     152        total = mem_hits.write.cache_hit + mem_hits.write.ring_hit + mem_hits.write.miss;
     153        if (total) {
     154                fprintf(stderr, "\tWrite:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     155                                mem_hits.write.cache_hit, mem_hits.write.ring_hit, mem_hits.write.miss, mem_hits.write.recycled);
     156
     157                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     158                                total, (double) mem_hits.write.miss / (double) total * 100.0);
     159        }
     160
     161        total = mem_hits.writebulk.cache_hit + mem_hits.writebulk.ring_hit + mem_hits.writebulk.miss;
     162        if (total) {
     163                fprintf(stderr, "\tWritebulk:\n\t---CHits=%"PRIu64"\n\t---RHits=%"PRIu64"\n\t---Misses=%"PRIu64"\n\t---Recycled=%"PRIu64"\n",
     164                                mem_hits.writebulk.cache_hit, mem_hits.writebulk.ring_hit, mem_hits.writebulk.miss, mem_hits.writebulk.recycled);
     165
     166                fprintf(stderr, "\t---Total=%"PRIu64"\n\t---Miss %%=%f\n",
     167                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
     168        }
     169
     170}
    111171
    112172/**
     
    165225        const enum trace_state new_state, const bool need_lock)
    166226{
    167         enum trace_state prev_state;
     227        UNUSED enum trace_state prev_state;
    168228        if (need_lock)
    169229                pthread_mutex_lock(&trace->libtrace_lock);
     
    293353static void trace_thread_pause(libtrace_t *trace, libtrace_thread_t *t) {
    294354        trace_make_results_packets_safe(trace);
    295         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     355        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    296356        thread_change_state(trace, t, THREAD_PAUSED, false);
    297357        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    298                 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
     358                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
    299359        }
    300360        thread_change_state(trace, t, THREAD_RUNNING, false);
    301         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
    302 }
     361        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
     362}
     363
     364#define PACKETQUEUES 10
    303365
    304366/**
     
    309371        libtrace_thread_t * t;
    310372        libtrace_message_t message = {0};
    311         libtrace_packet_t *packet = NULL;
    312 
     373        libtrace_packet_t *packets[PACKETQUEUES] = {NULL};
     374        size_t nb_packets;
     375        size_t i;
    313376
    314377        // Force this thread to wait until trace_pstart has been completed
    315         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     378        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    316379        t = get_thread_table(trace);
    317380        assert(t);
     
    320383                trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
    321384        }
    322         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     385        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    323386
    324387        /* ~~~~~~~~~~~ Setup complete now we loop ~~~~~~~~~~~~~~~ */
     
    333396
    334397        for (;;) {
    335                 int psize;
    336398
    337399                if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) {
     
    346408                                                // The hasher has stopped by this point, so the queue shouldn't be filling
    347409                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    348                                                         psize = trace_pread_packet(trace, t, &packet);
    349                                                         if (psize > 0) {
    350                                                                 packet = (*trace->per_pkt)(trace, packet, NULL, t);
     410                                                        nb_packets = trace_pread_packet(trace, t, packets, 1);
     411                                                        if (nb_packets == 1) {
     412                                                                if (packets[0]->error > 0)
     413                                                                        packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
    351414                                                        } else {
    352                                                                 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer));
     415                                                                fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", packets[0]->error, libtrace_ringbuffer_is_empty(&t->rbuffer));
    353416                                                        }
    354417                                                }
     
    369432
    370433                if (trace->perpkt_thread_count == 1) {
    371                         if (!packet) {
    372                                 if (!libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
    373                                         packet = trace_create_packet();
     434                        if (!packets[0]) {
     435                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packets[0], 1, 1);
    374436                        }
    375                         assert(packet);
    376                         if ((psize = trace_read_packet(trace, packet)) <1) {
    377                                 break;
     437                        assert(packets[0]);
     438                        packets[0]->error = trace_read_packet(trace, packets[0]);
     439                        nb_packets = 1;
     440                } else {
     441                        nb_packets = trace_pread_packet(trace, t, packets, PACKETQUEUES);
     442                }
     443                // Loop through the packets we just read
     444                for (i = 0; i < nb_packets; ++i) {
     445                       
     446                        if (packets[i]->error > 0) {
     447                                packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
     448                        } else if (packets[i]->error != -2) {
     449                                // An error this should be the last packet we read
     450                                size_t z;
     451                                for (z = i ; z < nb_packets; ++z)
     452                                        fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[i]->error);
     453                                assert (i == nb_packets-1);
     454                                goto stop;
    378455                        }
    379                 } else {
    380                         psize = trace_pread_packet(trace, t, &packet);
    381                 }
    382 
    383                 if (psize > 0) {
    384                         packet = (*trace->per_pkt)(trace, packet, NULL, t);
    385                         continue;
    386                 }
    387 
    388                 if (psize == -2)
    389                         continue; // We have a message
    390 
    391                 if (psize < 1) { // consider sending a message
    392                         break;
    393                 }
    394 
     456                        // -2 is a message its not worth checking now just finish this lot and we'll check
     457                        // when we loop next
     458                }
    395459        }
    396460
     
    404468        (*trace->per_pkt)(trace, NULL, &message, t);
    405469
    406         // Free our last packet
    407         if (packet)
    408                 trace_destroy_packet(packet);
     470        // Free any remaining packets
     471        for (i = 0; i < PACKETQUEUES; i++) {
     472                if (packets[i]) {
     473                        libtrace_ocache_free(&trace->packet_freelist, (void **) &packets[i], 1, 1);
     474                        packets[i] = NULL;
     475                }
     476        }
    409477
    410478       
     
    416484        trace_send_message_to_reducer(trace, &message);
    417485
    418         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     486        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    419487        if (trace->format->punregister_thread) {
    420488                trace->format->punregister_thread(trace, t);
    421489        }
    422         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     490        print_memory_stats();
     491
     492        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    423493
    424494        pthread_exit(NULL);
     
    439509        assert(trace_has_dedicated_hasher(trace));
    440510        /* Wait until all threads are started and objects are initialised (ring buffers) */
    441         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     511        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    442512        t = &trace->hasher_thread;
    443513        assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));
     
    446516                trace->format->pregister_thread(trace, t, true);
    447517        }
    448         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     518        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    449519        int pkt_skipped = 0;
    450520        /* Read all packets in then hash and queue against the correct thread */
    451521        while (1) {
    452522                int thread;
    453                 if (!pkt_skipped && !libtrace_ringbuffer_try_sread_bl(&trace->packet_freelist, (void **) &packet))
    454                         packet = trace_create_packet();
     523                if (!pkt_skipped)
     524                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
    455525                assert(packet);
    456526
     
    462532                        switch(message.code) {
    463533                                case MESSAGE_DO_PAUSE:
    464                                         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     534                                        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    465535                                        thread_change_state(trace, t, THREAD_PAUSED, false);
    466536                                        pthread_cond_broadcast(&trace->perpkt_cond);
    467537                                        while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) {
    468                                                 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0);
     538                                                ASSERT_RET(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock), == 0);
    469539                                        }
    470540                                        thread_change_state(trace, t, THREAD_RUNNING, false);
    471541                                        pthread_cond_broadcast(&trace->perpkt_cond);
    472                                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     542                                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    473543                                        break;
    474544                                case MESSAGE_DO_STOP:
     
    506576                        bcast = packet;
    507577                } else {
    508                         bcast = trace_create_packet();
     578                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &bcast, 1, 1);
    509579                        bcast->error = packet->error;
    510580                }
    511                 assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     581                ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    512582                if (trace->perpkt_threads[i].state != THREAD_FINISHED) {
    513                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     583                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    514584                        // Unlock early otherwise we could deadlock
    515585                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast);
    516586                } else {
    517                         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     587                        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    518588                }
    519589        }
     
    526596        message.additional.uint64 = 0;
    527597        trace_send_message_to_reducer(trace, &message);
    528         assert(pthread_mutex_lock(&trace->libtrace_lock) == 0);
     598        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    529599        if (trace->format->punregister_thread) {
    530600                trace->format->punregister_thread(trace, t);
    531601        }
    532         assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0);
     602        print_memory_stats();
     603        ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0);
    533604
    534605        // TODO remove from TTABLE t sometime
     
    553624}
    554625
     626/**
     627 * @brief Move NULLs to the end of an array.
     628 * @param values
     629 * @param len
     630 * @return The location the first NULL, aka the number of non NULL elements
     631 */
     632static inline size_t move_nulls_back(void *arr[], size_t len) {
     633        size_t fr=0, en = len-1;
     634        // Shift all non NULL elements to the front of the array, and NULLs to the
     635        // end, traverses every element at most once
     636        for (;fr < en; ++fr) {
     637                if (arr[fr] == NULL) {
     638                        for (;en > fr; --en) {
     639                                if(arr[en]) {
     640                                        arr[fr] = arr[en];
     641                                        arr[en] = NULL;
     642                                        break;
     643                                }
     644                        }
     645                }
     646        }
     647        // This is the index of the first NULL
     648        en = MIN(fr, en);
     649        // Or the end of the array if this special case
     650        if (arr[en])
     651                en++;
     652        return en;
     653}
     654
     655/** returns the number of packets successfully allocated in the final array
     656 these will all be at the front of the array */
     657inline static size_t fill_array_with_empty_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
     658        size_t nb;
     659        nb = move_nulls_back((void **) packets, nb_packets);
     660        mem_hits.read.recycled += nb;
     661        nb += libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[nb], nb_packets - nb, nb_packets - nb);
     662        assert(nb_packets == nb);
     663        return nb;
     664}
     665
     666
     667inline static size_t empty_array_of_packets(libtrace_t *libtrace, libtrace_packet_t *packets[], size_t nb_packets) {
     668        size_t nb;
     669        nb = move_nulls_back((void **) packets, nb_packets);
     670        mem_hits.write.recycled += nb_packets - nb;
     671        nb += nb_packets - libtrace_ocache_free(&libtrace->packet_freelist, (void **)packets, nb, nb);
     672        memset(packets, 0, nb); // XXX make better, maybe do this in ocache??
     673        return nb;
     674}
     675
    555676/* Our simplest case when a thread becomes ready it can obtain an exclusive
    556  * lock to read a packet from the underlying trace.
    557  */
    558 inline static int trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    559 {
    560         // We need this to fill the 'first' packet table
    561         if (!*packet) {
    562                 if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    563                         *packet = trace_create_packet();
    564         }
    565         assert(*packet);
    566         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
    567         /* Read a packet */
    568         (*packet)->error = trace_read_packet(libtrace, *packet);
    569         // Doing this inside the lock ensures the first packet is always
    570         // recorded first
    571         if ((*packet)->error > 0)
    572                 store_first_packet(libtrace, *packet, t);
    573 
    574         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    575         return (*packet)->error;
     677 * lock to read packets from the underlying trace.
     678 */
     679inline static size_t trace_pread_packet_first_in_first_served(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
     680{
     681        size_t i = 0;
     682
     683        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
     684
     685        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     686        /* Read nb_packets */
     687        for (i = 0; i < nb_packets; ++i) {
     688                packets[i]->error = trace_read_packet(libtrace, packets[i]);
     689                // Doing this inside the lock ensures the first packet is always
     690                // recorded first
     691                if (packets[i]->error <= 0) {
     692                        ++i;
     693                        break;
     694                }
     695        }
     696        if (packets[0]->error > 0) {
     697                store_first_packet(libtrace, packets[0], t);
     698        }
     699        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     700        return i;
    576701}
    577702
     
    581706 * 2. Move that into the packet provided (packet)
    582707 */
    583 inline static int trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    584 {
    585         if (*packet) // Recycle the old get the new
    586                 if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
    587                         trace_destroy_packet(*packet);
    588         *packet = libtrace_ringbuffer_read(&t->rbuffer);
    589 
    590         if (*packet) {
     708inline static size_t trace_pread_packet_hasher_thread(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets)
     709{
     710        size_t i;
     711
     712        // Always grab at least one
     713        if (packets[0]) // Recycle the old get the new
     714                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
     715        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
     716
     717
     718        if (packets[0] == NULL) {
     719                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);
     720                packets[0]->error = -2;
     721        }
     722
     723        if (packets[0]->error < 0)
     724                return 1;
     725
     726        for (i = 1; i < nb_packets; i++) {
     727                if (packets[i]) // Recycle the old get the new
     728                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
     729                if (!libtrace_ringbuffer_try_read(&t->rbuffer, (void **) &packets[i])) {
     730                        packets[i] = NULL;
     731                        break;
     732                }
     733                // Message wating
     734                if (packets[i] == NULL) {
     735                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
     736                        packets[i]->error = -2;
     737                        ++i;
     738                        break;
     739                }
     740        }
     741       
     742        return i;
     743        /*if (*packet) {
    591744                return (*packet)->error;
    592745        } else {
     
    597750                fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
    598751                return -2;
    599         }
     752        }*/
    600753}
    601754
     
    613766
    614767                if (*packet) // Recycle the old get the new
    615                         if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, (void *) *packet))
    616                                 trace_destroy_packet(*packet);
     768                        libtrace_ocache_free(&libtrace->packet_freelist, (void **) packet, 1, 1);
    617769                *packet = retrived_packet;
    618770                *ret = (*packet)->error;
     
    635787        do {
    636788                // Wait for a thread to end
    637                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     789                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    638790
    639791                // Check before
    640792                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    641793                        complete = true;
    642                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     794                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    643795                        continue;
    644796                }
    645797
    646                 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
     798                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
    647799
    648800                // Check after
    649801                if (libtrace->perpkt_thread_states[THREAD_FINISHING] == libtrace->perpkt_thread_count) {
    650802                        complete = true;
    651                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     803                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    652804                        continue;
    653805                }
    654806
    655                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     807                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    656808
    657809                // Always trying to keep our buffer empty for the unlikely case more threads than buffer space want to write into our queue
     
    696848                        return ret;
    697849                // Can still block here if another thread is writing to a full queue
    698                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     850                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    699851
    700852                // Its impossible for our own queue to overfill, because no one can write
    701853                // when we are in the lock
    702854                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    703                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     855                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    704856                        return ret;
    705857                }
     
    708860                if (libtrace->perpkt_queue_full) {
    709861                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    710                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     862                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    711863                        continue;
    712864                }
    713865
    714                 if (!*packet) {
    715                         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    716                                 *packet = trace_create_packet();
    717                 }
     866                if (!*packet)
     867                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    718868                assert(*packet);
    719869
    720870                // If we fail here we can guarantee that our queue is empty (and no new data will be added because we hold the lock)
    721871                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    722                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     872                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    723873                        if (libtrace_halt)
    724874                                return 0;
     
    731881                if (thread == t->perpkt_num) {
    732882                        // If it's this thread we must be in order because we checked the buffer once we got the lock
    733                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     883                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    734884                        return (*packet)->error;
    735885                }
     
    738888                        while (!libtrace_ringbuffer_try_swrite_bl(&libtrace->perpkt_threads[thread].rbuffer, *packet)) {
    739889                                libtrace->perpkt_queue_full = true;
    740                                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     890                                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    741891                                contention_stats[t->perpkt_num].full_queue_hits++;
    742                                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     892                                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    743893                        }
    744894                        *packet = NULL;
     
    748898                        assert (!"packet_hash_locked() The user terminated the trace in a abnormal manner");
    749899                }
    750                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     900                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    751901        }
    752902}
     
    779929                // We limit the number of packets we get to the size of the sliding window
    780930                // such that it is impossible for any given thread to fail to store a packet
    781                 assert(sem_wait(&libtrace->sem) == 0);
     931                ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    782932                /*~~~~Single threaded read of a packet~~~~*/
    783                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     933                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    784934
    785935                /* Re-check our queue things we might have data waiting */
    786936                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    787                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    788                         assert(sem_post(&libtrace->sem) == 0);
     937                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     938                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    789939                        return ret;
    790940                }
     
    792942                // TODO put on *proper* condition variable
    793943                if (libtrace->perpkt_queue_full) {
    794                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    795                         assert(sem_post(&libtrace->sem) == 0);
     944                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     945                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    796946                        contention_stats[t->perpkt_num].wait_for_fill_complete_hits++;
    797947                        continue;
    798948                }
    799949
    800                 if (!*packet) {
    801                         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) packet))
    802                                 *packet = trace_create_packet();
    803                 }
     950                if (!*packet)
     951                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packet, 1, 1);
    804952                assert(*packet);
    805953
    806954                if (libtrace_halt || ((*packet)->error = trace_read_packet(libtrace, *packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) {
    807                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
    808                         assert(sem_post(&libtrace->sem) == 0);
     955                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     956                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    809957                        // Finish this thread ensuring that any data written later by another thread is retrieved also
    810958                        if (libtrace_halt)
     
    813961                                return trace_finish_perpkt(libtrace, packet, t);
    814962                }
    815                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     963                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    816964
    817965                /* ~~~~Multiple threads can run the hasher~~~~ */
     
    819967
    820968                /* Yes this is correct opposite read lock for a write operation */
    821                 assert(pthread_rwlock_rdlock(&libtrace->window_lock) == 0);
     969                ASSERT_RET(pthread_rwlock_rdlock(&libtrace->window_lock), == 0);
    822970                if (!libtrace_slidingwindow_try_write(&libtrace->sliding_window, trace_packet_get_order(*packet), *packet))
    823971                        assert(!"Semaphore should stop us from ever overfilling the sliding window");
    824                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     972                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    825973                *packet = NULL;
    826974
    827975                // Always try read any data from the sliding window
    828976                while (libtrace_slidingwindow_read_ready(&libtrace->sliding_window)) {
    829                         assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
     977                        ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    830978                        if (libtrace->perpkt_queue_full) {
    831979                                // I might be the holdup in which case if I can read my queue I should do that and return
    832980                                if(try_waiting_queue(libtrace, t, packet, &ret)) {
    833                                         assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     981                                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    834982                                        return ret;
    835983                                }
    836                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     984                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    837985                                continue;
    838986                        }
     
    851999                                                                // We must be able to write this now 100% without fail
    8521000                                                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[thread].rbuffer, *packet);
    853                                                                 assert(sem_post(&libtrace->sem) == 0);
    854                                                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1001                                                                ASSERT_RET(sem_post(&libtrace->sem), == 0);
     1002                                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    8551003                                                                return ret;
    8561004                                                        } else {
     
    8601008                                                // Not us we have to give the other threads a chance to write there packets then
    8611009                                                libtrace->perpkt_queue_full = true;
    862                                                 assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1010                                                ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    8631011                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    864                                                         assert(sem_post(&libtrace->sem) == 0);
     1012                                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    8651013
    8661014                                                contention_stats[t->perpkt_num].full_queue_hits++;
    867                                                 assert(pthread_rwlock_wrlock(&libtrace->window_lock) == 0);
     1015                                                ASSERT_RET(pthread_rwlock_wrlock(&libtrace->window_lock), == 0);
    8681016                                                // Grab these back
    8691017                                                for (i = 0; i < libtrace->perpkt_thread_count-1; i++) // Release all other threads to read there packets
    870                                                         assert(sem_wait(&libtrace->sem) == 0);
     1018                                                        ASSERT_RET(sem_wait(&libtrace->sem), == 0);
    8711019                                                libtrace->perpkt_queue_full = false;
    8721020                                        }
    873                                         assert(sem_post(&libtrace->sem) == 0);
     1021                                        ASSERT_RET(sem_post(&libtrace->sem), == 0);
    8741022                                        *packet = NULL;
    8751023                                } else {
     
    8791027                                }
    8801028                        }
    881                         assert(pthread_rwlock_unlock(&libtrace->window_lock) == 0);
     1029                        ASSERT_RET(pthread_rwlock_unlock(&libtrace->window_lock), == 0);
    8821030                }
    8831031                // Now we go back to checking our queue anyways
     
    9021050                gettimeofday(&tv, NULL);
    9031051                dup = trace_copy_packet(packet);
    904                 assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
     1052                ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    9051053                libtrace->first_packets.packets[t->perpkt_num].packet = dup;
    9061054                //printf("Stored first packet time=%f\n", trace_get_seconds(dup));
     
    9181066                                libtrace->first_packets.first = t->perpkt_num;
    9191067                }
    920                 assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     1068                ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    9211069                libtrace_message_t mesg = {0};
    9221070                mesg.code = MESSAGE_FIRST_PACKET;
     
    9361084{
    9371085        int ret = 0;
    938         assert(pthread_spin_lock(&libtrace->first_packets.lock) == 0);
     1086        ASSERT_RET(pthread_spin_lock(&libtrace->first_packets.lock), == 0);
    9391087        if (libtrace->first_packets.count) {
    9401088                *packet = libtrace->first_packets.packets[libtrace->first_packets.first].packet;
     
    9561104                *tv = NULL;
    9571105        }
    958         assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0);
     1106        ASSERT_RET(pthread_spin_unlock(&libtrace->first_packets.lock), == 0);
    9591107        return ret;
    9601108}
     
    11101258
    11111259/**
    1112  * Read a packet from the parallel trace
    1113  */
    1114 DLLEXPORT int trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packet)
    1115 {
    1116         int ret;
    1117 
    1118         // Cleanup the packet passed back
    1119         if (*packet)
    1120                 trace_fin_packet(*packet);
     1260 * Read packets from the parallel trace
     1261 * @return the number of packets read, null packets indicate messages. Check packet->error before
     1262 * assuming a packet is valid.
     1263 */
     1264static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets)
     1265{
     1266        size_t ret;
     1267        size_t i;
     1268        assert(nb_packets);
     1269
     1270        for (i = 0; i < nb_packets; i++) {
     1271                // Cleanup the packet passed back
     1272                if (packets[i])
     1273                        trace_fin_packet(packets[i]);
     1274        }
    11211275
    11221276        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
    1123                 if (!*packet)
    1124                         *packet = trace_create_packet();
    1125                 ret = trace_pread_packet_wrapper(libtrace, t, *packet);
     1277                if (!packets[0])
     1278                        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **)packets, 1, 1);
     1279                packets[0]->error = trace_pread_packet_wrapper(libtrace, t, *packets);
     1280                ret = 1;
    11261281        } else if (trace_has_dedicated_hasher(libtrace)) {
    1127                 ret = trace_pread_packet_hasher_thread(libtrace, t, packet);
     1282                ret = trace_pread_packet_hasher_thread(libtrace, t, packets, nb_packets);
    11281283        } else if (!trace_has_dedicated_hasher(libtrace)) {
    11291284                /* We don't care about which core a packet goes to */
    1130                 ret = trace_pread_packet_first_in_first_served(libtrace, t, packet);
     1285                ret = trace_pread_packet_first_in_first_served(libtrace, t, packets, nb_packets);
    11311286        } /* else {
    11321287                ret = trace_pread_packet_hash_locked(libtrace, packet);
     
    11351290        // Formats can also optionally do this internally to ensure the first
    11361291        // packet is always reported correctly
    1137         if (ret > 0) {
    1138                 store_first_packet(libtrace, *packet, t);
     1292        assert(ret);
     1293        assert(ret <= nb_packets);
     1294        if (packets[0]->error > 0) {
     1295                store_first_packet(libtrace, packets[0], t);
    11391296                if (libtrace->tracetime)
    1140                         delay_tracetime(libtrace, *packet, t);
     1297                        delay_tracetime(libtrace, packets[0], t);
    11411298        }
    11421299
     
    11491306static inline int trace_start_perpkt_threads (libtrace_t *libtrace) {
    11501307        int i;
    1151 
     1308        char name[16];
    11521309        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
    11531310                libtrace_thread_t *t = &libtrace->perpkt_threads[i];
    1154                 assert(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace) == 0);
     1311                ASSERT_RET(pthread_create(&t->tid, NULL, perpkt_threads_entry, (void *) libtrace), == 0);
     1312                snprintf(name, 16, "perpkt-%d", i);
     1313                pthread_setname_np(t->tid, name);
    11551314        }
    11561315        return libtrace->perpkt_thread_count;
     
    11681327{
    11691328        int i;
     1329        char name[16];
    11701330        sigset_t sig_before, sig_block_all;
    11711331        assert(libtrace);
     
    11771337        if (libtrace->state != STATE_NEW) {
    11781338                int err = 0;
    1179                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1339                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    11801340                if (libtrace->state != STATE_PAUSED) {
    11811341                        trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
    11821342                                "The trace(%s) has already been started and is not paused!!", libtrace->uridata);
    1183                         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1343                        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    11841344                        return -1;
    11851345                }
     
    12071367                        libtrace_change_state(libtrace, STATE_RUNNING, false);
    12081368                }
    1209                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1369                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    12101370                return err;
    12111371        }
     
    12191379        libtrace->reducer = reducer;
    12201380
    1221         assert(pthread_mutex_init(&libtrace->libtrace_lock, NULL) == 0);
    1222         assert(pthread_cond_init(&libtrace->perpkt_cond, NULL) == 0);
    1223         assert(pthread_rwlock_init(&libtrace->window_lock, NULL) == 0);
     1381        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     1382        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
     1383        ASSERT_RET(pthread_rwlock_init(&libtrace->window_lock, NULL), == 0);
    12241384        // Grab the lock
    1225         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1385        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    12261386
    12271387        // Set default buffer sizes
     
    12501410        sigemptyset(&sig_block_all);
    12511411
    1252         assert(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) == 0);
     1412        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before), == 0);
    12531413
    12541414        // If we are using a hasher start it
     
    12611421                t->state = THREAD_RUNNING;
    12621422                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1263                 assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0);
     1423                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
     1424                snprintf(name, sizeof(name), "hasher-thread");
     1425                pthread_setname_np(t->tid, name);
    12641426        } else {
    12651427                libtrace->hasher_thread.type = THREAD_EMPTY;
    12661428        }
    1267         libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING);
     1429        //libtrace_ocache_init(&libtrace->packet_freelist, trace_create_packet, trace_destroy_packet, 64, libtrace->packet_freelist_size * 4, true);
     1430        libtrace_ocache_init(&libtrace->packet_freelist,
     1431                                                 (void* (*)()) trace_create_packet,
     1432                                                 (void (*)(void *))trace_destroy_packet,
     1433                                                 64,
     1434                                                 libtrace->packet_freelist_size * 4,
     1435                                                 true);
    12681436        //libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0);
    1269         assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0);
     1437        ASSERT_RET(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size), == 0);
    12701438        // This will be applied to every new thread that starts, i.e. they will block all signals
    12711439        // Lets start a fixed number of reading threads
     
    12811449        libtrace->first_packets.first = 0;
    12821450        libtrace->first_packets.count = 0;
    1283         assert(pthread_spin_init(&libtrace->first_packets.lock, 0) == 0);
     1451        ASSERT_RET(pthread_spin_init(&libtrace->first_packets.lock, 0), == 0);
    12841452        libtrace->first_packets.packets = calloc(libtrace->perpkt_thread_count, sizeof(struct  __packet_storage_magic_type));
    12851453
     
    12971465                t->perpkt_num = i;
    12981466                if (libtrace->hasher)
    1299                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING);
     1467                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_POLLING);
    13001468                // Depending on the mode vector or deque might be chosen
    13011469                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    13051473                t->tmp_data = NULL;
    13061474                t->recorded_first = false;
    1307                 assert(pthread_spin_init(&t->tmp_spinlock, 0) == 0);
     1475                ASSERT_RET(pthread_spin_init(&t->tmp_spinlock, 0), == 0);
    13081476                t->tracetime_offset_usec = 0;;
    13091477        }
     
    13261494                libtrace->keepalive_thread.state = THREAD_RUNNING;
    13271495                libtrace_message_queue_init(&libtrace->keepalive_thread.messages, sizeof(libtrace_message_t));
    1328                 assert(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace) == 0);
     1496                ASSERT_RET(pthread_create(&libtrace->keepalive_thread.tid, NULL, keepalive_entry, (void *) libtrace), == 0);
    13291497        }
    13301498
     
    13351503
    13361504        // Revert back - Allow signals again
    1337         assert(pthread_sigmask(SIG_SETMASK, &sig_before, NULL) == 0);
    1338         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1505        ASSERT_RET(pthread_sigmask(SIG_SETMASK, &sig_before, NULL), == 0);
     1506        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13391507
    13401508        if (threads_started < 0)
     
    13681536        t = get_thread_table(libtrace);
    13691537        // Check state from within the lock if we are going to change it
    1370         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1538        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    13711539        if (!libtrace->started || libtrace->state != STATE_RUNNING) {
    13721540                fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);
    13731541                trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()");
    1374                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1542                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13751543                return -1;
    13761544        }
    13771545
    13781546        libtrace_change_state(libtrace, STATE_PAUSING, false);
    1379         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1547        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13801548
    13811549        // Special case handle the hasher thread case
     
    13861554                trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message);
    13871555                // Wait for it to pause
    1388                 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1556                ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    13891557                while (libtrace->hasher_thread.state == THREAD_RUNNING) {
    1390                         assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    1391                 }
    1392                 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1558                        ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1559                }
     1560                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    13931561        }
    13941562
     
    14251593
    14261594        // Wait for all threads to pause
    1427         assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0);
     1595        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    14281596        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
    1429                 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0);
    1430         }
    1431         assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0);
     1597                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1598        }
     1599        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    14321600
    14331601        fprintf(stderr, "Threads have paused\n");
     
    15591727        for (i=0; i< libtrace->perpkt_thread_count; i++) {
    15601728                //printf("Waiting to join with perpkt #%d\n", i);
    1561                 assert(pthread_join(libtrace->perpkt_threads[i].tid, NULL) == 0);
     1729                ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0);
    15621730                //printf("Joined with perpkt #%d\n", i);
    15631731                // So we must do our best effort to empty the queue - so
     
    16071775       
    16081776        libtrace_change_state(libtrace, STATE_JOINED, true);
     1777        print_memory_stats();
    16091778}
    16101779
     
    19772146DLLEXPORT libtrace_packet_t* trace_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
    19782147        libtrace_packet_t* result;
    1979         if (!libtrace_ringbuffer_try_sread_bl(&libtrace->packet_freelist, (void **) &result))
    1980                 result = trace_create_packet();
     2148        libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &result, 1, 1);
    19812149        assert(result);
    19822150        swap_packets(result, packet); // Move the current packet into our copy
     
    19842152}
    19852153
    1986 DLLEXPORT void trace_free_result_packet(libtrace_t * libtrace, libtrace_packet_t * packet) {
     2154DLLEXPORT void trace_free_result_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    19872155        // Try write back the packet
    19882156        assert(packet);
    19892157        // Always release any resources this might be holding such as a slot in a ringbuffer
    19902158        trace_fin_packet(packet);
    1991         if (!libtrace_ringbuffer_try_swrite_bl(&libtrace->packet_freelist, packet)) {
    1992                 /* We couldn't, oh well lets just destroy it - XXX consider non managed formats i.e. rings buffers loosing packets and jamming up :( */
    1993                 //assert(1 == 90);
    1994                 trace_destroy_packet(packet);
    1995         }
     2159        libtrace_ocache_free(&libtrace->packet_freelist, (void **) &packet, 1, 1);
    19962160}
    19972161
Note: See TracChangeset for help on using the changeset viewer.