Ignore:
Timestamp:
08/21/15 11:22:30 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
eea427f
Parents:
76291d1
Message:

Updates the new interface to be more complete

This should work around any issues with systems without thread support.
This still remains compatible with existing code.
Examples/tools/tests still need to be updated to make use of the new interface.
And tests also need to be updated.
Adds debug memory stats as an option to configure.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r76291d1 r4007dbb  
    113113};
    114114
     115
     116#ifdef ENABLE_MEM_STATS
    115117// Grrr gcc wants this spelt out
    116118__thread struct mem_stats mem_hits = {{0},{0},{0},{0}};
    117119
     120
    118121static void print_memory_stats() {
    119 #if 0
    120122        uint64_t total;
    121123#if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__)
     
    163165                                total, (double) mem_hits.writebulk.miss / (double) total * 100.0);
    164166        }
     167}
     168#else
     169static void print_memory_stats() {}
    165170#endif
     171
     172static const libtrace_generic_t gen_zero = {0};
     173
     174/* This should optimise away the switch to nothing in the explict cases */
     175static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
     176                                libtrace_generic_t data, libtrace_thread_t *sender) {
     177        fn_cb_dataless fn = NULL;
     178        switch (type) {
     179        case MESSAGE_STARTING:
     180                if (trace->callbacks.message_starting)
     181                        thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
     182                else if (trace->per_msg)
     183                        (*trace->per_msg)(trace, thread, type, data, sender);
     184                return;
     185        case MESSAGE_FIRST_PACKET:
     186                if (trace->callbacks.message_first_packet)
     187                        (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
     188                else if (trace->per_msg)
     189                        (*trace->per_msg)(trace, thread, type, data, sender);
     190                return;
     191        case MESSAGE_TICK_COUNT:
     192                if (trace->callbacks.message_tick_count)
     193                        (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
     194                else if (trace->per_msg)
     195                        (*trace->per_msg)(trace, thread, type, data, sender);
     196                return;
     197        case MESSAGE_TICK_INTERVAL:
     198                if (trace->callbacks.message_tick_interval)
     199                        (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
     200                else if (trace->per_msg)
     201                        (*trace->per_msg)(trace, thread, type, data, sender);
     202                return;
     203        case MESSAGE_STOPPING:
     204                fn = trace->callbacks.message_stopping;
     205                break;
     206        case MESSAGE_RESUMING:
     207                fn = trace->callbacks.message_resuming;
     208                break;
     209        case MESSAGE_PAUSING:
     210                fn = trace->callbacks.message_pausing;
     211                break;
     212
     213        /* These should be unused */
     214        case MESSAGE_DO_PAUSE:
     215        case MESSAGE_DO_STOP:
     216        case MESSAGE_POST_REPORTER:
     217        case MESSAGE_RESULT:
     218        case MESSAGE_PACKET:
     219                return;
     220        case MESSAGE_USER:
     221                break;
     222        }
     223        if (fn)
     224                (*fn)(trace, thread, trace->global_blob, thread->user_data);
     225        else if (trace->per_msg)
     226                (*trace->per_msg)(trace, thread, type, data, sender);
    166227}
    167228
     
    378439                libtrace_generic_t data = {.pkt = *packet};
    379440                if (trace->callbacks.message_packet)
    380                         *packet = (*trace->callbacks.message_packet)(trace, t, data, trace->global_blob, t->user_data);
    381                 else
    382                         *packet = (*trace->per_pkt)(trace, t, MESSAGE_PACKET, data, t);
     441                        *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
     442                else if (trace->per_msg)
     443                        *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
    383444                trace_fin_packet(*packet);
    384445        } else {
    385446                assert((*packet)->error == READ_TICK);
    386447                libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)};
    387                 (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);
     448                send_message(trace, t, MESSAGE_TICK_COUNT, data, t);
    388449        }
    389450        return 0;
     
    452513
    453514        /* Let the user thread know we are going to pause */
    454         (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
     515        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
    455516
    456517        /* Send through any remaining packets (or messages) without delay */
     
    493554        /* Now we do the actual pause, this returns when we resumed */
    494555        trace_thread_pause(trace, t);
    495         (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     556        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    496557        return 1;
    497558}
     
    538599
    539600        /* Let the per_packet function know we have started */
    540         if (trace->callbacks.message_starting)
    541                 (*trace->callbacks.message_starting)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
    542         else
    543                 (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
    544 
    545         if (trace->callbacks.message_resuming)
    546                 (*trace->callbacks.message_resuming)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
    547         else
    548                 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
     601        send_message(trace, t, MESSAGE_STARTING, gen_zero, t);
     602        send_message(trace, t, MESSAGE_RESUMING, gen_zero, t);
    549603
    550604        for (;;) {
     
    566620                                        goto eof;
    567621                        }
    568                         (*trace->per_pkt)(trace, t, message.code, message.data, message.sender);
     622                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
     623                        (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
    569624                        /* Continue and the empty messages out before packets */
    570625                        continue;
     
    629684
    630685        // Let the per_packet function know we have stopped
    631         if (trace->callbacks.message_pausing)
    632                 (*trace->callbacks.message_pausing)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
    633         else
    634                 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);
    635         if (trace->callbacks.message_stopping)
    636                 (*trace->callbacks.message_stopping)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data);
    637         else
    638                 (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t);
    639 
     686        send_message(trace, t, MESSAGE_PAUSING, gen_zero, t);
     687        send_message(trace, t, MESSAGE_STOPPING, gen_zero, t);
    640688
    641689        // Free any remaining packets
     
    12631311 */
    12641312static int trace_prestart(libtrace_t * libtrace, void *global_blob,
    1265                           fn_per_pkt per_pkt, fn_reporter reporter) {
     1313                          fn_cb_msg per_msg, fn_reporter reporter) {
    12661314        int i, err = 0;
    12671315        if (libtrace->state != STATE_PAUSED) {
     
    13061354
    13071355        /* Update functions if requested */
    1308         if (per_pkt)
    1309                 libtrace->per_pkt = per_pkt;
    1310         assert(libtrace->per_pkt);
     1356        if (per_msg)
     1357                libtrace->per_msg = per_msg;
     1358        assert(libtrace->per_msg);
    13111359        if (reporter)
    13121360                libtrace->reporter = reporter;
     
    15301578
    15311579DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    1532                            fn_per_pkt per_pkt, fn_reporter reporter) {
     1580                           fn_cb_msg per_msg, fn_reporter reporter) {
    15331581        int i;
    15341582        int ret = -1;
     
    15431591
    15441592        if (libtrace->state == STATE_PAUSED) {
    1545                 ret = trace_prestart(libtrace, global_blob, per_pkt, reporter);
     1593                ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
    15461594                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    15471595                return ret;
     
    15581606        /* Store the user defined things against the trace */
    15591607        libtrace->global_blob = global_blob;
    1560         libtrace->per_pkt = per_pkt;
     1608        libtrace->per_msg = per_msg;
    15611609        libtrace->reporter = reporter;
    15621610        /* And zero other fields */
     
    17371785}
    17381786
    1739 DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler) {
    1740         switch (message) {
    1741         case MESSAGE_STARTING:
    1742                 libtrace->callbacks.message_starting = handler;
    1743                 return 0;
    1744         case MESSAGE_STOPPING:
    1745                 libtrace->callbacks.message_stopping = handler;
    1746                 return 0;
    1747         case MESSAGE_RESUMING:
    1748                 libtrace->callbacks.message_resuming = handler;
    1749                 return 0;
    1750         case MESSAGE_PAUSING:
    1751                 libtrace->callbacks.message_pausing = handler;
    1752                 return 0;
    1753         case MESSAGE_PACKET:
    1754                 libtrace->callbacks.message_packet = handler;
    1755                 return 0;
    1756         default:
    1757                 return -1;
    1758         }
    1759         return -1;
    1760 }
    1761 
     1787DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
     1788        libtrace->callbacks.message_starting = handler;
     1789        return 0;
     1790}
     1791
     1792DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
     1793        libtrace->callbacks.message_pausing = handler;
     1794        return 0;
     1795}
     1796
     1797DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
     1798        libtrace->callbacks.message_resuming = handler;
     1799        return 0;
     1800}
     1801
     1802DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
     1803        libtrace->callbacks.message_stopping = handler;
     1804        return 0;
     1805}
     1806
     1807DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
     1808        libtrace->callbacks.message_packet = handler;
     1809        return 0;
     1810}
     1811
     1812DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
     1813        libtrace->callbacks.message_first_packet = handler;
     1814        return 0;
     1815}
     1816
     1817DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
     1818        libtrace->callbacks.message_tick_count = handler;
     1819        return 0;
     1820}
     1821
     1822DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
     1823        libtrace->callbacks.message_tick_interval = handler;
     1824        return 0;
     1825}
    17621826
    17631827/*
     
    17691833 *
    17701834 * Once done you should be able to modify the trace setup and call pstart again
    1771  * TODO handle changing thread numbers
     1835 * TODO add support to change the number of threads.
    17721836 */
    17731837DLLEXPORT int trace_ppause(libtrace_t *libtrace)
Note: See TracChangeset for help on using the changeset viewer.