Ignore:
Timestamp:
09/11/15 15:00:27 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
322c516
Parents:
8c7490fe
Message:

Reworked callback API and removed old per_msg and reporter functions

Updated tracertstats to use the new callback API.

Extended the callback approach to the reporter thread as well as the per
packet threads.

Added libtrace_callback_set_t structure, which is used to register the
user callback functions.

Added callback functionality for MESSAGE_RESULT (needed now that reporter
threads also do callbacks) and MESSAGE_USER (for user-defined messages). The
MESSAGE_USER callback is essentially the same as the old per_msg function
style.

Updated combiners to use send_message to pass results to the reporter thread.
send_message itself is now no longer static, so that combiners can use it.

Disabled building of tracestats_parallel as it was using the older version
of the callback API. Will update in a future commit.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    r9346e4a rf625817  
    8080
    8181#include "libtrace.h"
    82 #include "libtrace_int.h"
     82#include "libtrace_parallel.h"
    8383
    8484#ifdef HAVE_PCAP_BPF_H
     
    173173
    174174/* This should optimise away the switch to nothing in the explict cases */
    175 static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type,
    176                                 libtrace_generic_t data, libtrace_thread_t *sender) {
     175inline void send_message(libtrace_t *trace, libtrace_thread_t *thread,
     176                const enum libtrace_messages type,
     177                libtrace_generic_t data, libtrace_thread_t *sender) {
     178
    177179        fn_cb_dataless fn = NULL;
    178         switch (type) {
     180        enum libtrace_messages switchtype;
     181        libtrace_callback_set_t *cbs = NULL;
     182
     183        if (thread == &trace->reporter_thread) {
     184                cbs = trace->reporter_cbs;
     185        } else {
     186                cbs = trace->perpkt_cbs;
     187        }
     188
     189        if (cbs == NULL)
     190                return;
     191
     192        if (type >= MESSAGE_USER)
     193                switchtype = MESSAGE_USER;
     194        else
     195                switchtype = (enum libtrace_messages) type;
     196
     197        switch (switchtype) {
    179198        case MESSAGE_STARTING:
    180                 if (trace->callbacks.message_starting)
    181                         thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob);
    182                 else if (trace->per_msg)
    183                         (*trace->per_msg)(trace, thread, type, data, sender);
     199                if (cbs->message_starting)
     200                        thread->user_data = (*cbs->message_starting)(trace,
     201                                        thread, trace->global_blob);
    184202                return;
    185203        case MESSAGE_FIRST_PACKET:
    186                 if (trace->callbacks.message_first_packet)
    187                         (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender);
    188                 else if (trace->per_msg)
    189                         (*trace->per_msg)(trace, thread, type, data, sender);
     204                if (cbs->message_first_packet)
     205                                (*cbs->message_first_packet)(trace, thread,
     206                                trace->global_blob, thread->user_data,
     207                                data.pkt, sender);
    190208                return;
    191209        case MESSAGE_TICK_COUNT:
    192                 if (trace->callbacks.message_tick_count)
    193                         (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64);
    194                 else if (trace->per_msg)
    195                         (*trace->per_msg)(trace, thread, type, data, sender);
     210                if (cbs->message_tick_count)
     211                        (*cbs->message_tick_count)(trace, thread,
     212                                        trace->global_blob, thread->user_data,
     213                                        data.uint64);
    196214                return;
    197215        case MESSAGE_TICK_INTERVAL:
    198                 if (trace->callbacks.message_tick_interval)
    199                         (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data,  data.uint64);
    200                 else if (trace->per_msg)
    201                         (*trace->per_msg)(trace, thread, type, data, sender);
     216                if (cbs->message_tick_interval)
     217                        (*cbs->message_tick_interval)(trace, thread,
     218                                        trace->global_blob, thread->user_data,
     219                                        data.uint64);
    202220                return;
    203221        case MESSAGE_STOPPING:
    204                 fn = trace->callbacks.message_stopping;
     222                fn = cbs->message_stopping;
    205223                break;
    206224        case MESSAGE_RESUMING:
    207                 fn = trace->callbacks.message_resuming;
     225                fn = cbs->message_resuming;
    208226                break;
    209227        case MESSAGE_PAUSING:
    210                 fn = trace->callbacks.message_pausing;
     228                fn = cbs->message_pausing;
    211229                break;
     230        case MESSAGE_USER:
     231                if (cbs->message_user)
     232                        (*cbs->message_user)(trace, thread, trace->global_blob,
     233                                        thread->user_data, type, data);
     234                return;
     235        case MESSAGE_RESULT:
     236                if (cbs->message_result)
     237                        (*cbs->message_result)(trace, thread,
     238                                        trace->global_blob, thread->user_data,
     239                                        data.res);
    212240
    213241        /* These should be unused */
     
    215243        case MESSAGE_DO_STOP:
    216244        case MESSAGE_POST_REPORTER:
    217         case MESSAGE_RESULT:
    218245        case MESSAGE_PACKET:
    219246                return;
    220         case MESSAGE_USER:
    221                 break;
    222         }
     247        }
     248
    223249        if (fn)
    224250                (*fn)(trace, thread, trace->global_blob, thread->user_data);
    225         else if (trace->per_msg)
    226                 (*trace->per_msg)(trace, thread, type, data, sender);
     251}
     252
     253DLLEXPORT libtrace_callback_set_t *trace_create_callback_set() {
     254        libtrace_callback_set_t *cbset;
     255
     256        cbset = (libtrace_callback_set_t *)malloc(sizeof(libtrace_callback_set_t));
     257        memset(cbset, 0, sizeof(libtrace_callback_set_t));
     258        return cbset;
     259}
     260
     261DLLEXPORT void trace_destroy_callback_set(libtrace_callback_set_t *cbset) {
     262        free(cbset);
    227263}
    228264
     
    239275{
    240276        assert(libtrace->state != STATE_NEW);
    241         return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter;
     277        return libtrace->reporter_thread.type == THREAD_REPORTER && libtrace->reporter_cbs;
    242278}
    243279
     
    441477                }
    442478                t->accepted_packets++;
    443                 libtrace_generic_t data = {.pkt = *packet};
    444                 if (trace->callbacks.message_packet)
    445                         *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
    446                 else if (trace->per_msg)
    447                         *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t);
     479                if (trace->perpkt_cbs->message_packet)
     480                        *packet = (*trace->perpkt_cbs->message_packet)(trace, t, trace->global_blob, t->user_data, *packet);
    448481                trace_fin_packet(*packet);
    449482        } else {
     
    624657                                        goto eof;
    625658                        }
    626                         (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
    627                         (*trace->per_msg)(trace, t, message.code, message.data, message.sender);
     659                        send_message(trace, t, message.code, message.data,
     660                                        message.sender);
    628661                        /* Continue and the empty messages out before packets */
    629662                        continue;
     
    10671100        }
    10681101
    1069         (*trace->reporter)(trace, MESSAGE_STARTING, (libtrace_generic_t) {0}, t);
    1070         (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
     1102        send_message(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t);
     1103        send_message(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);
    10711104
    10721105        while (!trace_has_finished(trace)) {
     
    10851118                                assert(trace->combiner.pause);
    10861119                                trace->combiner.pause(trace, &trace->combiner);
    1087                                 (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
     1120                                send_message(trace, t, MESSAGE_PAUSING,
     1121                                                (libtrace_generic_t) {0}, t);
    10881122                                trace_thread_pause(trace, t);
    1089                                 (*trace->reporter)(trace, MESSAGE_RESUMING, (libtrace_generic_t) {0}, t);
     1123                                send_message(trace, t, MESSAGE_RESUMING,
     1124                                                (libtrace_generic_t) {0}, t);
    10901125                                break;
    10911126                default:
    1092                         (*trace->reporter)(trace, message.code, message.data, message.sender);
     1127                        send_message(trace, t, message.code, message.data,
     1128                                        message.sender);
    10931129                }
    10941130        }
     
    10981134
    10991135        // GOODBYE
    1100         (*trace->reporter)(trace, MESSAGE_PAUSING, (libtrace_generic_t) {0}, t);
    1101         (*trace->reporter)(trace, MESSAGE_STOPPING, (libtrace_generic_t) {0}, t);
     1136        send_message(trace, t, MESSAGE_PAUSING,(libtrace_generic_t) {0}, t);
     1137        send_message(trace, t, MESSAGE_STOPPING,(libtrace_generic_t) {0}, t);
    11021138
    11031139        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
     
    13151351 */
    13161352static int trace_prestart(libtrace_t * libtrace, void *global_blob,
    1317                           fn_cb_msg per_msg, fn_reporter reporter) {
     1353                          libtrace_callback_set_t *per_packet_cbs,
     1354                          libtrace_callback_set_t *reporter_cbs) {
    13181355        int i, err = 0;
    13191356        if (libtrace->state != STATE_PAUSED) {
     
    13581395
    13591396        /* Update functions if requested */
    1360         if (per_msg)
    1361                 libtrace->per_msg = per_msg;
    1362         assert(libtrace->per_msg);
    1363         if (reporter)
    1364                 libtrace->reporter = reporter;
    13651397        if(global_blob)
    13661398                libtrace->global_blob = global_blob;
     1399
     1400        if (per_packet_cbs) {
     1401                if (libtrace->perpkt_cbs)
     1402                        trace_destroy_callback_set(libtrace->perpkt_cbs);
     1403                libtrace->perpkt_cbs = trace_create_callback_set();
     1404                memcpy(libtrace->perpkt_cbs, per_packet_cbs,
     1405                                sizeof(libtrace_callback_set_t));
     1406        }
     1407
     1408        if (reporter_cbs) {
     1409                if (libtrace->reporter_cbs)
     1410                        trace_destroy_callback_set(libtrace->reporter_cbs);
     1411
     1412                libtrace->reporter_cbs = trace_create_callback_set();
     1413                memcpy(libtrace->reporter_cbs, reporter_cbs,
     1414                                sizeof(libtrace_callback_set_t));
     1415        }
    13671416
    13681417        if (trace_is_parallel(libtrace)) {
     
    15861635
    15871636DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob,
    1588                            fn_cb_msg per_msg, fn_reporter reporter) {
     1637                           libtrace_callback_set_t *per_packet_cbs,
     1638                           libtrace_callback_set_t *reporter_cbs) {
    15891639        int i;
    15901640        int ret = -1;
     
    15991649
    16001650        if (libtrace->state == STATE_PAUSED) {
    1601                 ret = trace_prestart(libtrace, global_blob, per_msg, reporter);
     1651                ret = trace_prestart(libtrace, global_blob, per_packet_cbs,
     1652                                reporter_cbs);
    16021653                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    16031654                return ret;
     
    16141665        /* Store the user defined things against the trace */
    16151666        libtrace->global_blob = global_blob;
    1616         libtrace->per_msg = per_msg;
    1617         libtrace->reporter = reporter;
     1667
     1668        /* Save a copy of the callbacks in case the user tries to change them
     1669         * on us later */
     1670        if (!per_packet_cbs) {
     1671                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "trace_pstart "
     1672                                "requires a non-NULL set of per packet "
     1673                                "callbacks.");
     1674                goto cleanup_none;
     1675        }
     1676
     1677        if (per_packet_cbs->message_packet == NULL) {
     1678                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "The per "
     1679                                "packet callbacks must include a handler "
     1680                                "for a packet. Please set this using "
     1681                                "trace_set_packet_cb().");
     1682                goto cleanup_none;
     1683        }
     1684
     1685        libtrace->perpkt_cbs = trace_create_callback_set();
     1686        memcpy(libtrace->perpkt_cbs, per_packet_cbs, sizeof(libtrace_callback_set_t));
     1687       
     1688        if (reporter_cbs) {
     1689                libtrace->reporter_cbs = trace_create_callback_set();
     1690                memcpy(libtrace->reporter_cbs, reporter_cbs, sizeof(libtrace_callback_set_t));
     1691        }
     1692
     1693       
     1694
     1695
    16181696        /* And zero other fields */
    16191697        for (i = 0; i < THREAD_STATE_MAX; ++i) {
     
    16941772
    16951773        /* Start the reporter thread */
    1696         if (reporter) {
     1774        if (reporter_cbs) {
    16971775                if (libtrace->combiner.initialise)
    16981776                        libtrace->combiner.initialise(libtrace, &libtrace->combiner);
     
    17931871}
    17941872
    1795 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) {
    1796         libtrace->callbacks.message_starting = handler;
    1797         return 0;
    1798 }
    1799 
    1800 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) {
    1801         libtrace->callbacks.message_pausing = handler;
    1802         return 0;
    1803 }
    1804 
    1805 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) {
    1806         libtrace->callbacks.message_resuming = handler;
    1807         return 0;
    1808 }
    1809 
    1810 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) {
    1811         libtrace->callbacks.message_stopping = handler;
    1812         return 0;
    1813 }
    1814 
    1815 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) {
    1816         libtrace->callbacks.message_packet = handler;
    1817         return 0;
    1818 }
    1819 
    1820 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) {
    1821         libtrace->callbacks.message_first_packet = handler;
    1822         return 0;
    1823 }
    1824 
    1825 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) {
    1826         libtrace->callbacks.message_tick_count = handler;
    1827         return 0;
    1828 }
    1829 
    1830 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) {
    1831         libtrace->callbacks.message_tick_interval = handler;
     1873DLLEXPORT int trace_set_starting_cb(libtrace_callback_set_t *cbset,
     1874                fn_cb_starting handler) {
     1875        cbset->message_starting = handler;
     1876        return 0;
     1877}
     1878
     1879DLLEXPORT int trace_set_pausing_cb(libtrace_callback_set_t *cbset,
     1880                fn_cb_dataless handler) {
     1881        cbset->message_pausing = handler;
     1882        return 0;
     1883}
     1884
     1885DLLEXPORT int trace_set_resuming_cb(libtrace_callback_set_t *cbset,
     1886                fn_cb_dataless handler) {
     1887        cbset->message_resuming = handler;
     1888        return 0;
     1889}
     1890
     1891DLLEXPORT int trace_set_stopping_cb(libtrace_callback_set_t *cbset,
     1892                fn_cb_dataless handler) {
     1893        cbset->message_stopping = handler;
     1894        return 0;
     1895}
     1896
     1897DLLEXPORT int trace_set_packet_cb(libtrace_callback_set_t *cbset,
     1898                fn_cb_packet handler) {
     1899        cbset->message_packet = handler;
     1900        return 0;
     1901}
     1902
     1903DLLEXPORT int trace_set_first_packet_cb(libtrace_callback_set_t *cbset,
     1904                fn_cb_first_packet handler) {
     1905        cbset->message_first_packet = handler;
     1906        return 0;
     1907}
     1908
     1909DLLEXPORT int trace_set_tick_count_cb(libtrace_callback_set_t *cbset,
     1910                fn_cb_tick handler) {
     1911        cbset->message_tick_count = handler;
     1912        return 0;
     1913}
     1914
     1915DLLEXPORT int trace_set_tick_interval_cb(libtrace_callback_set_t *cbset,
     1916                fn_cb_tick handler) {
     1917        cbset->message_tick_interval = handler;
     1918        return 0;
     1919}
     1920
     1921DLLEXPORT int trace_set_result_cb(libtrace_callback_set_t *cbset,
     1922                fn_cb_result handler) {
     1923        cbset->message_result = handler;
     1924        return 0;
     1925}
     1926
     1927DLLEXPORT int trace_set_user_message_cb(libtrace_callback_set_t *cbset,
     1928                fn_cb_usermessage handler) {
     1929        cbset->message_user = handler;
    18321930        return 0;
    18331931}
Note: See TracChangeset for help on using the changeset viewer.