Changeset f17b546


Ignore:
Timestamp:
08/26/15 10:18:31 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
285acaa
Parents:
7c17e4a (diff), 9a3a846 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge remote branch 'parallel/develop' into libtrace4

Files:
12 edited

Legend:

Unmodified
Added
Removed
  • configure.in

    rd420777 r341c38f  
    115115gcc_PURE
    116116gcc_FORMAT
     117
     118# Check for gcc style TLS (__thread)
     119gcc_TLS
    117120       
    118121# Check for libtool
     
    217220#include <net/bpf.h>
    218221])
     222
     223AC_ARG_ENABLE(memory-debugging,
     224                AS_HELP_STRING(--enable-memory-debugging, prints internal memory statistics),[
     225                if test "$HAVE_TLS" = 1
     226                then
     227                    AC_DEFINE([ENABLE_MEM_STATS], 1, [print debug memory statistics])
     228                fi
     229],[])
    219230
    220231# Configure options for man pages
  • lib/data-struct/linked_list.c

    rcb39d35 r03aca91  
    5252
    5353        new->prev = NULL;
    54         memcpy(&new->data, item, l->element_size);
     54        memcpy(new->data, item, l->element_size);
    5555
    5656        if (l->head == NULL) {
  • lib/data-struct/object_cache.c

    r14c6c08 r4007dbb  
    2727};
    2828
     29#ifdef ENABLE_MEM_STATS
    2930extern __thread struct mem_stats mem_hits;
     31#endif
    3032
    3133struct local_caches {
     
    123125/* Get TLS for the list of local_caches */
    124126static inline struct local_caches *get_local_caches() {
     127#if HAVE_TLS
    125128        static __thread struct local_caches *lcs = NULL;
    126129        if (lcs) {
    127130                return lcs;
    128         } else {
     131        }
     132#else
     133        struct local_caches *lcs;
     134        pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
     135        if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) {
     136                return lcs;
     137        }
     138#endif
     139        else {
    129140                /* This thread has not been used with a memory pool before */
    130141                /* Allocate our TLS */
     
    280291                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
    281292                lc->used -= nb_buffers;
     293#ifdef ENABLE_MEM_STATS
    282294                mem_hits.read.cache_hit += nb_buffers;
    283295                mem_hits.readbulk.cache_hit += 1;
     296#endif
    284297                return nb_buffers;
    285298        }
     
    287300        else if (nb_buffers > lc->total) {
    288301                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
     302#ifdef ENABLE_MEM_STATS
    289303                if (i)
    290304                        mem_hits.readbulk.ring_hit += 1;
     
    292306                        mem_hits.readbulk.miss += 1;
    293307                mem_hits.read.ring_hit += i;
     308#endif
    294309        } else { // Not enough cached
    295310                // Empty the cache and re-fill it and then see what we're left with
    296311                i = lc->used;
    297312                memcpy(values, lc->cache, sizeof(void *) * lc->used);
     313#ifdef ENABLE_MEM_STATS
    298314                mem_hits.read.cache_hit += i;
     315#endif
    299316
    300317                // Make sure we still meet the minimum requirement
     
    303320                else
    304321                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
    305 
     322#ifdef ENABLE_MEM_STATS
    306323                if (lc->used == lc->total)
    307324                        mem_hits.readbulk.ring_hit += 1;
     
    309326                        mem_hits.readbulk.miss += 1;
    310327                mem_hits.read.ring_hit += lc->used;
     328#endif
    311329        }
    312330
     
    319337                i += remaining;
    320338        }
     339#ifdef ENABLE_MEM_STATS
    321340        mem_hits.read.miss += nb_buffers - i;
     341#endif
    322342        assert(i >= min_nb_buffers);
    323343        return i;
     
    379399                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
    380400                lc->used += nb_buffers;
     401#ifdef ENABLE_MEM_STATS
    381402                mem_hits.write.cache_hit += nb_buffers;
    382403                mem_hits.writebulk.cache_hit += 1;
     404#endif
    383405                return nb_buffers;
    384406        }
     
    386408        else if (nb_buffers > lc->total) {
    387409                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
     410#ifdef ENABLE_MEM_STATS
    388411                if (i)
    389412                        mem_hits.writebulk.ring_hit += 1;
     
    391414                        mem_hits.writebulk.miss += 1;
    392415                mem_hits.write.ring_hit += i;
     416#endif
    393417        } else { // Not enough cache space but there might later
    394418                // Fill the cache and empty it and then see what we're left with
    395419                i = (lc->total - lc->used);
    396420                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
     421#ifdef ENABLE_MEM_STATS
    397422                mem_hits.write.cache_hit += i;
     423#endif
    398424
    399425                // Make sure we still meet the minimum requirement
     
    407433                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);
    408434
     435#ifdef ENABLE_MEM_STATS
    409436                if (lc->used)
    410437                        mem_hits.writebulk.miss += 1;
     
    412439                        mem_hits.writebulk.ring_hit += 1;
    413440                mem_hits.write.ring_hit += lc->total - lc->used;
     441#endif
    414442        }
    415443
     
    422450                i += remaining;
    423451        }
     452#ifdef ENABLE_MEM_STATS
    424453        mem_hits.write.miss += nb_buffers - i;
     454#endif
    425455        return i;
    426456}
  • lib/format_linux_common.c

    r773a2a3 rf2066fa  
    661661                        linuxcommon_close_input_stream(libtrace, stream);
    662662                }
    663                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    664                 free(libtrace->format_data);
    665                 libtrace->format_data = NULL;
    666663                return -1;
    667664        }
  • lib/format_linux_int.c

    re99c493 rf2066fa  
    6969{
    7070        int ret = linuxcommon_start_input_stream(libtrace, FORMAT_DATA_FIRST);
    71         if (ret != 0) {
    72                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    73                 free(libtrace->format_data);
    74                 libtrace->format_data = NULL;
    75         }
    7671        return ret;
    7772}
  • lib/format_linux_ring.c

    r9d89626 rf2066fa  
    280280{
    281281        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
    282         if (ret != 0) {
    283                 libtrace_list_deinit(FORMAT_DATA->per_stream);
    284                 free(libtrace->format_data);
    285                 libtrace->format_data = NULL;
    286         }
    287282        return ret;
    288283}
  • lib/libtrace_int.h

    rc723e9e r2fa43fa  
    307307        /** The sequence is like accepted_packets but we don't reset this after a pause. */
    308308        uint64_t sequence_number;
     309        /** The packet read out by the trace, backwards compatibility to allow us to finalise
     310         * a packet when the trace is destroyed */
     311        libtrace_packet_t *last_packet;
    309312        /** The filename from the uri for the trace */
    310313        char *uridata;
     
    330333        /** The actual freelist */
    331334        libtrace_ocache_t packet_freelist;
    332         /** User defined per_pkt function called when a pkt is ready */
    333         fn_per_pkt per_pkt;
    334         /** User defined reporter function entry point XXX not hooked up */
     335        /** User defined per_msg function called when a message is ready */
     336        fn_cb_msg per_msg;
     337        /** User defined reporter function entry point */
    335338        fn_reporter reporter;
    336339        /** The hasher function */
     
    358361        struct user_configuration config;
    359362        libtrace_combine_t combiner;
     363        struct {
     364                fn_cb_starting message_starting;
     365                fn_cb_dataless message_stopping;
     366                fn_cb_dataless message_resuming;
     367                fn_cb_dataless message_pausing;
     368                fn_cb_packet message_packet;
     369                fn_cb_first_packet message_first_packet;
     370                fn_cb_tick message_tick_count;
     371                fn_cb_tick message_tick_interval;
     372        } callbacks;
    360373};
    361374
  • lib/libtrace_parallel.h

    rd3849c7 rf2066fa  
    154154         * @param data unused, do not use this
    155155         * @param sender The sender will be set as the current thread
     156         * @return When using a function callback for starting, the returned
     157         * value is stored against the thread tls. Otherwise the return is ignored.
    156158         */
    157159        MESSAGE_STARTING,
     
    211213         * sent once a new packet is encountered
    212214         *
    213          * @
     215         * @see trace_get_first_packet()
    214216         */
    215217        MESSAGE_FIRST_PACKET,
     
    225227         * trace_post_reporter()
    226228         *
    227          * @see trace_get_first_packet()
    228229         */
    229230        MESSAGE_POST_REPORTER,
     
    236237         * with an earlier time-stamp.
    237238         *
    238          * @param data data.uint64_t holds the system time-stamp in the
     239         * @param data data.uint64 holds the system time-stamp in the
    239240         * erf format
    240241         * @param sender should be ignored
     
    248249         * threads will see it between the same packets.
    249250         *
    250          * @param data The number of packets seen so far across all threads
     251         * @param data data.uint64 holds the number of packets seen so far across all threads
    251252         * @param sender Set to the current per-packet thread
    252253         */
     
    397398/**
    398399 * The definition for the main function that the user supplies to process
    399  * packets.
     400 * messages.
    400401 *
    401402 * @param trace The trace the packet is related to.
     
    412413 * documentation for the message as to what value these will contain.
    413414 */
    414 typedef void* (*fn_per_pkt)(libtrace_t* trace,
    415                             libtrace_thread_t *thread,
    416                             int mesg_code,
    417                             libtrace_generic_t data,
    418                             libtrace_thread_t *sender);
     415typedef void* (*fn_cb_msg)(libtrace_t* trace,
     416                           libtrace_thread_t *thread,
     417                           int mesg_code,
     418                           libtrace_generic_t data,
     419                           libtrace_thread_t *sender);
    419420
    420421/**
     
    452453 * @param libtrace The input trace to start
    453454 * @param global_blob Global data related to this trace accessible using trace_get_global()
    454  * @param per_pkt A user supplied function called when a packet is ready
     455 * @param per_msg A user supplied function called when a message is ready
    455456 * @param reporter A user supplied function called when a result is ready.
    456457 * Optional if NULL the reporter thread will not be started.
     
    459460 * This can also be used to restart an existing parallel trace,
    460461 * that has previously been paused using trace_ppause().
    461  * In this case global_blob,per_pkt and reporter will only be updated
     462 * In this case global_blob,per_msg and reporter will only be updated
    462463 * if they are non-null. Otherwise their previous values will be maintained.
    463464 *
    464465 */
    465466DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    466                            fn_per_pkt per_pkt, fn_reporter reporter);
     467                           fn_cb_msg per_msg, fn_reporter reporter);
     468
     469/**
     470 *
     471 * @param libtrace The parallel trace
     472 * @param t The thread that is running
     473 * @param global The global storage
     474 * @return The returned value is stored against the threads tls.
     475 *         This is typically passed as tls argument to other messages.
     476 */
     477typedef void* (*fn_cb_starting)(libtrace_t *libtrace,
     478                                     libtrace_thread_t *t,
     479                                     void *global);
     480
     481/**
     482 * @param libtrace The parallel trace
     483 * @param t The thread that is running
     484 * @param global The global storage
     485 * @param tls The thread local storage
     486 */
     487typedef void (*fn_cb_dataless)(libtrace_t *libtrace,
     488                                    libtrace_thread_t *t,
     489                                    void *global,
     490                                    void *tls);
     491
     492/**
     493 * @param libtrace The parallel trace
     494 * @param t The thread that is running
     495 * @param global The global storage
     496 * @param tls The thread local storage
     497 */
     498typedef void (*fn_cb_first_packet)(libtrace_t *libtrace,
     499                                   libtrace_thread_t *t,
     500                                   void *global,
     501                                   void *tls,
     502                                   libtrace_packet_t *first_packet,
     503                                   libtrace_thread_t *sender);
     504
     505/**
     506 * @param libtrace The parallel trace
     507 * @param t The thread that is running
     508 * @param global The global storage
     509 * @param tls The thread local storage
     510 * @param uint64_t Either the timestamp or packet count depending on message type
     511 */
     512typedef void (*fn_cb_tick)(libtrace_t *libtrace,
     513                           libtrace_thread_t *t,
     514                           void *global,
     515                           void *tls,
     516                           uint64_t order);
     517
     518/**
     519 * @param libtrace The parallel trace
     520 * @param t The thread
     521 * @param packet The packet associated with the message
     522 * @param global The global storage
     523 * @param tls The thread local storage
     524 *
     525 * @return optionally a packet which is handed back to the library,
     526 *         typically this is the packet supplied. Otherwise NULL.
     527 */
     528typedef libtrace_packet_t* (*fn_cb_packet)(libtrace_t *libtrace,
     529                                           libtrace_thread_t *t,
     530                                           void *global,
     531                                           void *tls,
     532                                           libtrace_packet_t *packet);
     533
     534/** Registers a built-in message with a handler.
     535 * Note we do not include the sending thread as an argument to the reporter.
     536 * If set to NULL, the message will be sent to default perpkt handler.
     537 *
     538 * @param libtrace The input trace to start
     539 * @param handler the handler to be called when the message is received
     540 * @return 0 if successful otherwise -1.
     541 */
     542
     543DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler);
     544DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler);
     545DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler);
     546DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler);
     547DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler);
     548DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler);
     549DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler);
     550DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler);
    467551
    468552/** Pauses a trace previously started with trace_pstart()
     
    542626/** Store data against a thread.
    543627 *
    544  * @param The parallel trace.
    545  * @param data The new value to save against the trace
     628 * @param thread The thread
     629 * @param data The new value to save against the thread
    546630 * @return The previously stored value
    547631 *
     
    628712 * as possible (real-time).
    629713 *
    630  * @param A parallel input trace
     714 * @param trace A parallel input trace
    631715 * @param tracetime If true packets are released with time intervals matching
    632716 * the original trace. Otherwise packets are read as fast as possible.
     
    810894         *
    811895         * @param key (Typically) The packets order, see trace_packet_get_order()
    812          * @param
    813896         */
    814897        RESULT_PACKET,
     
    9661049 */
    9671050DLLEXPORT bool trace_has_finished(libtrace_t * libtrace);
     1051
     1052
     1053/** Check if libtrace is directly reading from multiple queues
     1054 * from the format (such as a NICs hardware queues).
     1055 *
     1056 * When a parallel trace is running, or if checked after its completion
     1057 * this returns true if a trace was able to run natively parallel
     1058 * from the format. Otherwise false is returned, meaning libtrace is
     1059 * distibuting packets across multiple threads from a single source.
     1060 *
     1061 * Factors that may stop this happening despite the format supporting
     1062 * native parallel reads include: the choice of hasher function,
     1063 * the number of threads choosen (such as 1 or more than the trace supports)
     1064 * or another error when trying to start the parallel format.
     1065 *
     1066 * If this is called before the trace is started. I.e. before pstart
     1067 * this returns an indication that the trace has the possiblity to support
     1068 * native parallel reads. After trace pstart is called this should be
     1069 * checked again to confirm this has happened.
     1070 *
     1071 *
     1072 * @return true if the trace is parallel or false if the library is splitting
     1073 * the trace into multiple threads.
     1074 */
     1075DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace);
    9681076
    9691077/** Returns either the sequence number or erf timestamp of a packet.
  • lib/trace.c

    r8370482 r9a3a846  
    261261        libtrace->filtered_packets = 0;
    262262        libtrace->accepted_packets = 0;
     263        libtrace->last_packet = NULL;
    263264       
    264265        /* Parallel inits */
     
    268269        libtrace->perpkt_queue_full = false;
    269270        libtrace->global_blob = NULL;
    270         libtrace->per_pkt = NULL;
     271        libtrace->per_msg = NULL;
    271272        libtrace->reporter = NULL;
    272273        libtrace->hasher = NULL;
     
    287288        ZERO_USER_CONFIG(libtrace->config);
    288289        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     290        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    289291
    290292        /* Parse the URI to determine what sort of trace we are dealing with */
     
    382384        libtrace->io = NULL;
    383385        libtrace->filtered_packets = 0;
     386        libtrace->accepted_packets = 0;
     387        libtrace->last_packet = NULL;
    384388       
    385389        /* Parallel inits */
     
    389393        libtrace->perpkt_queue_full = false;
    390394        libtrace->global_blob = NULL;
    391         libtrace->per_pkt = NULL;
     395        libtrace->per_msg = NULL;
    392396        libtrace->reporter = NULL;
    393397        libtrace->hasher = NULL;
     
    405409        ZERO_USER_CONFIG(libtrace->config);
    406410        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     411        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    407412       
    408413        for(tmp=formats_list;tmp;tmp=tmp->next) {
     
    547552                return -1;
    548553        }
     554
     555        /* Finish the last packet we read - for backwards compatibility */
     556        if (libtrace->last_packet)
     557                trace_fin_packet(libtrace->last_packet);
     558        assert(libtrace->last_packet == NULL);
     559
    549560        if (libtrace->format->pause_input)
    550561                libtrace->format->pause_input(libtrace);
     562
    551563        libtrace->started=false;
    552564        return 0;
     
    687699        }
    688700
     701        /* Finish any the last packet we read - for backwards compatibility */
     702        if (libtrace->last_packet)
     703                trace_fin_packet(libtrace->last_packet);
     704        assert(libtrace->last_packet == NULL);
     705
    689706        if (libtrace->format) {
    690707                if (libtrace->started && libtrace->format->pause_input)
     
    803820                packet->trace->format->fin_packet(packet);
    804821        }
     822        if (packet->trace && packet->trace->last_packet == packet)
     823                packet->trace->last_packet = NULL;
    805824       
    806825        if (packet->buf_control == TRACE_CTRL_PACKET && packet->buffer) {
     
    825844                        packet->trace->format->fin_packet(packet);
    826845                }
     846                if (packet->trace && packet->trace->last_packet == packet)
     847                        packet->trace->last_packet = NULL;
    827848
    828849                // No matter what we remove the header and link pointers
     
    903924                        ++libtrace->accepted_packets;
    904925                        ++libtrace->sequence_number;
     926                        libtrace->last_packet = packet;
    905927                        return ret;
    906928                } while(1);
  • lib/trace_parallel.c

    rd3849c7 rf2066fa  
    113113};
    114114
     115
     116#ifdef ENABLE_MEM_STATS
    115117// Grrr gcc wants this spelt out
    116118__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
    117119
     120
    118121static void print_memory_stats() {
    119 #if 0
    120122        uint64_t total;
    121123#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
     
    163165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
    164166        }
     167}
     168#else
     169static void print_memory_stats() {}
    165170#endif
     171
     172static const libtrace_generic_t gen_zero = {0};
     173
     174/* This should optimise away the switch to nothing in the explict cases */
     175static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
     176                                libtrace_generic_t data, libtrace_thread_t *sender) {
     177        fn_cb_dataless fn = NULL;
     178        switch (type) {
     179        case MESSAGE_STARTING:
     180                if (trace->callbacks.message_starting)
     181                        thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
     182                else if (trace->per_msg)
     183                        (*trace->per_msg)(trace, thread, type, data, sender);
     184                return;
     185        case MESSAGE_FIRST_PACKET:
     186                if (trace->callbacks.message_first_packet)
     187                        (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
     188                else if (trace->per_msg)
     189                        (*trace->per_msg)(trace, thread, type, data, sender);
     190                return;
     191        case MESSAGE_TICK_COUNT:
     192                if (trace->callbacks.message_tick_count)
     193                        (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
     194                else if (trace->per_msg)
     195                        (*trace->per_msg)(trace, thread, type, data, sender);
     196                return;
     197        case MESSAGE_TICK_INTERVAL:
     198                if (trace->callbacks.message_tick_interval)
     199                        (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
     200                else if (trace->per_msg)
     201                        (*trace->per_msg)(trace, thread, type, data, sender);
     202                return;
     203        case MESSAGE_STOPPING:
     204                fn = trace->callbacks.message_stopping;
     205                break;
     206        case MESSAGE_RESUMING:
     207                fn = trace->callbacks.message_resuming;
     208                break;
     209        case MESSAGE_PAUSING:
     210                fn = trace->callbacks.message_pausing;
     211                break;
     212
     213        /* These should be unused */
     214        case MESSAGE_DO_PAUSE:
     215        case MESSAGE_DO_STOP:
     216        case MESSAGE_POST_REPORTER:
     217        case MESSAGE_RESULT:
     218        case MESSAGE_PACKET:
     219                return;
     220        case MESSAGE_USER:
     221                break;
     222        }
     223        if (fn)
     224                (*fn)(trace, thread, trace->global_blob, thread->user_data);
     225        else if (trace->per_msg)
     226                (*trace->per_msg)(trace, thread, type, data, sender);
    166227}
    167228
     
    304365                pthread_t tid = pthread_self();
    305366                // Check if we are reporter or something else
    306                 if (pthread_equal(tid, libtrace->reporter_thread.tid))
     367                if (libtrace->hasher_thread.type == THREAD_REPORTER &&
     368                                pthread_equal(tid, libtrace->reporter_thread.tid))
    307369                        ret = &libtrace->reporter_thread;
    308                 else if (pthread_equal(tid, libtrace->hasher_thread.tid))
     370                else if (libtrace->hasher_thread.type == THREAD_HASHER &&
     371                         pthread_equal(tid, libtrace->hasher_thread.tid))
    309372                        ret = &libtrace->hasher_thread;
    310373                else
     
    316379DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
    317380        // Duplicate the packet in standard malloc'd memory and free the
    318         // original, This is a 1:1 exchange so is ocache count remains unchanged.
     381        // original, This is a 1:1 exchange so the ocache count remains unchanged.
    319382        if (pkt->buf_control != TRACE_CTRL_PACKET) {
    320383                libtrace_packet_t *dup;
     
    324387                /* Copy the duplicated packet over the existing */
    325388                memcpy(pkt, dup, sizeof(libtrace_packet_t));
     389                /* Free the packet structure */
     390                free(dup);
    326391        }
    327392}
     
    377442                t->accepted_packets++;
    378443                libtrace_generic_t data = {.pkt = *packet};
    379                 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
     444                if (trace->callbacks.message_packet)
     445                        *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
     446                else if (trace->per_msg)
     447                        *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
    380448                trace_fin_packet(*packet);
    381449        } else {
    382450                assert((*packet)->error == READ_TICK);
    383451                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
    384                 (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
     452                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
    385453        }
    386454        return 0;
     
    449517
    450518        /* Let the user thread know we are going to pause */
    451         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
     519        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
    452520
    453521        /* Send through any remaining packets (or messages) without delay */
     
    490558        /* Now we do the actual pause, this returns when we resumed */
    491559        trace_thread_pause(trace, t);
    492         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     560        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    493561        return 1;
    494562}
     
    523591
    524592        if (trace->format->pregister_thread) {
    525                 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));
     593                trace->format->pregister_thread(trace, t, trace_is_parallel(trace));
    526594        }
    527595
     
    535603
    536604        /* Let the per_packet function know we have started */
    537         (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
    538         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     605        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
     606        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    539607
    540608        for (;;) {
     
    556624                                        goto eof;
    557625                        }
    558                         (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
     626                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
     627                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
    559628                        /* Continue and the empty messages out before packets */
    560629                        continue;
     
    619688
    620689        // Let the per_packet function know we have stopped
    621         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
    622         (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
     690        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
     691        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
    623692
    624693        // Free any remaining packets
     
    12461315 */
    12471316static int trace_prestart(libtrace_t * libtrace, void *global_blob,
    1248                           fn_per_pkt per_pkt, fn_reporter reporter) {
     1317                          fn_cb_msg per_msg, fn_reporter reporter) {
    12491318        int i, err = 0;
    12501319        if (libtrace->state != STATE_PAUSED) {
     
    12891358
    12901359        /* Update functions if requested */
    1291         if (per_pkt)
    1292                 libtrace->per_pkt = per_pkt;
    1293         assert(libtrace->per_pkt);
     1360        if (per_msg)
     1361                libtrace->per_msg = per_msg;
     1362        assert(libtrace->per_msg);
    12941363        if (reporter)
    12951364                libtrace->reporter = reporter;
     
    12971366                libtrace->global_blob = global_blob;
    12981367
    1299         if (libtrace->perpkt_thread_count > 1 &&
    1300             trace_supports_parallel(libtrace) &&
    1301             !trace_has_dedicated_hasher(libtrace)) {
     1368        if (trace_is_parallel(libtrace)) {
    13021369                err = libtrace->format->pstart_input(libtrace);
    13031370        } else {
     
    15121579}
    15131580
     1581DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) {
     1582        if (libtrace->state == STATE_NEW)
     1583                return trace_supports_parallel(libtrace);
     1584        return libtrace->pread == trace_pread_packet_wrapper;
     1585}
     1586
    15141587DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    1515                            fn_per_pkt per_pkt, fn_reporter reporter) {
     1588                           fn_cb_msg per_msg, fn_reporter reporter) {
    15161589        int i;
    15171590        int ret = -1;
     
    15261599
    15271600        if (libtrace->state == STATE_PAUSED) {
    1528                 ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
     1601                ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
    15291602                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    15301603                return ret;
     
    15411614        /* Store the user defined things against the trace */
    15421615        libtrace->global_blob = global_blob;
    1543         libtrace->per_pkt = per_pkt;
     1616        libtrace->per_msg = per_msg;
    15441617        libtrace->reporter = reporter;
    15451618        /* And zero other fields */
     
    15591632        verify_configuration(libtrace);
    15601633
     1634        ret = -1;
    15611635        /* Try start the format - we prefer parallel over single threaded, as
    15621636         * these formats should support messages better */
     
    15651639                ret = libtrace->format->pstart_input(libtrace);
    15661640                libtrace->pread = trace_pread_packet_wrapper;
    1567         } else {
     1641        }
     1642        if (ret != 0) {
    15681643                if (libtrace->format->start_input) {
    15691644                        ret = libtrace->format->start_input(libtrace);
     
    17031778        libtrace->perpkt_thread_states[THREAD_FINISHED] = 0;
    17041779cleanup_started:
    1705         if (trace_supports_parallel(libtrace) &&
    1706             !trace_has_dedicated_hasher(libtrace)
    1707             && libtrace->perpkt_thread_count > 1) {
     1780        if (libtrace->pread == trace_pread_packet_wrapper) {
    17081781                if (libtrace->format->ppause_input)
    17091782                        libtrace->format->ppause_input(libtrace);
     
    17201793}
    17211794
     1795DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
     1796        libtrace->callbacks.message_starting = handler;
     1797        return 0;
     1798}
     1799
     1800DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
     1801        libtrace->callbacks.message_pausing = handler;
     1802        return 0;
     1803}
     1804
     1805DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
     1806        libtrace->callbacks.message_resuming = handler;
     1807        return 0;
     1808}
     1809
     1810DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
     1811        libtrace->callbacks.message_stopping = handler;
     1812        return 0;
     1813}
     1814
     1815DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
     1816        libtrace->callbacks.message_packet = handler;
     1817        return 0;
     1818}
     1819
     1820DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
     1821        libtrace->callbacks.message_first_packet = handler;
     1822        return 0;
     1823}
     1824
     1825DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
     1826        libtrace->callbacks.message_tick_count = handler;
     1827        return 0;
     1828}
     1829
     1830DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
     1831        libtrace->callbacks.message_tick_interval = handler;
     1832        return 0;
     1833}
     1834
    17221835/*
    17231836 * Pauses a trace, this should only be called by the main thread
     
    17281841 *
    17291842 * Once done you should be able to modify the trace setup and call pstart again
    1730  * TODO handle changing thread numbers
     1843 * TODO add support to change the number of threads.
    17311844 */
    17321845DLLEXPORT int trace_ppause(libtrace_t *libtrace)
     
    18261939        // Save the statistics against the trace
    18271940        trace_get_statistics(libtrace, NULL);
    1828         if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {
     1941        if (trace_is_parallel(libtrace)) {
    18291942                libtrace->started = false;
    18301943                if (libtrace->format->ppause_input)
     
    22482361        return 0;
    22492362}
    2250 
    2251 
    22522363
    22532364static bool config_bool_parse(char *value, size_t nvalue) {
  • m4/attributes.m4

    r9b42f3e r4007dbb  
    154154])
    155155
     156AC_DEFUN([gcc_TLS],
     157[
     158  HAVE_TLS=0
     159  if test -n "$CC"; then
     160    AC_CACHE_CHECK([if compiler supports TLS __thread],
     161      [lt_cv_attribute_tls],
     162      [
     163       AC_COMPILE_IFELSE([AC_LANG_SOURCE(
     164         [static __thread int apples;])],
     165         [lt_cv_attribute_tls=yes],
     166         [lt_cv_attribute_tls=no]
     167       )
     168      ])
     169    if test x$lt_cv_attribute_tls = xyes; then
     170      HAVE_TLS=1
     171    fi
     172  fi
     173  AC_SUBST([HAVE_TLS])
     174  AC_DEFINE_UNQUOTED([HAVE_TLS], [$HAVE_TLS],
     175    [Define to 1 or 0, depending on whether the compiler supports tls via __thread.])
     176])
    156177
  • tools/tracestats/tracestats_parallel.c

    rdfbdda7a r4007dbb  
    8787
    8888
    89 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    90                         int mesg, libtrace_generic_t data,
     89static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     90                        int mesg UNUSED, libtrace_generic_t data UNUSED,
    9191                        libtrace_thread_t *sender UNUSED)
    9292{
    93         /* Using first entry as total and those after for filter counts */
    94         static __thread statistics_t * results = NULL;
    95         int i, wlen;
    96         libtrace_stat_t *stats;
    97         libtrace_generic_t gen;
    98 
    99         switch (mesg) {
    100         case MESSAGE_PACKET:
    101                 /* Apply filters to every packet note the result */
    102                 wlen = trace_get_wire_length(data.pkt);
    103                 for(i=0;i<filter_count;++i) {
    104                         if (filters[i].filter == NULL)
    105                                 continue;
    106                         if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
    107                                 results[i+1].count++;
    108                                 results[i+1].bytes+=wlen;
    109                         }
    110                         if (trace_is_err(trace)) {
    111                                 trace_perror(trace, "trace_apply_filter");
    112                                 fprintf(stderr, "Removing filter from filterlist\n");
    113                                 /* This is a race, but will be atomic */
    114                                 filters[i].filter = NULL;
    115                         }
    116                 }
    117                 results[0].count++;
    118                 results[0].bytes +=wlen;
    119                 return data.pkt;
    120         case MESSAGE_STARTING:
    121                 /* Allocate space to hold a total count and one for each filter */
    122                 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    123                 break;
    124         case MESSAGE_STOPPING:
    125                 /* We only output one result per thread with the key 0 when the
    126                  * trace is over. */
    127                 gen.ptr = results;
    128                 trace_publish_result(trace, t, 0, gen, RESULT_USER);
    129                 break;
    130         default:
    131                 break;
    132         }
    13393        return NULL;
    13494}
     
    184144}
    185145
     146
     147static void* fn_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, void *global UNUSED) {
     148        /* Allocate space to hold a total count and one for each filter */
     149        return calloc(1, sizeof(statistics_t) * (filter_count + 1));
     150}
     151
     152
     153static void fn_stopping(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     154                        void *global UNUSED, void*tls) {
     155        statistics_t *results = tls;
     156        libtrace_generic_t gen;
     157        /* We only output one result per thread with the key 0 when the
     158         * trace is over. */
     159        gen.ptr = results;
     160        trace_publish_result(trace, t, 0, gen, RESULT_USER);
     161}
     162
     163static libtrace_packet_t* fn_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     164                   void *global UNUSED, void*tls, libtrace_packet_t *pkt) {
     165        statistics_t *results = tls;
     166        int i, wlen;
     167
     168        /* Apply filters to every packet note the result */
     169        wlen = trace_get_wire_length(pkt);
     170        for(i=0;i<filter_count;++i) {
     171                if (filters[i].filter == NULL)
     172                        continue;
     173                if(trace_apply_filter(filters[i].filter,pkt) > 0) {
     174                        results[i+1].count++;
     175                        results[i+1].bytes+=wlen;
     176                }
     177                if (trace_is_err(trace)) {
     178                        trace_perror(trace, "trace_apply_filter");
     179                        fprintf(stderr, "Removing filter from filterlist\n");
     180                        /* This is a race, but will be atomic */
     181                        filters[i].filter = NULL;
     182                }
     183        }
     184        results[0].count++;
     185        results[0].bytes +=wlen;
     186        return pkt;
     187}
     188
    186189/* Process a trace, counting packets that match filter(s) */
    187190static void run_trace(char *uri, char *config, char *config_file)
     
    212215                }
    213216        }
     217
     218        trace_cb_packet(trace, fn_packet);
     219        trace_cb_starting(trace, fn_starting);
     220        trace_cb_stopping(trace, fn_stopping);
    214221
    215222        /* Start the trace as a parallel trace */
Note: See TracChangeset for help on using the changeset viewer.