Changeset 54cf135


Ignore:
Timestamp:
08/25/15 14:37:07 (5 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
341c38f
Parents:
6210f82 (diff), eea427f (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'new_interface' into develop

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • configure.in

    r16cb2a2 r4007dbb  
    116116gcc_PURE
    117117gcc_FORMAT
     118
     119# Check for gcc style TLS (__thread)
     120gcc_TLS
    118121       
    119122# Check for libtool
     
    208211#include <net/bpf.h>
    209212])
     213
     214AC_ARG_ENABLE(memory-debugging,
     215                AS_HELP_STRING(--enable-memory-debugging, prints internal memory statistics),[
     216                if test "$HAVE_TLS" = 1
     217                then
     218                    AC_DEFINE([ENABLE_MEM_STATS], 1, [print debug memory statistics])
     219                fi
     220],[])
    210221
    211222# Configure options for man pages
  • lib/data-struct/object_cache.c

    r14c6c08 r4007dbb  
    2727};
    2828
     29#ifdef ENABLE_MEM_STATS
    2930extern __thread struct mem_stats mem_hits;
     31#endif
    3032
    3133struct local_caches {
     
    123125/* Get TLS for the list of local_caches */
    124126static inline struct local_caches *get_local_caches() {
     127#if HAVE_TLS
    125128        static __thread struct local_caches *lcs = NULL;
    126129        if (lcs) {
    127130                return lcs;
    128         } else {
     131        }
     132#else
     133        struct local_caches *lcs;
     134        pthread_once(&memory_destructor_once, &once_memory_cache_key_init);
     135        if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) {
     136                return lcs;
     137        }
     138#endif
     139        else {
    129140                /* This thread has not been used with a memory pool before */
    130141                /* Allocate our TLS */
     
    280291                memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers);
    281292                lc->used -= nb_buffers;
     293#ifdef ENABLE_MEM_STATS
    282294                mem_hits.read.cache_hit += nb_buffers;
    283295                mem_hits.readbulk.cache_hit += 1;
     296#endif
    284297                return nb_buffers;
    285298        }
     
    287300        else if (nb_buffers > lc->total) {
    288301                i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers);
     302#ifdef ENABLE_MEM_STATS
    289303                if (i)
    290304                        mem_hits.readbulk.ring_hit += 1;
     
    292306                        mem_hits.readbulk.miss += 1;
    293307                mem_hits.read.ring_hit += i;
     308#endif
    294309        } else { // Not enough cached
    295310                // Empty the cache and re-fill it and then see what we're left with
    296311                i = lc->used;
    297312                memcpy(values, lc->cache, sizeof(void *) * lc->used);
     313#ifdef ENABLE_MEM_STATS
    298314                mem_hits.read.cache_hit += i;
     315#endif
    299316
    300317                // Make sure we still meet the minimum requirement
     
    303320                else
    304321                        lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0);
    305 
     322#ifdef ENABLE_MEM_STATS
    306323                if (lc->used == lc->total)
    307324                        mem_hits.readbulk.ring_hit += 1;
     
    309326                        mem_hits.readbulk.miss += 1;
    310327                mem_hits.read.ring_hit += lc->used;
     328#endif
    311329        }
    312330
     
    319337                i += remaining;
    320338        }
     339#ifdef ENABLE_MEM_STATS
    321340        mem_hits.read.miss += nb_buffers - i;
     341#endif
    322342        assert(i >= min_nb_buffers);
    323343        return i;
     
    379399                memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers);
    380400                lc->used += nb_buffers;
     401#ifdef ENABLE_MEM_STATS
    381402                mem_hits.write.cache_hit += nb_buffers;
    382403                mem_hits.writebulk.cache_hit += 1;
     404#endif
    383405                return nb_buffers;
    384406        }
     
    386408        else if (nb_buffers > lc->total) {
    387409                i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers);
     410#ifdef ENABLE_MEM_STATS
    388411                if (i)
    389412                        mem_hits.writebulk.ring_hit += 1;
     
    391414                        mem_hits.writebulk.miss += 1;
    392415                mem_hits.write.ring_hit += i;
     416#endif
    393417        } else { // Not enough cache space but there might later
    394418                // Fill the cache and empty it and then see what we're left with
    395419                i = (lc->total - lc->used);
    396420                memcpy(&lc->cache[lc->used], values, sizeof(void *) * i);
     421#ifdef ENABLE_MEM_STATS
    397422                mem_hits.write.cache_hit += i;
     423#endif
    398424
    399425                // Make sure we still meet the minimum requirement
     
    407433                        memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used);
    408434
     435#ifdef ENABLE_MEM_STATS
    409436                if (lc->used)
    410437                        mem_hits.writebulk.miss += 1;
     
    412439                        mem_hits.writebulk.ring_hit += 1;
    413440                mem_hits.write.ring_hit += lc->total - lc->used;
     441#endif
    414442        }
    415443
     
    422450                i += remaining;
    423451        }
     452#ifdef ENABLE_MEM_STATS
    424453        mem_hits.write.miss += nb_buffers - i;
     454#endif
    425455        return i;
    426456}
  • lib/libtrace_int.h

    rc723e9e r4007dbb  
    330330        /** The actual freelist */
    331331        libtrace_ocache_t packet_freelist;
    332         /** User defined per_pkt function called when a pkt is ready */
    333         fn_per_pkt per_pkt;
     332        /** User defined per_msg function called when a message is ready */
     333        fn_cb_msg per_msg;
    334334        /** User defined reporter function entry point XXX not hooked up */
    335335        fn_reporter reporter;
     
    358358        struct user_configuration config;
    359359        libtrace_combine_t combiner;
     360        struct {
     361                fn_cb_starting message_starting;
     362                fn_cb_dataless message_stopping;
     363                fn_cb_dataless message_resuming;
     364                fn_cb_dataless message_pausing;
     365                fn_cb_packet message_packet;
     366                fn_cb_first_packet message_first_packet;
     367                fn_cb_tick message_tick_count;
     368                fn_cb_tick message_tick_interval;
     369        } callbacks;
    360370};
    361371
  • lib/libtrace_parallel.h

    rd3849c7 r4007dbb  
    154154         * @param data unused, do not use this
    155155         * @param sender The sender will be set as the current thread
     156         * @return When using a function callback for starting, the returned
     157         * value is stored against the thread tls. Otherwise the return is ignored.
    156158         */
    157159        MESSAGE_STARTING,
     
    211213         * sent once a new packet is encountered
    212214         *
    213          * @
     215         * @see trace_get_first_packet()
    214216         */
    215217        MESSAGE_FIRST_PACKET,
     
    225227         * trace_post_reporter()
    226228         *
    227          * @see trace_get_first_packet()
    228229         */
    229230        MESSAGE_POST_REPORTER,
     
    236237         * with an earlier time-stamp.
    237238         *
    238          * @param data data.uint64_t holds the system time-stamp in the
     239         * @param data data.uint64 holds the system time-stamp in the
    239240         * erf format
    240241         * @param sender should be ignored
     
    248249         * threads will see it between the same packets.
    249250         *
    250          * @param data The number of packets seen so far across all threads
     251         * @param data data.uint64 holds the number of packets seen so far across all threads
    251252         * @param sender Set to the current per-packet thread
    252253         */
     
    397398/**
    398399 * The definition for the main function that the user supplies to process
    399  * packets.
     400 * messages.
    400401 *
    401402 * @param trace The trace the packet is related to.
     
    412413 * documentation for the message as to what value these will contain.
    413414 */
    414 typedef void* (*fn_per_pkt)(libtrace_t* trace,
    415                             libtrace_thread_t *thread,
    416                             int mesg_code,
    417                             libtrace_generic_t data,
    418                             libtrace_thread_t *sender);
     415typedef void* (*fn_cb_msg)(libtrace_t* trace,
     416                           libtrace_thread_t *thread,
     417                           int mesg_code,
     418                           libtrace_generic_t data,
     419                           libtrace_thread_t *sender);
    419420
    420421/**
     
    452453 * @param libtrace The input trace to start
    453454 * @param global_blob Global data related to this trace accessible using trace_get_global()
    454  * @param per_pkt A user supplied function called when a packet is ready
     455 * @param per_msg A user supplied function called when a message is ready
    455456 * @param reporter A user supplied function called when a result is ready.
    456457 * Optional if NULL the reporter thread will not be started.
     
    459460 * This can also be used to restart an existing parallel trace,
    460461 * that has previously been paused using trace_ppause().
    461  * In this case global_blob,per_pkt and reporter will only be updated
     462 * In this case global_blob,per_msg and reporter will only be updated
    462463 * if they are non-null. Otherwise their previous values will be maintained.
    463464 *
    464465 */
    465466DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    466                            fn_per_pkt per_pkt, fn_reporter reporter);
     467                           fn_cb_msg per_msg, fn_reporter reporter);
     468
     469/**
     470 *
     471 * @param libtrace The parallel trace
     472 * @param t The thread that is running
     473 * @param global The global storage
     474 * @return The returned value is stored against the threads tls.
     475 *         This is typically passed as tls argument to other messages.
     476 */
     477typedef void* (*fn_cb_starting)(libtrace_t *libtrace,
     478                                     libtrace_thread_t *t,
     479                                     void *global);
     480
     481/**
     482 * @param libtrace The parallel trace
     483 * @param t The thread that is running
     484 * @param global The global storage
     485 * @param tls The thread local storage
     486 */
     487typedef void (*fn_cb_dataless)(libtrace_t *libtrace,
     488                                    libtrace_thread_t *t,
     489                                    void *global,
     490                                    void *tls);
     491
     492/**
     493 * @param libtrace The parallel trace
     494 * @param t The thread that is running
     495 * @param global The global storage
     496 * @param tls The thread local storage
     497 */
     498typedef void (*fn_cb_first_packet)(libtrace_t *libtrace,
     499                                   libtrace_thread_t *t,
     500                                   void *global,
     501                                   void *tls,
     502                                   libtrace_packet_t *first_packet,
     503                                   libtrace_thread_t *sender);
     504
     505/**
     506 * @param libtrace The parallel trace
     507 * @param t The thread that is running
     508 * @param global The global storage
     509 * @param tls The thread local storage
     510 * @param uint64_t Either the timestamp or packet count depending on message type
     511 */
     512typedef void (*fn_cb_tick)(libtrace_t *libtrace,
     513                           libtrace_thread_t *t,
     514                           void *global,
     515                           void *tls,
     516                           uint64_t order);
     517
     518/**
     519 * @param libtrace The parallel trace
     520 * @param t The thread
     521 * @param packet The packet associated with the message
     522 * @param global The global storage
     523 * @param tls The thread local storage
     524 *
     525 * @return optionally a packet which is handed back to the library,
     526 *         typically this is the packet supplied. Otherwise NULL.
     527 */
     528typedef libtrace_packet_t* (*fn_cb_packet)(libtrace_t *libtrace,
     529                                           libtrace_thread_t *t,
     530                                           void *global,
     531                                           void *tls,
     532                                           libtrace_packet_t *packet);
     533
     534/** Registers a built-in message with a handler.
     535 * Note we do not include the sending thread as an argument to the reporter.
     536 * If set to NULL, the message will be sent to default perpkt handler.
     537 *
     538 * @param libtrace The input trace to start
     539 * @param message The message to intercept
     540 * @param handler the handler to be called when the message is received
     541 * @return 0 if successful otherwise -1.
     542 */
     543
     544DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler);
     545DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler);
     546DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler);
     547DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler);
     548DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler);
     549DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler);
     550DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler);
     551DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler);
    467552
    468553/** Pauses a trace previously started with trace_pstart()
  • lib/trace.c

    r8370482 r4007dbb  
    268268        libtrace->perpkt_queue_full = false;
    269269        libtrace->global_blob = NULL;
    270         libtrace->per_pkt = NULL;
     270        libtrace->per_msg = NULL;
    271271        libtrace->reporter = NULL;
    272272        libtrace->hasher = NULL;
     
    287287        ZERO_USER_CONFIG(libtrace->config);
    288288        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     289        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    289290
    290291        /* Parse the URI to determine what sort of trace we are dealing with */
     
    389390        libtrace->perpkt_queue_full = false;
    390391        libtrace->global_blob = NULL;
    391         libtrace->per_pkt = NULL;
     392        libtrace->per_msg = NULL;
    392393        libtrace->reporter = NULL;
    393394        libtrace->hasher = NULL;
     
    405406        ZERO_USER_CONFIG(libtrace->config);
    406407        memset(&libtrace->combiner, 0, sizeof(libtrace->combiner));
     408        memset(&libtrace->callbacks, 0, sizeof(libtrace->callbacks));
    407409       
    408410        for(tmp=formats_list;tmp;tmp=tmp->next) {
  • lib/trace_parallel.c

    rd3849c7 reea427f  
    113113};
    114114
     115
     116#ifdef ENABLE_MEM_STATS
    115117// Grrr gcc wants this spelt out
    116118__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
    117119
     120
    118121static void print_memory_stats() {
    119 #if 0
    120122        uint64_t total;
    121123#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
     
    163165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
    164166        }
     167}
     168#else
     169static void print_memory_stats() {}
    165170#endif
     171
     172static const libtrace_generic_t gen_zero = {0};
     173
     174/* This should optimise away the switch to nothing in the explict cases */
     175static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
     176                                libtrace_generic_t data, libtrace_thread_t *sender) {
     177        fn_cb_dataless fn = NULL;
     178        switch (type) {
     179        case MESSAGE_STARTING:
     180                if (trace->callbacks.message_starting)
     181                        thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
     182                else if (trace->per_msg)
     183                        (*trace->per_msg)(trace, thread, type, data, sender);
     184                return;
     185        case MESSAGE_FIRST_PACKET:
     186                if (trace->callbacks.message_first_packet)
     187                        (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
     188                else if (trace->per_msg)
     189                        (*trace->per_msg)(trace, thread, type, data, sender);
     190                return;
     191        case MESSAGE_TICK_COUNT:
     192                if (trace->callbacks.message_tick_count)
     193                        (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
     194                else if (trace->per_msg)
     195                        (*trace->per_msg)(trace, thread, type, data, sender);
     196                return;
     197        case MESSAGE_TICK_INTERVAL:
     198                if (trace->callbacks.message_tick_interval)
     199                        (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
     200                else if (trace->per_msg)
     201                        (*trace->per_msg)(trace, thread, type, data, sender);
     202                return;
     203        case MESSAGE_STOPPING:
     204                fn = trace->callbacks.message_stopping;
     205                break;
     206        case MESSAGE_RESUMING:
     207                fn = trace->callbacks.message_resuming;
     208                break;
     209        case MESSAGE_PAUSING:
     210                fn = trace->callbacks.message_pausing;
     211                break;
     212
     213        /* These should be unused */
     214        case MESSAGE_DO_PAUSE:
     215        case MESSAGE_DO_STOP:
     216        case MESSAGE_POST_REPORTER:
     217        case MESSAGE_RESULT:
     218        case MESSAGE_PACKET:
     219                return;
     220        case MESSAGE_USER:
     221                break;
     222        }
     223        if (fn)
     224                (*fn)(trace, thread, trace->global_blob, thread->user_data);
     225        else if (trace->per_msg)
     226                (*trace->per_msg)(trace, thread, type, data, sender);
    166227}
    167228
     
    316377DLLEXPORT void libtrace_make_packet_safe(libtrace_packet_t *pkt) {
    317378        // Duplicate the packet in standard malloc'd memory and free the
    318         // original, This is a 1:1 exchange so is ocache count remains unchanged.
     379        // original, This is a 1:1 exchange so the ocache count remains unchanged.
    319380        if (pkt->buf_control != TRACE_CTRL_PACKET) {
    320381                libtrace_packet_t *dup;
     
    324385                /* Copy the duplicated packet over the existing */
    325386                memcpy(pkt, dup, sizeof(libtrace_packet_t));
     387                /* Free the packet structure */
     388                free(dup);
    326389        }
    327390}
     
    377440                t->accepted_packets++;
    378441                libtrace_generic_t data = {.pkt = *packet};
    379                 *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
     442                if (trace->callbacks.message_packet)
     443                        *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
     444                else if (trace->per_msg)
     445                        *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
    380446                trace_fin_packet(*packet);
    381447        } else {
    382448                assert((*packet)->error == READ_TICK);
    383449                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
    384                 (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
     450                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
    385451        }
    386452        return 0;
     
    449515
    450516        /* Let the user thread know we are going to pause */
    451         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
     517        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
    452518
    453519        /* Send through any remaining packets (or messages) without delay */
     
    490556        /* Now we do the actual pause, this returns when we resumed */
    491557        trace_thread_pause(trace, t);
    492         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     558        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    493559        return 1;
    494560}
     
    535601
    536602        /* Let the per_packet function know we have started */
    537         (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
    538         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     603        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
     604        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    539605
    540606        for (;;) {
     
    556622                                        goto eof;
    557623                        }
    558                         (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
     624                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
     625                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
    559626                        /* Continue and the empty messages out before packets */
    560627                        continue;
     
    619686
    620687        // Let the per_packet function know we have stopped
    621         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
    622         (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
     688        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
     689        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
    623690
    624691        // Free any remaining packets
     
    12461313 */
    12471314static int trace_prestart(libtrace_t * libtrace, void *global_blob,
    1248                           fn_per_pkt per_pkt, fn_reporter reporter) {
     1315                          fn_cb_msg per_msg, fn_reporter reporter) {
    12491316        int i, err = 0;
    12501317        if (libtrace->state != STATE_PAUSED) {
     
    12891356
    12901357        /* Update functions if requested */
    1291         if (per_pkt)
    1292                 libtrace->per_pkt = per_pkt;
    1293         assert(libtrace->per_pkt);
     1358        if (per_msg)
     1359                libtrace->per_msg = per_msg;
     1360        assert(libtrace->per_msg);
    12941361        if (reporter)
    12951362                libtrace->reporter = reporter;
     
    15131580
    15141581DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    1515                            fn_per_pkt per_pkt, fn_reporter reporter) {
     1582                           fn_cb_msg per_msg, fn_reporter reporter) {
    15161583        int i;
    15171584        int ret = -1;
     
    15261593
    15271594        if (libtrace->state == STATE_PAUSED) {
    1528                 ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
     1595                ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
    15291596                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    15301597                return ret;
     
    15411608        /* Store the user defined things against the trace */
    15421609        libtrace->global_blob = global_blob;
    1543         libtrace->per_pkt = per_pkt;
     1610        libtrace->per_msg = per_msg;
    15441611        libtrace->reporter = reporter;
    15451612        /* And zero other fields */
     
    17201787}
    17211788
     1789DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
     1790        libtrace->callbacks.message_starting = handler;
     1791        return 0;
     1792}
     1793
     1794DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
     1795        libtrace->callbacks.message_pausing = handler;
     1796        return 0;
     1797}
     1798
     1799DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
     1800        libtrace->callbacks.message_resuming = handler;
     1801        return 0;
     1802}
     1803
     1804DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
     1805        libtrace->callbacks.message_stopping = handler;
     1806        return 0;
     1807}
     1808
     1809DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
     1810        libtrace->callbacks.message_packet = handler;
     1811        return 0;
     1812}
     1813
     1814DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
     1815        libtrace->callbacks.message_first_packet = handler;
     1816        return 0;
     1817}
     1818
     1819DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
     1820        libtrace->callbacks.message_tick_count = handler;
     1821        return 0;
     1822}
     1823
     1824DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
     1825        libtrace->callbacks.message_tick_interval = handler;
     1826        return 0;
     1827}
     1828
    17221829/*
    17231830 * Pauses a trace, this should only be called by the main thread
     
    17281835 *
    17291836 * Once done you should be able to modify the trace setup and call pstart again
    1730  * TODO handle changing thread numbers
     1837 * TODO add support to change the number of threads.
    17311838 */
    17321839DLLEXPORT int trace_ppause(libtrace_t *libtrace)
  • m4/attributes.m4

    r9b42f3e r4007dbb  
    154154])
    155155
     156AC_DEFUN([gcc_TLS],
     157[
     158  HAVE_TLS=0
     159  if test -n "$CC"; then
     160    AC_CACHE_CHECK([if compiler supports TLS __thread],
     161      [lt_cv_attribute_tls],
     162      [
     163       AC_COMPILE_IFELSE([AC_LANG_SOURCE(
     164         [static __thread int apples;])],
     165         [lt_cv_attribute_tls=yes],
     166         [lt_cv_attribute_tls=no]
     167       )
     168      ])
     169    if test x$lt_cv_attribute_tls = xyes; then
     170      HAVE_TLS=1
     171    fi
     172  fi
     173  AC_SUBST([HAVE_TLS])
     174  AC_DEFINE_UNQUOTED([HAVE_TLS], [$HAVE_TLS],
     175    [Define to 1 or 0, depending on whether the compiler supports tls via __thread.])
     176])
    156177
  • tools/tracestats/tracestats_parallel.c

    rdfbdda7a r4007dbb  
    8787
    8888
    89 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    90                         int mesg, libtrace_generic_t data,
     89static void* per_packet(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     90                        int mesg UNUSED, libtrace_generic_t data UNUSED,
    9191                        libtrace_thread_t *sender UNUSED)
    9292{
    93         /* Using first entry as total and those after for filter counts */
    94         static __thread statistics_t * results = NULL;
    95         int i, wlen;
    96         libtrace_stat_t *stats;
    97         libtrace_generic_t gen;
    98 
    99         switch (mesg) {
    100         case MESSAGE_PACKET:
    101                 /* Apply filters to every packet note the result */
    102                 wlen = trace_get_wire_length(data.pkt);
    103                 for(i=0;i<filter_count;++i) {
    104                         if (filters[i].filter == NULL)
    105                                 continue;
    106                         if(trace_apply_filter(filters[i].filter,data.pkt) > 0) {
    107                                 results[i+1].count++;
    108                                 results[i+1].bytes+=wlen;
    109                         }
    110                         if (trace_is_err(trace)) {
    111                                 trace_perror(trace, "trace_apply_filter");
    112                                 fprintf(stderr, "Removing filter from filterlist\n");
    113                                 /* This is a race, but will be atomic */
    114                                 filters[i].filter = NULL;
    115                         }
    116                 }
    117                 results[0].count++;
    118                 results[0].bytes +=wlen;
    119                 return data.pkt;
    120         case MESSAGE_STARTING:
    121                 /* Allocate space to hold a total count and one for each filter */
    122                 results = calloc(1, sizeof(statistics_t) * (filter_count + 1));
    123                 break;
    124         case MESSAGE_STOPPING:
    125                 /* We only output one result per thread with the key 0 when the
    126                  * trace is over. */
    127                 gen.ptr = results;
    128                 trace_publish_result(trace, t, 0, gen, RESULT_USER);
    129                 break;
    130         default:
    131                 break;
    132         }
    13393        return NULL;
    13494}
     
    184144}
    185145
     146
     147static void* fn_starting(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, void *global UNUSED) {
     148        /* Allocate space to hold a total count and one for each filter */
     149        return calloc(1, sizeof(statistics_t) * (filter_count + 1));
     150}
     151
     152
     153static void fn_stopping(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     154                        void *global UNUSED, void*tls) {
     155        statistics_t *results = tls;
     156        libtrace_generic_t gen;
     157        /* We only output one result per thread with the key 0 when the
     158         * trace is over. */
     159        gen.ptr = results;
     160        trace_publish_result(trace, t, 0, gen, RESULT_USER);
     161}
     162
     163static libtrace_packet_t* fn_packet(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     164                   void *global UNUSED, void*tls, libtrace_packet_t *pkt) {
     165        statistics_t *results = tls;
     166        int i, wlen;
     167
     168        /* Apply filters to every packet note the result */
     169        wlen = trace_get_wire_length(pkt);
     170        for(i=0;i<filter_count;++i) {
     171                if (filters[i].filter == NULL)
     172                        continue;
     173                if(trace_apply_filter(filters[i].filter,pkt) > 0) {
     174                        results[i+1].count++;
     175                        results[i+1].bytes+=wlen;
     176                }
     177                if (trace_is_err(trace)) {
     178                        trace_perror(trace, "trace_apply_filter");
     179                        fprintf(stderr, "Removing filter from filterlist\n");
     180                        /* This is a race, but will be atomic */
     181                        filters[i].filter = NULL;
     182                }
     183        }
     184        results[0].count++;
     185        results[0].bytes +=wlen;
     186        return pkt;
     187}
     188
    186189/* Process a trace, counting packets that match filter(s) */
    187190static void run_trace(char *uri, char *config, char *config_file)
     
    212215                }
    213216        }
     217
     218        trace_cb_packet(trace, fn_packet);
     219        trace_cb_starting(trace, fn_starting);
     220        trace_cb_stopping(trace, fn_stopping);
    214221
    215222        /* Start the trace as a parallel trace */
Note: See TracChangeset for help on using the changeset viewer.